java 超大文件上传,java多线程分片上传
目录
Java大文件分片上传首先是交互的控制器上传文件分片参数接收大文件分片上传服务类实现文件分片上传定义公共服务类接口文件分片上传文件操作接口实现类战略情报局阿里云对象存储分片上传实现京东云对象存储实现腾讯云对象存储分片上传分片上传前端代码实现
Java 大文件分片上传
原理:前端通过射流研究…读取文件,并将大文件按照指定大小拆分成多个分片,并且计算每个分片的讯息摘要5值。前端将每个分片分别上传到后端,后端在接收到文件之后验证当前分片的讯息摘要5值是否与上传的讯息摘要5一致,待所有分片上传完成之后后端将多个分片合并成一个大文件,并校验该文件的讯息摘要5值是否与上传时传入的讯息摘要5值一致;
首先是交互的控制器
支持文件分片上传,查询当前已经上传的分片信息,取消文件上传
包com。林艾米。组件。系统。服务。模块化。文件。控制器;导入com。林艾米。常见。核心。POJO。基地。参数。基础参数;导入com。林艾米。常见。核心。POJO。回应。响应数据;导入com。林艾米。常见。日志。注释。业务日志;导入com。林艾米。常见。日志。伊努斯。logoptypeenum导入com。林艾米。常见。安全。注释。许可;导入com。林艾米。组件。系统。服务。模块化。文件。参数。syspartfileparam导入com。林艾米。组件。系统。服务。模块化。文件。结果。syspartfilereult导入com。林艾米。组件。系统。服务。模块化。文件。服务。syspartfileservice导入龙目岛。外部人员。SLF 4j。SLF 4j;导入org。spring框架。验证。注释。已验证;导入org。spring框架。网络。绑定。注释。获取映射;导入org。spring框架。网络。绑定。注释。后期映射;导入org。spring框架。网络。绑定。注释。休息控制器;导入javax。注释。资源;/** * 系统大文件上传* * @ v 1.0版* @ date 2022/5/24 11:22 */@ Slf4j @ RestControllerpublic类SysPartFileController { @ Resource private SysPartFileService SysPartFileService;/** * 上传大文件* */@ Permission @ post映射(/sys fileinfo/part upload )public responsestyspartileresult部分上载(@ Validated(base param。补充。class)SysPartFileParam部分文件){返回响应数据。成功(syspartfileservice。零件上传(零件文件));} /** * 获取文件上传状态* */@ Permission @ get mapping(/sys fileinfo/part upload/status )public responsedyspartileresult getPartUploadStatus(@ Validated(base param。细节。class)SysPartFileParam部分文件){返回响应数据。成功(syspartfileservice。getPartUploadStatus(零件文件));} /** * 获取文件上传状态* */@ Permission @ get mapping(/sys fileinfo/part upload/cancel )@ business log(title=文
件_上传大文件_取消", opType = LogOpTypeEnum.OTHER) public ResponseData<SysPartFileResult> cancelUpload(@Validated(BaseParam.detail.class) SysPartFileParam partFile) { return ResponseData.success(sysPartFileService.cancelUpload(partFile)); }}
上传文件分片参数接收
如果按照分片方式上传文件需要指定当前大文件的MD5、分片MD5、分片内容、分片大小、当前文件名称、文件总大小等信息;另外对于每个文件前端都需要生成一个唯一编码用于确定当前上传的分片属于统一文件。
package com.aimilin.component.system.service.modular.file.param;import java.io.Serializable;import java.util.Objects;import com.baomidou.mybatisplus.core.toolkit.StringUtils;import com.aimilin.common.core.pojo.base.param.BaseParam;import lombok.Getter;import lombok.Setter;import lombok.ToString;import org.springframework.web.multipart.MultipartFile;import javax.validation.constraints.NotNull;/** * 大文件断点续传 * * @version V1.0 * @date 2022/5/24 10:52 */@Getter@Setter@ToStringpublic class SysPartFileParam extends BaseParam implements Serializable { /** * 文件上传Id, 前端传入的值 */ @NotNull(message = "uid不能为空", groups = {BaseParam.detail.class, BaseParam.add.class}) private String uid; /** * 上传文件名称 */ private String filename; /** * 当前文件块,从1开始 */ @NotNull(message = "partNumber不能为空", groups = {BaseParam.add.class}) private Integer partNumber; /** * 当前分块Md5 */ @NotNull(message = "partMd5不能为空", groups = {BaseParam.add.class}) private String partMd5; /** * 分块大小,根据 totalSize 和这个值你就可以计算出总共的块数。注意最后一块的大小可能会比这个要大。 */ @NotNull(message = "partSize不能为空", groups = {BaseParam.add.class}) private Long partSize; /** * 总大小 */ @NotNull(message = "totalSize不能为空", groups = {BaseParam.add.class}) private Long totalSize; /** * 文件标识,MD5指纹 */ @NotNull(message = "fileMd5不能为空", groups = {BaseParam.add.class}) private String fileMd5; /** * 二进制文件 */ @NotNull(message = "file不能为空", groups = {BaseParam.add.class}) private MultipartFile file; /** * 总块数, (int)totalSize / partSize 最后一个模块要大一点; * * @return 结果 */ public Integer getTotalParts() { if (Objects.isNull(totalSize) Objects.isNull(partSize)) { return 0; } return new Double(Math.ceil(totalSize * 1.0 / partSize)).intValue(); } public String getFilename() { if (StringUtils.isBlank(this.filename) && Objects.isNull(this.file)) { return null; } return StringUtils.isBlank(this.filename) ? this.file.getOriginalFilename() : this.filename; }}
至于代码中的 BaseParam 类,只是定义了一些验证的分组,类似以下代码:
/** * 参数校验分组:分页 */ public @interface page { } /** * 参数校验分组:列表 */ public @interface list { } /** * 参数校验分组:下拉 */ public @interface dropDown { } /** * 参数校验分组:增加 */ public @interface add { }
大文件分片上传服务类实现
也是定义了三个接口,分片上传、查询当前已上传的分片、取消文件上传
package com.aimilin.component.system.service.modular.file.service;import com.aimilin.component.system.service.modular.file.param.SysPartFileParam;import com.aimilin.component.system.service.modular.file.result.SysPartFileResult;/** * 块文件上传 * * @version V1.0 * @date 2022/5/24 10:59 */public interface SysPartFileService { /** * 文件块上传公共前缀 */ public static final String PART_FILE_KEY = "PART_FILE"; /** * 文件块上传 * 1. 将上传文件按照partSize拆分成多个文件块 * 2. 判断当前文件块是否已经上传 * 3. 未上传,则上传当前文本块 * 4. 已上传则不处理 * 5. 统计当前文本块上传进度信息 * 6. 判断所有文本块是否已经上传完成,如果上传完成则触发文件合并 */ public SysPartFileResult partUpload(SysPartFileParam partFile); /** * 获取文件上传状态 * * @param partFile 上传文件信息 * @return 文件上传状态结果 */ public SysPartFileResult getPartUploadStatus(SysPartFileParam partFile); /** * 取消文件上传 * * @param partFile 上传文件信息 * @return 文件上传状态结果 */ public SysPartFileResult cancelUpload(SysPartFileParam partFile);}
服务实现类:
package com.aimilin.component.system.service.modular.file.service.impl;import cn.hutool.core.io.FileUtil;import com.baomidou.mybatisplus.core.toolkit.IdWorker;import com.aimilin.common.base.file.FilePartOperator;import com.aimilin.common.base.file.param.AbortMultipartUploadResult;import com.aimilin.common.base.file.param.CompleteFileUploadPart;import com.aimilin.common.base.file.param.FileUploadPart;import com.aimilin.common.base.file.param.FileUploadPartResult;import com.aimilin.common.cache.RedisService;import com.aimilin.common.core.consts.CommonConstant;import com.aimilin.common.core.context.login.LoginContextHolder;import com.aimilin.common.core.exception.ServiceException;import com.aimilin.component.system.service.modular.file.convert.SysPartFileConvert;import com.aimilin.component.system.service.modular.file.entity.SysFileInfo;import com.aimilin.component.system.service.modular.file.enums.SysFileInfoExceptionEnum;import com.aimilin.component.system.service.modular.file.enums.SysPartFileEnum;import com.aimilin.component.system.service.modular.file.param.SysPartFileParam;import com.aimilin.component.system.service.modular.file.result.SysPartFileCache;import com.aimilin.component.system.service.modular.file.result.SysPartFileCache.FileInfo;import com.aimilin.component.system.service.modular.file.result.SysPartFileCache.SysFilePart;import com.aimilin.component.system.service.modular.file.result.SysPartFileResult;import com.aimilin.component.system.service.modular.file.service.SysFileInfoService;import com.aimilin.component.system.service.modular.file.service.SysPartFileService;import lombok.extern.slf4j.Slf4j;import org.apache.commons.io.FilenameUtils;import org.redisson.api.RLock;import org.redisson.api.RedissonClient;import org.springframework.stereotype.Service;import org.springframework.web.multipart.MultipartFile;import javax.annotation.Resource;import java.io.IOException;import java.util.*;import java.util.concurrent.TimeUnit;import static com.aimilin.component.system.service.config.FileConfig.DEFAULT_BUCKET;/** * 大文件上传功能服务实现 * * @version V1.0 * @date 2022/5/24 11:53 */@Slf4j@Servicepublic class SysPartFileServiceImpl implements SysPartFileService { @Resource private FilePartOperator fileOperator; @Resource private RedisService redisService; @Resource private SysFileInfoService sysFileInfoService; @Resource private RedissonClient redisson; /** * 文件块上传 * 1. 将上传文件按照partSize拆分成多个文件块 * 2. 判断当前文件块是否已经上传 * 3. 未上传,则上传当前文本块 * 4. 已上传则不处理 * 5. 统计当前文本块上传进度信息 * 6. 判断所有文本块是否已经上传完成,如果上传完成则触发文件合并 * * @param partFile 上传文件 * @return SysPartFileResult 文件上传结果 */ @Override public SysPartFileResult partUpload(SysPartFileParam partFile) { MultipartFile file = partFile.getFile(); log.info("分块上传文件:{}, partNumber:{}/{}, partSize:{}/{}", partFile.getFilename(), partFile.getPartNumber(), partFile.getTotalParts(), file.getSize(), partFile.getPartSize()); SysPartFileResult partUploadStatus = this.getPartUploadStatus(partFile); // 已经上传该部分则直接返回当前文件状态 if (SysPartFileEnum.SUCCESS.getCode().equals(partUploadStatus.getPartState())) { return partUploadStatus; } // 上传分片文件 FileUploadPart fileUploadPart = this.getFileUploadPart(partFile); try { FileUploadPartResult uploadPartResult = fileOperator.uploadPart(fileUploadPart); this.setPartUploadStatus(partFile, uploadPartResult); } catch (Exception e) { log.error("文件分片上传失败,请求:{}:{}", partFile, e.getMessage(), e); throw new ServiceException(SysFileInfoExceptionEnum.FILE_OSS_ERROR); } return this.getPartUploadStatus(partFile); } /** * 获取文件上传状态 * * @param partFile 上传文件信息 * @return 文件上传状态结果 */ @Override public SysPartFileResult getPartUploadStatus(SysPartFileParam partFile) { SysPartFileCache fileCache = redisService.getCacheObject(getPartFileKey(partFile.getUid())); SysPartFileResult result; // 如果没有上传过则返回默认值 if (Objects.isNull(fileCache)) { result = SysPartFileConvert.INSTANCE.toSysPartFileResult(partFile); result.setFileState(SysPartFileEnum.NOT_EXISTS.getCode()); result.setPartState(SysPartFileEnum.NOT_EXISTS.getCode()); } else { result = SysPartFileConvert.INSTANCE.toSysPartFileResult(fileCache, fileCache.getFilePart(partFile.getPartNumber())); } return result; } /** * 取消文件上传 * * @param partFile 上传文件信息 * @return 文件上传状态结果 */ @Override public SysPartFileResult cancelUpload(SysPartFileParam partFile) { String cacheKey = getPartFileKey(partFile.getUid()); SysPartFileCache fileCache = redisService.getCacheObject(cacheKey); if (Objects.isNull(fileCache)) { throw new ServiceException(SysFileInfoExceptionEnum.NOT_EXISTED_FILE); } SysPartFileCache.FileInfo fileInfo = fileCache.getFileInfo(); fileOperator.abortMultipartUpload(fileInfo.getBucketName(), fileInfo.getObjectName(), fileInfo.getUploadId()); log.info("取消文件上传:{}", partFile.getUid()); SysPartFileResult sysPartFileResult = SysPartFileConvert.INSTANCE.toSysPartFileResult(partFile); sysPartFileResult.setFileState(SysPartFileEnum.CANCELED.getCode()); redisService.deleteObject(cacheKey); return sysPartFileResult; } /** * 文件分片上传,设置文件分片信息 * * @param partFile 分片文件参数 * @param uploadPartResult 文件上传结果信息 */ private void setPartUploadStatus(SysPartFileParam partFile, FileUploadPartResult uploadPartResult) { String redisKey = getPartFileKey(partFile.getUid()); if (!redisService.hasKey(redisKey)) { throw new ServiceException(SysFileInfoExceptionEnum.FILE_CACHE_ERROR); } RLock lock = redisson.getLock(CommonConstant.getLockKey(redisKey)); try { lock.lock(); SysPartFileCache fileCache = redisService.getCacheObject(redisKey); Set<SysFilePart> filePartList = fileCache.getFilePartList(); if (Objects.isNull(filePartList)) { filePartList = new HashSet<>(); fileCache.setFilePartList(filePartList); } SysFilePart sysFilePart = new SysFilePart(); sysFilePart.setPartNumber(partFile.getPartNumber()); sysFilePart.setPartState(SysPartFileEnum.SUCCESS.getCode()); sysFilePart.setPartMd5(partFile.getPartMd5()); sysFilePart.setPartSize(partFile.getFile().getSize()); sysFilePart.setFileUploadPartResult(uploadPartResult); filePartList.add(sysFilePart); fileCache.setFileState(SysPartFileEnum.UPLOADING.getCode()); // 所有文本块都已经上传完成 if (new HashSet<>(fileCache.getUploadedParts()).size() == fileCache.getTotalParts()) { CompleteFileUploadPart completeFileUploadPart = SysPartFileConvert.INSTANCE.toCompleteFileUploadPart(fileCache); fileOperator.completeMultipartUpload(completeFileUploadPart); log.info("文件合并完成:{},part: {}/{}", partFile.getFilename(), partFile.getPartNumber(), partFile.getTotalParts()); this.saveFileInfo(partFile, fileCache); fileCache.setFileState(SysPartFileEnum.SUCCESS.getCode()); redisService.setCacheObject(redisKey, fileCache, 1L, TimeUnit.DAYS); } else { redisService.setCacheObject(redisKey, fileCache); } } catch (Exception e) { log.error("设置文件分片上传状态异常,{},上传结果:{}", partFile, uploadPartResult, e); throw new ServiceException(SysFileInfoExceptionEnum.PART_FILE_SET_STATE_ERROR); }finally { lock.unlock(); } } /** * 保存文件信息到 数据库 * * @param partFile 分片文件 * @param fileCache 文件缓存对象 */ private void saveFileInfo(SysPartFileParam partFile, SysPartFileCache fileCache) { SysFileInfo sysFileInfo = new SysFileInfo(); sysFileInfo.setId(Objects.isNull(fileCache.getFileId()) ? IdWorker.getId() : fileCache.getFileId()); sysFileInfo.setFileLocation(fileOperator.getFileLocation().getCode()); sysFileInfo.setFileBucket(fileCache.getFileInfo().getBucketName()); sysFileInfo.setFileOriginName(fileCache.getFilename()); sysFileInfo.setFileSuffix(FilenameUtils.getExtension(fileCache.getFileInfo().getObjectName())); sysFileInfo.setFileSizeKb(SysFileUtils.getFileSizeKb(fileCache.getTotalSize())); sysFileInfo.setFileSizeInfo(FileUtil.readableFileSize(fileCache.getTotalSize())); sysFileInfo.setFileObjectName(fileCache.getFileInfo().getObjectName()); boolean save = sysFileInfoService.save(sysFileInfo); log.info("保存文件信息完成:{},结果:{}", partFile.getFilename(), save); } /** * 获取文件上传分片信息 * * @param partFile 分片文件参数 * @return 需要上传的分片文件信息 */ private FileUploadPart getFileUploadPart(SysPartFileParam partFile) { try { SysPartFileCache fileCache = redisService.getCacheObject(getPartFileKey(partFile.getUid())); if (Objects.isNull(fileCache)) { fileCache = this.initSysPartFileCache(partFile); } return SysPartFileConvert.INSTANCE.toFileUploadPart(fileCache.getFileInfo(), partFile); } catch (IOException e) { log.error("获取文件分片对象异常:{}", e.getMessage(), e); throw new ServiceException(SysFileInfoExceptionEnum.FILE_STREAM_ERROR); } } /** * 初始化文件缓存对象,进入该方法说明缓存为空 * @param partFile 分片文件 */ private SysPartFileCache initSysPartFileCache(SysPartFileParam partFile) { String key = getPartFileKey(partFile.getUid()); RLock lock = redisson.getLock(CommonConstant.getLockKey(key)); try { lock.lock(); SysPartFileCache fileCache = redisService.getCacheObject(key); if(Objects.isNull(fileCache)){ Long fileId = IdWorker.getId(); String objectName = SysFileUtils.getFileObjectName(partFile.getFilename(), fileId); String uploadId = fileOperator.initiateMultipartUpload(DEFAULT_BUCKET, objectName); fileCache = SysPartFileConvert.INSTANCE.toSysPartFileCache(partFile); fileCache.setFileState(SysPartFileEnum.UPLOADING.getCode()); fileCache.setFileInfo(new FileInfo(DEFAULT_BUCKET, objectName, uploadId)); fileCache.setFileId(fileId); redisService.setCacheObject(getPartFileKey(partFile.getUid()), fileCache); } return fileCache; } catch (Exception e) { log.error("文件缓存初始化异常:{}", partFile, e); throw new ServiceException(SysFileInfoExceptionEnum.PART_FILE_INIT_CACHE_ERROR); }finally { lock.unlock(); } } /** * 获取文件缓存key * * @param fileId 文件Id * @return %s:%s:%s */ private String getPartFileKey(String fileId) { return String.format("%s:%s:%s", PART_FILE_KEY, LoginContextHolder.me().getSysLoginUserId(), fileId); }}
文件分片上传定义公共服务类接口
package com.aimilin.common.base.file;import com.aimilin.common.base.file.param.*;/** * 大文件分片操作服务类 * * @version V1.0 * @date 2022/5/24 16:56 */public interface FilePartOperator extends FileOperator { /** * 初始化分片文件上传 * * @param bucketName 文件桶 * @param key 文件key * @return 本次文件上传唯一标识 */ String initiateMultipartUpload(String bucketName, String key); /** * 上传分片文件 * * @param fileUploadPart 分片文件参数 * @return 上传结果 */ FileUploadPartResult uploadPart(FileUploadPart fileUploadPart); /** * 完成分片上传 * * @param completeFileUploadPart 请求对象 * @return 结果信息 */ CompleteFileUploadPartResult completeMultipartUpload(CompleteFileUploadPart completeFileUploadPart); /** * 取消文件分片上传 * * @param bucketName 文件桶 * @param objectName 对象key * @param uploadId 上传ID * @return */ void abortMultipartUpload(String bucketName, String objectName, String uploadId);}/** * 文件分片上传取消 * * @version V1.0 * @date 2022/5/24 20:32 */public class AbortMultipartUploadResult {}/** * 完成分片上传 * * @version V1.0 * @date 2022/5/24 20:07 */@Getter@Setter@ToStringpublic class CompleteFileUploadPart implements Serializable { private String bucketName; private String objectName; private String uploadId; private List<FileUploadPartResult> partETags;}/** * 分片上传结果 * * @version V1.0 * @date 2022/5/24 20:08 */@Getter@Setter@ToStringpublic class CompleteFileUploadPartResult implements Serializable { private String bucketName; private String objectName; private String location; private String eTag;}/** * 文件分片上传请求参数 * * @version V1.0 * @date 2022/5/24 17:00 */@Getter@Setter@ToStringpublic class FileUploadPart implements Serializable { /** * 文件桶 */ private String bucketName; /** * 文件key */ private String objectName; /** * 文件上传ID */ private String uploadId; /** * 分片大小,设置分片大小。除了最后一个分片没有大小限制,其他的分片最小为100 KB */ private Long partSize; /** * 设置分片号。每一个上传的分片都有一个分片号,取值范围是1~10000,如果超出此范围,OSS将返回InvalidArgument错误码。 */ private Integer partNumber; /** * 分片Md5签名 */ private String partMd5; /** * 分片文件内容 */ @JsonIgnore @JSONField(deserialize = false, serialize = false) private InputStream partContent;}/** * 文件分片上传结果 * * @version V1.0 * @date 2022/5/24 17:01 */@Getter@Setter@ToStringpublic class FileUploadPartResult implements Serializable { /** * 分块编号 */ private Integer partNumber; /** * 当前分片大小 */ private Long partSize; /** * 上传结果tag */ private String partETag;}
文件分片上传文件操作接口实现类
这里风两种实现,1:本地文件上传,2:oss对象存储方式分片上传
/** * 本地文件上传操作 * */@Slf4jpublic class LocalFileOperator implements FilePartOperator { @Override public FileLocationEnum getFileLocation() { return FileLocationEnum.LOCAL; } private final LocalFileProperties localFileProperties; private String currentSavePath = ""; private Dict localClient; public LocalFileOperator(LocalFileProperties localFileProperties) { this.localFileProperties = localFileProperties; initClient(); } @Override public void initClient() { if (SystemUtil.getOsInfo().isWindows()) { String savePathWindows = localFileProperties.getLocalFileSavePathWin(); if (!FileUtil.exist(savePathWindows)) { FileUtil.mkdir(savePathWindows); } currentSavePath = savePathWindows; } else { String savePathLinux = localFileProperties.getLocalFileSavePathLinux(); if (!FileUtil.exist(savePathLinux)) { FileUtil.mkdir(savePathLinux); } currentSavePath = savePathLinux; } localClient = Dict.create(); localClient.put("currentSavePath", currentSavePath); localClient.put("localFileProperties", localFileProperties); } @Override public void destroyClient() { // empty } @Override public Object getClient() { // empty return localClient; } @Override public boolean doesBucketExist(String bucketName) { String absolutePath = currentSavePath + File.separator + bucketName; return FileUtil.exist(absolutePath); } @Override public void setBucketAcl(String bucketName, BucketAuthEnum bucketAuthEnum) { // empty } @Override public boolean isExistingFile(String bucketName, String key) { return FileUtil.exist(this.getAbsolutePath(bucketName, key)); } @Override public void storageFile(String bucketName, String key, byte[] bytes) { // 判断bucket存在不存在 String bucketPath = currentSavePath + File.separator + bucketName; if (!FileUtil.exist(bucketPath)) { FileUtil.mkdir(bucketPath); } // 存储文件 FileUtil.writeBytes(bytes, this.getAbsolutePath(bucketName, key)); } @Override public void storageFile(String bucketName, String key, InputStream inputStream, long fileSize) { // 判断bucket存在不存在 String bucketPath = currentSavePath + File.separator&am
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。