16、Minio 教程 - Minio 分片上传文件putObject接口流程源码分析

前言

为了更好的理解和优化Minio文件上传,本篇文档对MInio中上传文件putObject接口源码分析以下。

基于Java 客户端 API

Controller层上传文件接口:

    @PostMapping("/upload")
    @ResponseBody
    public Object upload(MultipartFile file, String bucketName) throws IOException {
   
     
        return minioTemplate.putObject(file.getInputStream(), bucketName, file.getOriginalFilename());
    }

MinioTemplate接口:

            minioClient.putObject(
                    PutObjectArgs.builder().bucket(bucketName).object(uuidFileName).stream(
                            inputStream, inputStream.available(), -1)
                            .build());

源码分析

1. 进入Controller层接口

首先我在页面上上传了一个9M左右的文件:
 
文件上传,经过Tomcat服务器进行处理,然后到达我们的Controller层上传文件接口,我们使用的是MultipartFile 对象来接受文件,可以看到当前MultipartFile 对象存放了文件相关信息,而此时实际的文件是由Tomcat存放在硬盘临时目录的。

MultipartFile实际的对象是StandardMultipartHttpServletRequest的实例,他包含了ApplicationPart对象,ApplicationPart包含了图片中的文件信息。
 

接收到对象后,调用的就是MinioTemplate,这里传入了各种参数:

minioTemplate.putObject(file.getInputStream(), bucketName, file.getOriginalFilename());

最重要的是传入了一个InputStream,调用的是MultipartFile 对象的getInputStream()方法。

getInputStream获取输入流,是调用ApplicationPart中的DiskFileItem对象的getInputStream()方法。这个方法会将临时文件,直接转为FileInputStream并返回。

    public InputStream getInputStream() throws IOException {
   
     
        if (!this.isInMemory()) {
   
     
        	// 直接将临时文件转为输入流
            return new FileInputStream(this.dfos.getFile());
        } else {
   
     
            if (this.cachedContent == null) {
   
     
                this.cachedContent = this.dfos.getData();
            }

            return new ByteArrayInputStream(this.cachedContent);
        }
    }

2. 构建参数对象PutObjectArgs(参数校验,分片)

InputStream获取到了以后,接着就是调用MinioTemplate中的putObject方法了。

            minioClient.putObject(
                    PutObjectArgs.builder().bucket(bucketName).object(uuidFileName).stream(
                            inputStream, inputStream.available(), -1)
                            .build());

putObject方法实际调用的就是 MinioClient的putObject,调用之前会创建PutObjectArgs参数对象,使用的是建造者模式。

PutObjectArgs首先会对存储桶名称进行校验,所以创建存储桶名称时,要格外注意。

        protected void validateBucketName(String name) {
   
     
        	// 非空校验
            this.validateNotNull(name, "bucket name");
            // 1. 校验长度,3-63之间
            if (name.length() >= 3 && name.length() <= 63) {
   
     
                String msg;
                // 2. 不能包含“..”
                if (name.contains("..")) {
   
     
                    msg = "bucket name cannot contain successive periods. For more information refer http://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html";
                    throw new IllegalArgumentException(name + " : " + msg);
                    // 3. 正则校验
                } else if (!name.matches("^[a-z0-9][a-z0-9\\.\\-]+[a-z0-9]$")) {
   
     
                    msg = "bucket name does not follow Amazon S3 standards. For more information refer http://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html";
                    throw new IllegalArgumentException(name + " : " + msg);
                }
            } else {
   
     
                throw new IllegalArgumentException(name + " : bucket name must be at least 3 and no more than 63 characters long");
            }
        }

然后对对象名称进行校验:

        protected void validateObjectName(String name) {
   
     
        	// 1. 非空和Null校验
            this.validateNotEmptyString(name, "object name");
            String[] var2 = name.split("/"); // 按照反斜杠分割为字符串数组
            int var3 = var2.length;
			//  2. 循环字符串数组,校验每个斜杠分割的字段不能是“.”或者“..”
            for(int var4 = 0; var4 < var3; ++var4) {
   
     
                String token = var2[var4];
                if (token.equals(".") || token.equals("..")) {
   
     
                    throw new IllegalArgumentException("object name with '.' or '..' path segment is not supported");
                }
            }
        }

最后对InputStream进行构建。

        public PutObjectArgs.Builder stream(InputStream stream, long objectSize, long partSize) {
   
     			
            // 1. 非空
            this.validateNotNull(stream, "stream");
            // 2. 获取分片数,5M分割
            long[] partinfo = this.getPartInfo(objectSize, partSize);
            long pSize = partinfo[0]; // 分片大小 5M=5242880字节
            int pCount = (int)partinfo[1]; // 分片数,这里上传的9m文件,所以有两片
            // 3. 将FileInputStream=》BufferedInputStream
            BufferedInputStream bis = stream instanceof BufferedInputStream ? (BufferedInputStream)stream : new BufferedInputStream(stream);
            // 4. 将这些参数添加到PutObjectArgs对象中
            return this.setStream(bis, objectSize, pSize, pCount);
        }

在构建InputStream时,会进行分片操作,我们可以了解到上传文件大小的一些限制:

  • 分片大小不能小于5MB,大于5GB
  • 对象大小不能超过5TiB
  • partSize传入-1,默认按照5MB进行分割
  • 分片数量不能超过10000

分片规则如下:

		// 参数为 文件大小objectSize、分片大小partSize,分片数我们传入的是-1,表示使用默认配置
        protected long[] getPartInfo(long objectSize, long partSize) {
   
     
        	// 1. 校验大小,如果设置的分片大小 小于5M或者大于5GB,报错不支持
        	//  对象大小超过5TiB,报错不支持
            this.validateSizes(objectSize, partSize);
            if (objectSize < 0L) {
   
     
                return new long[]{
   
     partSize, -1L};
            } else {
   
     
            	// 2. 没有设置分片数据大小,怎按照默认的5M进行分割
                if (partSize <= 0L) {
   
     
                    double dPartSize = Math.ceil((double)objectSize / 10000.0D);
                    dPartSize = Math.ceil(dPartSize / 5242880.0D) * 5242880.0D;
                    partSize = (long)dPartSize;
                }
                if (partSize > objectSize) {
   
     
                    partSize = objectSize;
                }

                long partCount = partSize > 0L ? (long)Math.ceil((double)objectSize / (double)partSize) : 1L;
                // 3. 分片数量不能超过10000
                if (partCount > 10000L) {
   
     
                    throw new IllegalArgumentException("object size " + objectSize + " and part size " + partSize + " make more than " + 10000 + "parts for upload");
                } else {
   
     
                	// 4. 返回一个数组,第一个值为分片数据大小,第二个为分片数量
                    return new long[]{
   
     partSize, partCount};
                }
            }
        }

最终构建的PutObjectArgs对象如下:

 

该对象包含了文件流、对象名、分片信息等重要数据。

3. 进入MinioClient(上传分片、合并)

接着进入到MinioClient的putObject方法:

    public ObjectWriteResponse putObject(PutObjectArgs args) throws ErrorResponseException, InsufficientDataException, InternalException, InvalidKeyException, InvalidResponseException, IOException, NoSuchAlgorithmException, ServerException, XmlParserException {
   
     
    	// 1. 检查参数是否为Null
        this.checkArgs(args);
        // 2. 校验是否开启了SSE加密,如果开启了SSE,而不是Https请求则报错
        args.validateSse(this.baseUrl);
        // 3. 执行上传文件
        return this.putObject(args, args.stream(), args.objectSize(), args.partSize(), args.partCount(), args.contentType());
    }

接着调用重载的putObject方法,会进行分块创建=》分块上传=》合并文件流程操作。

    protected ObjectWriteResponse putObject(PutObjectBaseArgs args, Object data, long objectSize, long partSize, int partCount, String contentType) throws ErrorResponseException, InsufficientDataException, InternalException, InvalidKeyException, InvalidResponseException, IOException, NoSuchAlgorithmException, ServerException, XmlParserException {
   
     
    	// 1.设置消息头
        Multimap<String, String> headers = this.newMultimap(args.extraHeaders());
        headers.putAll(args.genHeaders());
        // 2. 设置 Content-Type
        if (!headers.containsKey("Content-Type")) {
   
     
            headers.put("Content-Type", contentType);
        }
        String uploadId = null;
        Part[] parts = null;
        // 3. 创建块读取对象
        PartReader partReader = this.newPartReader(data, objectSize, partSize, partCount);
        if (partReader == null) {
   
     
            throw new IllegalArgumentException("data must be RandomAccessFile or InputStream");
        } else {
   
     
            try {
   
     
                while(true) {
   
     
                	// 4. 分块操作,并返回块对象
                    PartSource partSource = partReader.getPart(!this.baseUrl.isHttps());
                    if (partSource == null) {
   
     
                    	// 没有分片时,表示分片全部上传成功,执行合并文件操作。
                        return this.completeMultipartUpload(args.bucket(), args.region(), args.object(), uploadId, parts, (Multimap)null, (Multimap)null);
                    }
					// 5. 如果对象只有一块,也就是5MB之内,执行上传
                    if (partReader.partCount() == 1) {
   
     
                        return this.putObject(args.bucket(), args.region(), args.object(), partSource, headers, args.extraQueryParams());
                    }
					
                    if (uploadId == null) {
   
     
                    	// 6. 执行分块上传请求,返回uploadId
                        CreateMultipartUploadResponse response = this.createMultipartUpload(args.bucket(), args.region(), args.object(), headers, args.extraQueryParams());
                        uploadId = response.result().uploadId();
                        parts = new Part[10000];
                    }

                    Map<String, String> ssecHeaders = null;
                    if (args.sse() != null && args.sse() instanceof ServerSideEncryptionCustomerKey) {
   
     
                        ssecHeaders = args.sse().headers();
                    }
					// 7. 根据创建的请求,正式执行上传分片的操作
                    int partNumber = partSource.partNumber();
                    UploadPartResponse response = this.uploadPart(args.bucket(), args.region(), args.object(), partSource, partNumber, uploadId, ssecHeaders != null ? Multimaps.forMap(ssecHeaders) : null, (Multimap)null);
                    String etag = response.etag();
                    parts[partNumber - 1] = new Part(partNumber, etag);
                }
            } catch (RuntimeException var18) {
   
     
                if (uploadId != null) {
   
     
                    this.abortMultipartUpload(args.bucket(), args.region(), args.object(), uploadId, (Multimap)null, (Multimap)null);
                }

                throw var18;
            } catch (Exception var19) {
   
     
                if (uploadId != null) {
   
     
                    this.abortMultipartUpload(args.bucket(), args.region(), args.object(), uploadId, (Multimap)null, (Multimap)null);
                }

                throw var19;
            }
        }
    }

4. 创建分片

putObject方法首先会创建PartReader 块读取对象:

    private PartReader newPartReader(Object data, long objectSize, long partSize, int partCount) {
   
     		
    	// 1. 如果是RandomAccessFile(RandomAccessFile允许自由定义文件记录指针,
    	// RandomAccessFile可以不从开始的地方开始输出,因此RandomAccessFile可以向已存在的文件后追加内容。)
    	// 创建RandomAccessFile类型的PartReader 
        if (data instanceof RandomAccessFile) {
   
     
            return new PartReader((RandomAccessFile)data, objectSize, partSize, partCount);
        } else {
   
     
        	// 2. 创建不同输入流的PartReader 对象
            return data instanceof InputStream ? new PartReader((InputStream)data, objectSize, partSize, partCount) : null;
        }
    }

PartReader 包含了文件数据流及分片信息。
 
接着进入一个死循环,PartReader 会获取PartSource块对象:

    public PartSource getPart(boolean computeSha256) throws NoSuchAlgorithmException, IOException {
   
     
        if (this.partNumber == this.partCount) {
   
     
            return null;
        } else {
   
     
        	// 1. 获取分片,从第一个开始获取
            ++this.partNumber;
            MessageDigest md5 = MessageDigest.getInstance("MD5"); // MD5 加密对象
            MessageDigest sha256 = computeSha256 ? MessageDigest.getInstance("SHA-256") : null; // SHA-256加密对象
            long partSize = this.partSize; // 分片大小 5MB
            if (this.partNumber == this.partCount) {
   
      // 判断当前分片是不是最后一个分片
                partSize = this.objectSize - this.totalDataRead;
            }
			// 2. 使用算法读取分块数据
            long bytesRead = this.read(partSize, md5, sha256);
            this.totalDataRead += bytesRead;
            if (this.objectSize < 0L && this.eof) {
   
     
                this.partCount = this.partNumber;
            }
			// 3. 加密
            String md5Hash = Base64.getEncoder().encodeToString(md5.digest());
            String sha256Hash = null;
            if (computeSha256) {
   
     
                sha256Hash = BaseEncoding.base16().encode(sha256.digest()).toLowerCase(Locale.US);
            }
			// 4. 返回PartSource对象
            return this.file != null ? new PartSource(this.partNumber, this.file, bytesRead, md5Hash, sha256Hash) : new PartSource(this.partNumber, this.buffers, bytesRead, md5Hash, sha256Hash);
        }
    }

每个PartSource对象,就对应一个块对象,其中包含了块数据和加密返回的签名。
 

5. 创建分片请求(获取uploadId)

createMultipartUpload方法会创建分块请求,根据对象名和存储桶名去Minio获取上传当前对象的uploadId。

    protected CreateMultipartUploadResponse createMultipartUpload(String bucketName, String region, String objectName, Multimap<String, String> headers, Multimap<String, String> extraQueryParams) throws NoSuchAlgorithmException, InsufficientDataException, IOException, InvalidKeyException, ServerException, XmlParserException, ErrorResponseException, InternalException, InvalidResponseException {
   
     
    	// 构建请求参数
        Multimap<String, String> queryParams = this.newMultimap(extraQueryParams);
        queryParams.put("uploads", "");
        Multimap<String, String> headersCopy = this.newMultimap(headers);
        if (!headersCopy.containsKey("Content-Type")) {
   
     
            headersCopy.put("Content-Type", "application/octet-stream");
        }
		// 执行HTTP请求
        Response response = this.execute(Method.POST, bucketName, objectName, this.getRegion(bucketName, region), this.httpHeaders(headersCopy), queryParams, (Object)null, 0);
        Throwable var9 = null;

        CreateMultipartUploadResponse var11;
        try {
   
     
        	// 解析返回结果
            InitiateMultipartUploadResult result = (InitiateMultipartUploadResult)Xml.unmarshal(InitiateMultipartUploadResult.class, response.body().charStream());
            var11 = new CreateMultipartUploadResponse(response.headers(), bucketName, region, objectName, result);
        }

uploadId在循环中使用的都是同一个,说明分片上传的时候都会使用同一个uploadId,最后合并同一个uploadId的文件。
 

6. 上传分片

获取到了uploadId以后,就会执行上传操作,调用uploadPart方法,uploadPart最终也是调用execute,可以看到该方法,是调用的OkHttpClient 去执行的。

    protected Response execute(Method method, String bucketName, String objectName, String region, Headers headers, Multimap<String, String> queryParamMap, Object body, int length) throws XmlParserException {
   
     
    	//......
        // 构建URL :http://127.0.0.1:9000/pearl-buckent/files/2021-10-26/d9e9d6fc-73fc-4323-b317-b8b26b6b6fe0_apache-maven-3.6.2-bin.zip?uploadId=70174335-85ec-47c6-acaf-afa12c8add48&partNumber=2
        HttpUrl url = this.buildUrl(method, bucketName, objectName, region, queryParamMap);
        // 省略构建其他对象
        // 调用 httpClient执行上传文件
        Response response = httpClient.newCall(request).execute();
        // 处理响应,异常处理等。
        ResponseBody responseBody;
        if (response.isSuccessful()) {
   
     
           
 					// 省略大量代码.....
            }
        }
    }

合并文件

分片的数据都上传后,进入到completeMultipartUpload方法,在这个方法执行之前,在Minio控制台是看不到上传对象的。
 
这个方法传入了文件对象名,uploadID等,
 
该方法最终也是执行的execute,使用httpclient去调用的Minio服务器合并分片,最后完成了分片上传操作。之后Tomcat回调,完成清理临时文件等操作,最后返回信息给前端,也对应了整个Servlet请求响应的整个流程。
 

简单流程图