package tech.powerjob.server.persistence.storage.impl; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.*; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Conditional; import org.springframework.core.env.Environment; import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition; import tech.powerjob.server.extension.dfs.*; import tech.powerjob.server.persistence.storage.AbstractDFsService; import javax.annotation.Priority; import java.util.List; import java.util.Map; import java.util.Optional; /** * MINIO support * High Performance Object Storage * 配置项: * oms.storage.dfs.minio.endpoint * oms.storage.dfs.minio.bucketName * oms.storage.dfs.minio.accessKey * oms.storage.dfs.minio.secretKey * * @author xinyi * @since 2023/8/21 */ @Slf4j @Priority(value = Integer.MAX_VALUE - 3) @Conditional(MinioOssService.MinioOssCondition.class) public class MinioOssService extends AbstractDFsService { private static final String TYPE_MINIO = "minio"; private static final String KEY_ENDPOINT = "endpoint"; private static final String KEY_BUCKET_NAME = "bucketName"; private static final String ACCESS_KEY = "accessKey"; private static final String SECRET_KEY = "secretKey"; private AmazonS3 amazonS3; private String bucket; private static final String NOT_FOUNT = "404 Not Found"; @Override public void store(StoreRequest storeRequest) { try { String fileName = parseFileName(storeRequest.getFileLocation()); // 创建 PutObjectRequest 对象 PutObjectRequest request = new PutObjectRequest(this.bucket, fileName, storeRequest.getLocalFile()); amazonS3.putObject(request); } catch (Throwable t) { ExceptionUtils.rethrow(t); } } @Override public void download(DownloadRequest downloadRequest) { try { FileUtils.forceMkdirParent(downloadRequest.getTarget()); String fileName = parseFileName(downloadRequest.getFileLocation()); GetObjectRequest getObjectRequest = new GetObjectRequest(this.bucket, fileName); amazonS3.getObject(getObjectRequest, downloadRequest.getTarget()); } catch (Throwable t) { ExceptionUtils.rethrow(t); } } /** * 获取文件元 * * @param fileLocation 文件位置 */ @Override public Optional fetchFileMeta(FileLocation fileLocation) { try { String fileName = parseFileName(fileLocation); ObjectMetadata objectMetadata = amazonS3.getObjectMetadata(this.bucket, fileName); return Optional.ofNullable(objectMetadata).map(minioStat -> { Map metaInfo = Maps.newHashMap(); if (objectMetadata.getRawMetadata() != null) { metaInfo.putAll(objectMetadata.getRawMetadata()); } return new FileMeta() .setLastModifiedTime(objectMetadata.getLastModified()) .setLength(objectMetadata.getContentLength()) .setMetaInfo(metaInfo); }); } catch (AmazonS3Exception s3Exception) { String errorCode = s3Exception.getErrorCode(); if (NOT_FOUNT.equalsIgnoreCase(errorCode)) { return Optional.empty(); } } catch (Exception oe) { ExceptionUtils.rethrow(oe); } return Optional.empty(); } private static String parseFileName(FileLocation fileLocation) { return String.format("%s/%s", fileLocation.getBucket(), fileLocation.getName()); } /** * 清理过期文件 * * @param bucket 桶名 * @param days 日期 */ @Override public void cleanExpiredFiles(String bucket, int days) { /* 使用Minio的管理界面或Minio客户端命令行工具设置对象的生命周期规则。在生命周期规则中定义文件的过期时间。Minio将自动根据设置的规则删除过期的文件。 */ } /** * 释放连接 */ @Override public void destroy() { //minioClient.close(); } /** * 初始化minio * * @param applicationContext / */ @Override protected void init(ApplicationContext applicationContext) { Environment environment = applicationContext.getEnvironment(); String endpoint = fetchProperty(environment, TYPE_MINIO, KEY_ENDPOINT); String bucketName = fetchProperty(environment, TYPE_MINIO, KEY_BUCKET_NAME); String accessKey = fetchProperty(environment, TYPE_MINIO, ACCESS_KEY); String secretKey = fetchProperty(environment, TYPE_MINIO, SECRET_KEY); try { initOssClient(endpoint, bucketName, accessKey, secretKey); } catch (Exception e) { ExceptionUtils.rethrow(e); } } /** * 创建minio连接并且创建桶 * * @param endpoint 端口 * @param bucketName 桶名 * @param accessKey 访问密钥 * @param secretKey 秘密密钥 */ public void initOssClient(String endpoint, String bucketName, String accessKey, String secretKey) { log.info("[Minio] init OSS by config: endpoint={}, bucketName={}, accessKey={}, secretKey={}", endpoint, bucketName, accessKey, secretKey); if (StringUtils.isEmpty(bucketName)) { throw new IllegalArgumentException("'oms.storage.dfs.minio.bucketName' can't be empty, please creat a bucket in minio oss console then config it to powerjob"); } // 创建凭证对象 BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKey, secretKey); // 创建AmazonS3客户端并指定终端节点和凭证 this.amazonS3 = AmazonS3ClientBuilder.standard() // 当使用 AWS Java SDK 连接到非AWS服务(如MinIO)时,指定区域(Region)是必需的,即使这个区域对于你的MinIO实例并不真正适用。原因在于AWS SDK的客户端构建器需要一个区域来配置其服务端点,即使在连接到本地或第三方S3兼容服务时也是如此。使用 "us-east-1" 作为占位符是很常见的做法,因为它是AWS最常用的区域之一。这不会影响到实际的连接或数据传输,因为真正的服务地址是由你提供的终端节点URL决定的。如果你的代码主要是与MinIO交互,并且不涉及AWS服务,那么这个区域设置只是形式上的要求。 .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, "us-east-1")) .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) .withPathStyleAccessEnabled(true) // 重要:启用路径样式访问 .build(); this.bucket = bucketName; createBucket(bucketName); log.info("[Minio] initialize OSS successfully!"); } /** * 创建 bucket * * @param bucketName 桶名 */ @SneakyThrows(Exception.class) public void createBucket(String bucketName) { // 建议自行创建 bucket,设置好相关的策略 if (bucketExists(bucketName)) { return; } Bucket createBucketResult = amazonS3.createBucket(bucketName); log.info("[Minio] createBucket successfully, bucketName: {}, createResult: {}", bucketName, createBucketResult); String policy = "{\n" + " \"Version\": \"2012-10-17\",\n" + " \"Statement\": [\n" + " {\n" + " \"Action\": [\n" + " \"s3:GetObject\"\n" + " ],\n" + " \"Effect\": \"Allow\",\n" + " \"Principal\": {\n" + " \"AWS\": [\n" + " \"*\"\n" + " ]\n" + " },\n" + " \"Resource\": [\n" + " \"arn:aws:s3:::" + bucketName + "/*\"\n" + " ]\n" + " }\n" + " ]\n" + "}"; try { amazonS3.setBucketPolicy(bucketName, policy); } catch (Exception e) { log.warn("[Minio] setBucketPolicy failed, maybe you need to setBucketPolicy by yourself!", e); } } /** * 判断 bucket是否存在 * * @param bucketName: 桶名 * @return boolean */ @SneakyThrows(Exception.class) public boolean bucketExists(String bucketName) { return amazonS3.doesBucketExistV2(bucketName); } public static class MinioOssCondition extends PropertyAndOneBeanCondition { @Override protected List anyConfigKey() { return Lists.newArrayList("oms.storage.dfs.minio.endpoint"); } @Override protected Class beanType() { return DFsService.class; } } }