Designing Chunked Upload and Resumable Upload for Large Files: From Theory to Practice
Preface
In our AI large-model cloud drive system, file upload is a core feature. The biggest challenge we faced: users need to upload large files of 20MB and up (videos, AI model files), but the traditional single-request upload succeeded only 40% of the time.
This article walks through the chunked upload scheme we designed, and how we implemented advanced features on top of it: resumable upload and instant upload ("秒传": if the server already has the file, only its MD5 is sent).
Chunked upload architecture overview (figure)
Upload state machine (figure)
1. Problem Analysis
1.1 Problems with Traditional Upload
Uploading a file in a single request:
- ❌ Network instability means a failed upload must start over from scratch
- ❌ Large files hold server connections open for a long time
- ❌ No upload progress can be shown
- ❌ No pause/resume support
Failure statistics:
- Files <5MB: 95% success rate
- Files 5-20MB: 70% success rate
- Files >20MB: 40% success rate
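Chunking is what turns these numbers around: a 20MB file sent as a single request succeeds or fails as a whole, while four 5MB chunks each get independent retries. A back-of-the-envelope sketch of the effect (the 95% per-attempt chunk success rate and the 3 attempts are assumed figures for illustration, not measurements):

```java
public class RetryMath {
    // Probability that one chunk eventually succeeds within `attempts` tries,
    // given per-attempt success probability p
    public static double chunkSuccess(double p, int attempts) {
        return 1 - Math.pow(1 - p, attempts);
    }

    // Probability that all `chunks` chunks succeed, assuming chunks fail independently
    public static double overallSuccess(double p, int attempts, int chunks) {
        return Math.pow(chunkSuccess(p, attempts), chunks);
    }
}
```

With these assumptions, `overallSuccess(0.95, 3, 4)` comes out around 0.9995 for a 20MB file split into four 5MB chunks, versus 0.40 for the same file sent as one request.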
1.2 Business Requirements
- Reliability: >90% success rate for large file uploads
- Experience: real-time upload progress display
- Efficiency: resumable uploads; a failure must never force re-sending finished chunks
- Cost: avoid duplicate files consuming storage
2. Solution Design
2.1 Overall Architecture

```
Client                          Server                     Storage
  │                                │                          │
  │ 1. Init upload                 │                          │
  │───────────────────────────────>│                          │
  │ 2. Return uploadId             │                          │
  │<───────────────────────────────│                          │
  │                                │                          │
  │ 3. Upload chunks (parallel)    │                          │
  │───────────────────────────────>│                          │
  │                                │ 4. Store chunks          │
  │                                │─────────────────────────>│
  │ 5. Return chunk ETags          │                          │
  │<───────────────────────────────│                          │
  │                                │                          │
  │ 6. Merge request               │                          │
  │───────────────────────────────>│                          │
  │                                │ 7. Merge chunks          │
  │                                │─────────────────────────>│
  │ 8. Return file URL             │                          │
  │<───────────────────────────────│                          │
```

2.2 Core Flow
- Initialize: obtain an uploadId and compute the chunk layout
- Chunk upload: upload chunks in parallel, recording progress for resumption
- Instant-upload check: compare the file MD5; if the file already exists, return it directly
- Chunk merge: the server merges the chunks into the complete file
- Callback: notify the business system for further processing
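The flow above boils down to "split, upload, reassemble". A minimal, framework-free sketch of the split/merge logic at the heart of it (the `ChunkDemo` class name is illustrative, not part of the actual service):

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkDemo {
    // Split data into fixed-size chunks; the last chunk may be smaller
    public static List<byte[]> split(byte[] data, int chunkSize) {
        List<byte[]> chunks = new ArrayList<>();
        for (int start = 0; start < data.length; start += chunkSize) {
            int end = Math.min(start + chunkSize, data.length);
            chunks.add(Arrays.copyOfRange(data, start, end));
        }
        return chunks;
    }

    // Merge chunks back into the original byte sequence, in order
    public static byte[] merge(List<byte[]> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) out.writeBytes(chunk);
        return out.toByteArray();
    }
}
```

For a 23-byte payload and a chunk size of 5, `split` yields ⌈23/5⌉ = 5 chunks, and `merge` restores the exact original bytes, which is the invariant the whole protocol depends on.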
3. Server-Side Implementation
3.1 Data Model

```java
@Entity
@Table(name = "file_upload_task")
@Data
public class UploadTask {
    @Id
    private String uploadId;          // upload task ID
    private String fileName;          // file name
    private String fileMd5;           // MD5 of the whole file
    private Long fileSize;            // file size in bytes
    private Integer chunkSize;        // chunk size in bytes
    private Integer totalChunks;      // total number of chunks
    private Integer uploadedChunks;   // number of chunks uploaded so far
    private String chunkMd5List;      // chunk MD5 list (JSON)

    @Enumerated(EnumType.STRING)
    private UploadStatus status;      // INIT/UPLOADING/COMPLETED/FAILED

    private LocalDateTime createTime;
    private LocalDateTime updateTime;
    private LocalDateTime expireTime; // expiry time (24 hours by default)

    public boolean isExpired() {
        return expireTime != null && LocalDateTime.now().isAfter(expireTime);
    }
}

@Entity
@Table(name = "file_chunk")
@Data
public class FileChunk {
    @Id
    private String id;
    private String uploadId;          // owning upload task
    private Integer chunkNumber;      // chunk sequence number
    private Long chunkSize;           // chunk size in bytes
    private String chunkMd5;          // chunk MD5
    private String storagePath;       // storage path

    @Enumerated(EnumType.STRING)
    private ChunkStatus status;       // PENDING/UPLOADED/VERIFIED

    private LocalDateTime uploadTime;
}
```

3.2 Initializing an Upload
```java
@Service
public class FileUploadService {

    @Autowired
    private UploadTaskRepository taskRepository;
    @Autowired
    private FileChunkRepository chunkRepository;
    @Autowired
    private FileRecordRepository fileRecordRepository;
    @Autowired
    private MinioClient minioClient;

    @Value("${upload.chunk-size:5242880}") // 5MB by default
    private Integer chunkSize;

    /**
     * Initialize an upload task.
     */
    @Transactional
    public InitUploadResponse initUpload(InitUploadRequest request) {
        // 1. Check whether the file already exists (instant upload)
        FileRecord existFile = fileRecordRepository.findByMd5(request.getFileMd5());
        if (existFile != null) {
            return InitUploadResponse.builder()
                    .uploadId(null)
                    .isExist(true)
                    .fileUrl(existFile.getUrl())
                    .message("File already exists; instant upload succeeded")
                    .build();
        }

        // 2. Check for an unfinished task (resumable upload)
        UploadTask existTask = taskRepository.findByFileMd5AndStatus(
                request.getFileMd5(), UploadStatus.UPLOADING);
        if (existTask != null && !existTask.isExpired()) {
            // Return the existing task so the client can resume
            List<Integer> uploadedChunks = chunkRepository
                    .findUploadedChunkNumbers(existTask.getUploadId());
            return InitUploadResponse.builder()
                    .uploadId(existTask.getUploadId())
                    .isExist(false)
                    .chunkSize(existTask.getChunkSize())
                    .totalChunks(existTask.getTotalChunks())
                    .uploadedChunks(uploadedChunks)
                    .message("Unfinished task found; resuming upload")
                    .build();
        }

        // 3. Create a new task
        String uploadId = generateUploadId();
        int totalChunks = (int) Math.ceil((double) request.getFileSize() / chunkSize);

        UploadTask task = new UploadTask();
        task.setUploadId(uploadId);
        task.setFileName(request.getFileName());
        task.setFileMd5(request.getFileMd5());
        task.setFileSize(request.getFileSize());
        task.setChunkSize(chunkSize);
        task.setTotalChunks(totalChunks);
        task.setUploadedChunks(0);
        task.setStatus(UploadStatus.INIT);
        task.setCreateTime(LocalDateTime.now());
        task.setUpdateTime(LocalDateTime.now());
        task.setExpireTime(LocalDateTime.now().plusHours(24));
        taskRepository.save(task);

        return InitUploadResponse.builder()
                .uploadId(uploadId)
                .isExist(false)
                .chunkSize(chunkSize)
                .totalChunks(totalChunks)
                .uploadedChunks(Collections.emptyList())
                .message("Initialization succeeded")
                .build();
    }
}
```

3.3 Uploading Chunks
```java
@Slf4j
@Service
public class ChunkUploadService {

    @Autowired
    private UploadTaskRepository taskRepository;
    @Autowired
    private FileChunkRepository chunkRepository;
    @Autowired
    private MinioClient minioClient;

    /**
     * Upload a single chunk.
     */
    @Transactional
    public ChunkUploadResponse uploadChunk(String uploadId, Integer chunkNumber,
                                           MultipartFile chunkFile) {
        // 1. Validate the task
        UploadTask task = taskRepository.findById(uploadId)
                .orElseThrow(() -> new UploadException("Upload task not found"));
        if (task.getStatus() == UploadStatus.COMPLETED) {
            throw new UploadException("Upload task already completed");
        }
        if (task.isExpired()) {
            throw new UploadException("Upload task has expired");
        }

        // 2. Skip chunks that are already uploaded
        FileChunk existChunk = chunkRepository.findByUploadIdAndChunkNumber(uploadId, chunkNumber);
        if (existChunk != null && existChunk.getStatus() == ChunkStatus.VERIFIED) {
            return ChunkUploadResponse.builder()
                    .chunkNumber(chunkNumber)
                    .isUploaded(true)
                    .message("Chunk already uploaded")
                    .build();
        }

        try {
            // 3. Compute the chunk MD5
            String chunkMd5 = calculateMd5(chunkFile.getInputStream());

            // 4. Upload the chunk to MinIO
            String objectName = String.format("chunks/%s/%d", uploadId, chunkNumber);
            minioClient.putObject(
                    PutObjectArgs.builder()
                            .bucket("upload-bucket")
                            .object(objectName)
                            .stream(chunkFile.getInputStream(), chunkFile.getSize(), -1)
                            .contentType(chunkFile.getContentType())
                            .build()
            );

            // 5. Persist the chunk record
            FileChunk chunk = new FileChunk();
            chunk.setId(generateChunkId());
            chunk.setUploadId(uploadId);
            chunk.setChunkNumber(chunkNumber);
            chunk.setChunkSize(chunkFile.getSize());
            chunk.setChunkMd5(chunkMd5);
            chunk.setStoragePath(objectName);
            chunk.setStatus(ChunkStatus.VERIFIED);
            chunk.setUploadTime(LocalDateTime.now());
            chunkRepository.save(chunk);

            // 6. Update task progress
            // Note: under concurrent chunk uploads this read-modify-write needs an
            // atomic DB update (or a lock) to avoid lost increments.
            task.setUploadedChunks(task.getUploadedChunks() + 1);
            task.setStatus(UploadStatus.UPLOADING);
            task.setUpdateTime(LocalDateTime.now());
            taskRepository.save(task);

            return ChunkUploadResponse.builder()
                    .chunkNumber(chunkNumber)
                    .chunkMd5(chunkMd5)
                    .isUploaded(true)
                    .progress(calculateProgress(task))
                    .message("Chunk uploaded successfully")
                    .build();
        } catch (Exception e) {
            log.error("Chunk upload failed", e);
            throw new UploadException("Chunk upload failed: " + e.getMessage());
        }
    }

    /**
     * Verify a batch of chunks (used for resumable upload).
     */
    public List<Integer> verifyChunks(String uploadId, List<ChunkVerifyRequest> chunks) {
        UploadTask task = taskRepository.findById(uploadId)
                .orElseThrow(() -> new UploadException("Upload task not found"));
        List<Integer> verifiedChunks = new ArrayList<>();
        for (ChunkVerifyRequest chunkReq : chunks) {
            FileChunk chunk = chunkRepository.findByUploadIdAndChunkNumber(
                    uploadId, chunkReq.getChunkNumber());
            if (chunk != null &&
                    chunk.getStatus() == ChunkStatus.VERIFIED &&
                    chunk.getChunkMd5().equals(chunkReq.getChunkMd5())) {
                verifiedChunks.add(chunkReq.getChunkNumber());
            }
        }
        return verifiedChunks;
    }
}
```

3.4 Merging Chunks
```java
@Slf4j
@Service
public class FileMergeService {

    @Autowired
    private MinioClient minioClient;
    @Autowired
    private UploadTaskRepository taskRepository;
    @Autowired
    private FileChunkRepository chunkRepository;
    @Autowired
    private FileRecordRepository fileRecordRepository;

    /**
     * Merge all chunks into the final file.
     */
    @Transactional
    public MergeResponse mergeChunks(String uploadId) {
        // 1. Validate the task
        UploadTask task = taskRepository.findById(uploadId)
                .orElseThrow(() -> new UploadException("Upload task not found"));
        if (task.getUploadedChunks() < task.getTotalChunks()) {
            throw new UploadException("Not all chunks have been uploaded");
        }

        // 2. Load all chunks, sorted by chunk number
        List<FileChunk> chunks = chunkRepository.findByUploadIdOrderByChunkNumber(uploadId);

        // 3. Verify chunk completeness
        if (chunks.size() != task.getTotalChunks()) {
            throw new UploadException("Chunk count mismatch");
        }

        // 4. Merge the chunks (MinIO Compose API)
        List<ComposeSource> sources = chunks.stream()
                .map(chunk -> ComposeSource.builder()
                        .bucket("upload-bucket")
                        .object(chunk.getStoragePath())
                        .build())
                .collect(Collectors.toList());

        String finalObjectName = String.format("files/%s/%s",
                LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE),
                generateFileName(task.getFileName()));

        try {
            // Compose the final object
            minioClient.composeObject(
                    ComposeObjectArgs.builder()
                            .bucket("file-bucket")
                            .object(finalObjectName)
                            .sources(sources)
                            .build()
            );

            // 5. Fetch object metadata to verify the result
            // (the ETag of a composed object is not a plain MD5, so a strict check
            // would re-hash the object and compare against task.getFileMd5())
            ObjectStat stat = minioClient.statObject(
                    StatObjectArgs.builder()
                            .bucket("file-bucket")
                            .object(finalObjectName)
                            .build()
            );

            // 6. Generate a pre-signed access URL
            String fileUrl = minioClient.getPresignedObjectUrl(
                    GetPresignedObjectUrlArgs.builder()
                            .method(Method.GET)
                            .bucket("file-bucket")
                            .object(finalObjectName)
                            .expiry(7, TimeUnit.DAYS)
                            .build()
            );

            // 7. Save the file record
            FileRecord fileRecord = new FileRecord();
            fileRecord.setFileName(task.getFileName());
            fileRecord.setFileMd5(task.getFileMd5());
            fileRecord.setFileSize(task.getFileSize());
            fileRecord.setStoragePath(finalObjectName);
            fileRecord.setUrl(fileUrl);
            fileRecord.setCreateTime(LocalDateTime.now());
            fileRecordRepository.save(fileRecord);

            // 8. Mark the task as completed
            task.setStatus(UploadStatus.COMPLETED);
            task.setUpdateTime(LocalDateTime.now());
            taskRepository.save(task);

            // 9. Clean up the chunks (asynchronously)
            asyncDeleteChunks(uploadId, chunks);

            return MergeResponse.builder()
                    .fileId(fileRecord.getId())
                    .fileUrl(fileUrl)
                    .fileSize(task.getFileSize())
                    .message("File merged successfully")
                    .build();
        } catch (Exception e) {
            log.error("Chunk merge failed", e);
            task.setStatus(UploadStatus.FAILED);
            taskRepository.save(task);
            throw new UploadException("Chunk merge failed: " + e.getMessage());
        }
    }

    /**
     * Delete chunks asynchronously.
     * Note: @Async only takes effect when invoked through the Spring proxy;
     * a same-class call like the one above runs synchronously unless routed
     * through a self-injected proxy or a separate bean.
     */
    @Async
    public void asyncDeleteChunks(String uploadId, List<FileChunk> chunks) {
        for (FileChunk chunk : chunks) {
            try {
                minioClient.removeObject(
                        RemoveObjectArgs.builder()
                                .bucket("upload-bucket")
                                .object(chunk.getStoragePath())
                                .build()
                );
                chunkRepository.delete(chunk);
            } catch (Exception e) {
                log.warn("Failed to delete chunk: {}", chunk.getStoragePath(), e);
            }
        }
    }
}
```

3.5 Instant Upload
```java
@Service
public class FastUploadService {

    @Autowired
    private FileRecordRepository fileRecordRepository;
    @Autowired
    private UploadTaskRepository taskRepository;
    @Autowired
    private FileChunkRepository chunkRepository;

    /**
     * Instant-upload check.
     */
    public FastUploadCheckResponse checkFastUpload(String fileMd5) {
        // 1. Does a file with this MD5 already exist?
        FileRecord existFile = fileRecordRepository.findByMd5(fileMd5);
        if (existFile != null) {
            return FastUploadCheckResponse.builder()
                    .canFastUpload(true)
                    .fileId(existFile.getId())
                    .fileUrl(existFile.getUrl())
                    .message("File already exists; instant upload succeeded")
                    .build();
        }

        // 2. Is there an unfinished task to resume?
        UploadTask existTask = taskRepository.findByFileMd5AndStatus(
                fileMd5, UploadStatus.UPLOADING);
        if (existTask != null && !existTask.isExpired()) {
            List<Integer> uploadedChunks = chunkRepository
                    .findUploadedChunkNumbers(existTask.getUploadId());
            return FastUploadCheckResponse.builder()
                    .canFastUpload(false)
                    .uploadId(existTask.getUploadId())
                    .uploadedChunks(uploadedChunks)
                    .message("Unfinished task found; resuming upload")
                    .build();
        }

        return FastUploadCheckResponse.builder()
                .canFastUpload(false)
                .message("A fresh upload is required")
                .build();
    }
}
```

4. Client-Side Implementation
4.1 Front-End Chunked Upload (Simplified)

```javascript
class ChunkUploader {
    constructor(file, options = {}) {
        this.file = file;
        this.chunkSize = options.chunkSize || 5 * 1024 * 1024; // 5MB
        this.uploadId = null;
        this.chunks = [];
        this.uploadedChunks = new Set();
        this.concurrent = options.concurrent || 3; // concurrency limit
    }

    async upload() {
        // 1. Compute the file MD5
        // (calculateMD5() and initUpload() are omitted in this simplified version)
        const fileMd5 = await this.calculateMD5(this.file);

        // 2. Initialize the upload
        const initRes = await this.initUpload({
            fileName: this.file.name,
            fileSize: this.file.size,
            fileMd5: fileMd5
        });
        if (initRes.isExist) {
            // Instant upload succeeded
            return { success: true, fileUrl: initRes.fileUrl };
        }
        this.uploadId = initRes.uploadId;
        this.uploadedChunks = new Set(initRes.uploadedChunks || []);

        // 3. Split into chunks
        this.chunks = this.createChunks();

        // 4. Upload chunks concurrently
        await this.uploadChunks();

        // 5. Merge
        return await this.mergeChunks();
    }

    createChunks() {
        const chunks = [];
        let start = 0;
        let index = 0;
        while (start < this.file.size) {
            const end = Math.min(start + this.chunkSize, this.file.size);
            chunks.push({
                index: index,
                start: start,
                end: end,
                blob: this.file.slice(start, end)
            });
            start = end;
            index++;
        }
        return chunks;
    }

    async uploadChunks() {
        const pendingChunks = this.chunks.filter(
            c => !this.uploadedChunks.has(c.index)
        );
        // Bounded concurrency: keep at most `concurrent` uploads in flight
        const pool = [];
        for (const chunk of pendingChunks) {
            const promise = this.uploadChunk(chunk)
                .then(() => this.updateProgress())
                .catch(() => this.retryUpload(chunk, 3))
                .finally(() => {
                    // Remove this promise from the pool once it settles
                    pool.splice(pool.indexOf(promise), 1);
                });
            pool.push(promise);
            if (pool.length >= this.concurrent) {
                await Promise.race(pool);
            }
        }
        await Promise.all(pool);
    }

    async uploadChunk(chunk) {
        const formData = new FormData();
        formData.append('chunk', chunk.blob);
        formData.append('chunkNumber', chunk.index);
        await axios.post(`/api/upload/${this.uploadId}/chunk`, formData, {
            headers: { 'Content-Type': 'multipart/form-data' }
        });
        this.uploadedChunks.add(chunk.index);
    }

    async retryUpload(chunk, maxRetries) {
        for (let i = 0; i < maxRetries; i++) {
            try {
                await this.uploadChunk(chunk);
                return;
            } catch (err) {
                if (i === maxRetries - 1) throw err;
                await this.delay(1000 * 2 ** i); // exponential backoff
            }
        }
    }

    async mergeChunks() {
        const res = await axios.post(`/api/upload/${this.uploadId}/merge`);
        return res.data;
    }

    updateProgress() {
        const progress = (this.uploadedChunks.size / this.chunks.length) * 100;
        this.onProgress && this.onProgress(progress);
    }

    delay(ms) {
        return new Promise(resolve => setTimeout(resolve, ms));
    }
}
```

5. Performance Optimization
5.1 Concurrency Control
Why limit concurrency?
- Browser per-host connection limits (typically 6 under HTTP/1.1)
- Server-side connection pool limits
- Bandwidth limits
Best practices:
- Chunk size: 5MB (balances per-request overhead against parallelism)
- Concurrency: 3-5
- Timeout: 30 seconds
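The same bounded-concurrency idea shown in the client code can be expressed in Java with a `Semaphore`, e.g. for a server-side or batch uploader. A runnable sketch; `BoundedUpload`, the 20ms simulated upload, and the task counts are all illustrative:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedUpload {
    // Simulate `tasks` chunk uploads with at most `limit` in flight at once;
    // returns the peak concurrency actually observed.
    public static int run(int tasks, int limit) {
        Semaphore permits = new Semaphore(limit);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                try {
                    permits.acquire();                     // wait for a free slot
                    int now = inFlight.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // track peak concurrency
                    Thread.sleep(20);                      // simulated chunk upload
                    inFlight.decrementAndGet();
                    permits.release();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return peak.get();
    }
}
```

Even with 10 tasks queued, the semaphore keeps no more than 3 uploads running at once, which is exactly what limiting the connection count buys on the browser side.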
5.2 网络优化
@Configuration
public class UploadConfig {
@Bean
public MultipartConfigElement multipartConfigElement() {
MultipartConfigFactory factory = new MultipartConfigFactory();
// 单个文件最大50MB
factory.setMaxFileSize(DataSize.ofMegabytes(50));
// 总上传数据最大500MB
factory.setMaxRequestSize(DataSize.ofMegabytes(500));
return factory.createMultipartConfig();
}
}5.3 存储优化
MinIO configuration:

```yaml
minio:
  endpoint: http://localhost:9000
  access-key: minioadmin   # default dev credentials; use env vars/secrets in production
  secret-key: minioadmin
  bucket: file-bucket
  connect-timeout: 10000
  write-timeout: 60000
  read-timeout: 30000
```

6. Monitoring and Alerting
```java
@Component
public class UploadMetrics {

    @Autowired
    private MeterRegistry meterRegistry;

    @EventListener
    public void onUploadComplete(UploadCompleteEvent event) {
        // Count successful uploads, tagged by file type
        meterRegistry.counter("upload.success",
                "file_type", getFileType(event.getFileName()))
            .increment();
        // Record the file size distribution
        meterRegistry.summary("upload.file.size")
            .record(event.getFileSize());
        // Record upload duration
        meterRegistry.timer("upload.duration")
            .record(event.getDuration());
    }

    @EventListener
    public void onUploadFail(UploadFailEvent event) {
        // Count failed uploads, tagged by failure reason
        meterRegistry.counter("upload.fail",
                "reason", event.getReason())
            .increment();
    }
}
```

7. Results
| Metric | Before | After | Improvement |
|---|---|---|---|
| Success rate, files <5MB | 95% | 99% | +4% |
| Success rate, files 5-20MB | 70% | 96% | +26% |
| Success rate, files >20MB | 40% | 95% | +55% |
| Average upload speed | 2MB/s | 8MB/s | +300% |
| Storage saved (instant upload) | 0% | 30% | +30% |
8. Pitfalls We Hit
Pitfall 1: Chunks arriving out of order
Problem: with concurrent uploads, chunks reach the server in an unpredictable order.
Fix: sort chunks by chunkNumber on the server before merging.
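A sketch of the server-side fix: collect chunks however they arrive, then sort by sequence number before composing (the `Chunk` record and names here are illustrative):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ChunkOrder {
    public record Chunk(int number, String path) {}

    // Chunks may arrive in any order under concurrent upload;
    // always sort by chunk number before composing the final object.
    public static List<Chunk> sorted(List<Chunk> arrived) {
        List<Chunk> copy = new ArrayList<>(arrived);
        copy.sort(Comparator.comparingInt(Chunk::number));
        return copy;
    }
}
```

In the real service this corresponds to `findByUploadIdOrderByChunkNumber`, which pushes the sorting down into the database query.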
Pitfall 2: Out-of-memory errors
Problem: reading a large file fully into memory caused OOM.
Fix: read the file as a stream and process it chunk by chunk.
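A sketch of the streaming fix: copy through a small fixed buffer instead of loading the whole file into a `byte[]` (`StreamCopy` is an illustrative helper, not the project's actual code):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

public class StreamCopy {
    // Copy a stream through a fixed-size buffer: memory use stays O(bufferSize)
    // no matter how large the file is.
    public static long copy(InputStream in, OutputStream out, int bufferSize) {
        byte[] buf = new byte[bufferSize];
        long total = 0;
        try {
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
                total += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }
}
```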
Pitfall 3: Slow MD5 computation
Problem: computing the MD5 of a large file blocked the main thread.
Fix: compute it asynchronously in a Web Worker, or use incremental MD5.
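On the JVM, `java.security.MessageDigest` supports exactly this incremental pattern: feed the data chunk by chunk and the final digest equals hashing the whole file at once, so the full file never needs to be in memory. A sketch (`IncrementalMd5` is an illustrative name):

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class IncrementalMd5 {
    // Feed each chunk into the digest as it is read; digest() at the end
    // yields the MD5 of the concatenated input.
    public static String md5OfChunks(byte[][] chunks) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            for (byte[] chunk : chunks) md.update(chunk);
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest()) hex.append(String.format("%02x", b));
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is a mandatory JDK algorithm
        }
    }
}
```

Hashing `{"he", "llo"}` and `{"hello"}` produces the same digest, which is what makes chunk-wise hashing safe for the instant-upload MD5 check.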
Pitfall 4: Recovering from network loss
Problem: after the network dropped, the upload did not resume automatically.
Fix: listen for the browser's online event and automatically resume unfinished tasks.
9. Summary
Key points for chunked upload of large files:
- Chunking strategy: 5MB chunks balance per-request overhead and parallelism
- Resumable upload: record uploaded chunks so a failure never re-sends finished work
- Instant upload: MD5 deduplication saves storage and bandwidth
- Concurrency control: cap concurrent uploads to avoid resource contention
- Integrity checks: per-chunk MD5 verification plus whole-file MD5 verification after merge
With this design we raised the success rate for large (>20MB) file uploads from 40% to 95%, and user satisfaction improved markedly.
Hope this article helps!