package tech.powerjob.server.persistence.storage.impl;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
import com.aliyun.oss.common.auth.DefaultCredentialProvider;
import com.aliyun.oss.model.DownloadFileRequest;
import com.aliyun.oss.model.ObjectMetadata;
import com.aliyun.oss.model.PutObjectRequest;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.core.env.Environment;
import tech.powerjob.server.extension.dfs.*;
import tech.powerjob.server.persistence.storage.AbstractDFsService;
import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition;
import javax.annotation.Priority;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* Alibaba OSS support
* 海量、安全、低成本、高可靠的云存储服务
* 配置项:
* oms.storage.dfs.alioss.endpoint
* oms.storage.dfs.alioss.bucket
* oms.storage.dfs.alioss.credential_type
* oms.storage.dfs.alioss.ak
* oms.storage.dfs.alioss.sk
* oms.storage.dfs.alioss.token
*
* @author tjq
* @since 2023/7/30
*/
@Slf4j
@Priority(value = Integer.MAX_VALUE - 1)
@Conditional(AliOssService.AliOssCondition.class)
public class AliOssService extends AbstractDFsService {
private static final String TYPE_ALI_OSS = "alioss";
private static final String KEY_ENDPOINT = "endpoint";
private static final String KEY_BUCKET = "bucket";
private static final String KEY_CREDENTIAL_TYPE = "credential_type";
private static final String KEY_AK = "ak";
private static final String KEY_SK = "sk";
private static final String KEY_TOKEN = "token";
private OSS oss;
private String bucket;
private static final int DOWNLOAD_PART_SIZE = 10240;
private static final String NO_SUCH_KEY = "NoSuchKey";
@Override
public void store(StoreRequest storeRequest) throws IOException {
ObjectMetadata objectMetadata = new ObjectMetadata();
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, parseFileName(storeRequest.getFileLocation()), storeRequest.getLocalFile(), objectMetadata);
oss.putObject(putObjectRequest);
}
@Override
public void download(DownloadRequest downloadRequest) throws IOException {
FileLocation dfl = downloadRequest.getFileLocation();
DownloadFileRequest downloadFileRequest = new DownloadFileRequest(bucket, parseFileName(dfl), downloadRequest.getTarget().getAbsolutePath(), DOWNLOAD_PART_SIZE);
try {
FileUtils.forceMkdirParent(downloadRequest.getTarget());
oss.downloadFile(downloadFileRequest);
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
}
@Override
public Optional fetchFileMeta(FileLocation fileLocation) throws IOException {
try {
ObjectMetadata objectMetadata = oss.getObjectMetadata(bucket, parseFileName(fileLocation));
return Optional.ofNullable(objectMetadata).map(ossM -> {
Map metaInfo = Maps.newHashMap();
metaInfo.putAll(ossM.getRawMetadata());
if (ossM.getUserMetadata() != null) {
metaInfo.putAll(ossM.getUserMetadata());
}
return new FileMeta()
.setLastModifiedTime(ossM.getLastModified())
.setLength(ossM.getContentLength())
.setMetaInfo(metaInfo);
});
} catch (OSSException oe) {
String errorCode = oe.getErrorCode();
if (NO_SUCH_KEY.equalsIgnoreCase(errorCode)) {
return Optional.empty();
}
ExceptionUtils.rethrow(oe);
}
return Optional.empty();
}
private static String parseFileName(FileLocation fileLocation) {
return String.format("%s/%s", fileLocation.getBucket(), fileLocation.getName());
}
void initOssClient(String endpoint, String bucket, String mode, String ak, String sk, String token) throws Exception {
log.info("[AliOssService] init OSS by config: endpoint={},bucket={},credentialType={},ak={},sk={},token={}", endpoint, bucket, mode, ak, sk, token);
if (StringUtils.isEmpty(bucket)) {
throw new IllegalArgumentException("'oms.storage.dfs.alioss.bucket' can't be empty, please creat a bucket in aliyun oss console then config it to powerjob");
}
this.bucket = bucket;
CredentialsProvider credentialsProvider;
CredentialType credentialType = CredentialType.parse(mode);
switch (credentialType) {
case PWD:
credentialsProvider = new DefaultCredentialProvider(ak, sk, token);
break;
case SYSTEM_PROPERTY:
credentialsProvider = CredentialsProviderFactory.newSystemPropertiesCredentialsProvider();
break;
default:
credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();
}
this.oss = new OSSClientBuilder().build(endpoint, credentialsProvider);
log.info("[AliOssService] initialize successfully, THIS_WILL_BE_THE_STORAGE_LAYER.");
}
@Override
public void cleanExpiredFiles(String bucket, int days) {
/*
阿里云 OSS 自带生命周期管理,请参考文档进行配置,代码层面不进行实现(浪费服务器资源)https://help.aliyun.com/zh/oss/user-guide/overview-54
阿里云 OSS 自带生命周期管理,请参考文档进行配置,代码层面不进行实现(浪费服务器资源)https://help.aliyun.com/zh/oss/user-guide/overview-54
阿里云 OSS 自带生命周期管理,请参考文档进行配置,代码层面不进行实现(浪费服务器资源)https://help.aliyun.com/zh/oss/user-guide/overview-54
*/
}
@Override
public void destroy() throws Exception {
oss.shutdown();
}
@Override
protected void init(ApplicationContext applicationContext) {
Environment environment = applicationContext.getEnvironment();
String endpoint = fetchProperty(environment, TYPE_ALI_OSS, KEY_ENDPOINT);
String bkt = fetchProperty(environment, TYPE_ALI_OSS, KEY_BUCKET);
String ct = fetchProperty(environment, TYPE_ALI_OSS, KEY_CREDENTIAL_TYPE);
String ak = fetchProperty(environment, TYPE_ALI_OSS, KEY_AK);
String sk = fetchProperty(environment, TYPE_ALI_OSS, KEY_SK);
String token = fetchProperty(environment, TYPE_ALI_OSS, KEY_TOKEN);
try {
initOssClient(endpoint, bkt, ct, ak, sk, token);
} catch (Exception e) {
ExceptionUtils.rethrow(e);
}
}
@Getter
@AllArgsConstructor
enum CredentialType {
/**
* 从环境读取
*/
ENV("env"),
/**
* 系统配置
*/
SYSTEM_PROPERTY("sys"),
/**
* 从账号密码读取
*/
PWD("pwd")
;
private final String code;
/**
* parse credential type
* @param mode oms.storage.dfs.alioss.credential_type
* @return CredentialType
*/
public static CredentialType parse(String mode) {
for (CredentialType credentialType : values()) {
if (StringUtils.equalsIgnoreCase(credentialType.code, mode)) {
return credentialType;
}
}
return PWD;
}
}
public static class AliOssCondition extends PropertyAndOneBeanCondition {
@Override
protected List anyConfigKey() {
return Lists.newArrayList("oms.storage.dfs.alioss.endpoint");
}
@Override
protected Class> beanType() {
return DFsService.class;
}
}
}