package tech.powerjob.server.persistence.storage.impl;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.mongodb.ConnectionString;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.gridfs.GridFSBucket;
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.gridfs.GridFSDownloadStream;
import com.mongodb.client.gridfs.GridFSFindIterable;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.model.Filters;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.core.env.Environment;
import tech.powerjob.server.extension.dfs.*;
import tech.powerjob.server.persistence.storage.AbstractDFsService;
import tech.powerjob.server.common.spring.condition.PropertyAndOneBeanCondition;

import javax.annotation.Priority;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * Uses MongoDB GridFS as the underlying storage layer.
 * Configuration usage: oms.storage.dfs.mongodb.uri=mongodb+srv://zqq:No1Bug2Please3!@cluster0.wie54.gcp.mongodb.net/powerjob_daily?retryWrites=true&w=majority
 *
 * @author tjq
 * @since 2023/7/28
 */
@Slf4j
@Priority(value = Integer.MAX_VALUE - 10)
@Conditional(GridFsService.GridFsCondition.class)
public class GridFsService extends AbstractDFsService {

    private MongoClient mongoClient;
    private MongoDatabase db;
    private final Map<String, GridFSBucket> bucketCache = Maps.newConcurrentMap();

    private static final String TYPE_MONGO = "mongodb";
    private static final String KEY_URI = "uri";
    private static final String SPRING_MONGO_DB_CONFIG_KEY = "spring.data.mongodb.uri";

    @Override
    public void store(StoreRequest storeRequest) throws IOException {
        GridFSBucket bucket = getBucket(storeRequest.getFileLocation().getBucket());
        try (BufferedInputStream bis = new BufferedInputStream(Files.newInputStream(storeRequest.getLocalFile().toPath()))) {
            bucket.uploadFromStream(storeRequest.getFileLocation().getName(), bis);
        }
    }

    @Override
    public void download(DownloadRequest downloadRequest) throws IOException {
        GridFSBucket bucket = getBucket(downloadRequest.getFileLocation().getBucket());
        FileUtils.forceMkdirParent(downloadRequest.getTarget());
        try (GridFSDownloadStream gis = bucket.openDownloadStream(downloadRequest.getFileLocation().getName());
             BufferedOutputStream bos = new BufferedOutputStream(Files.newOutputStream(downloadRequest.getTarget().toPath()))
        ) {
            byte[] buffer = new byte[1024];
            int bytes;
            while ((bytes = gis.read(buffer)) != -1) {
                bos.write(buffer, 0, bytes);
            }
            bos.flush();
        }
    }

    @Override
    public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
        GridFSBucket bucket = getBucket(fileLocation.getBucket());
        GridFSFindIterable files = bucket.find(Filters.eq("filename", fileLocation.getName()));
        GridFSFile first = files.first();
        if (first == null) {
            return Optional.empty();
        }
        return Optional.of(new FileMeta()
                .setLength(first.getLength())
                .setLastModifiedTime(first.getUploadDate())
                .setMetaInfo(first.getMetadata()));
    }
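    /*
     * Illustrative caller-side sketch (not part of this class): check that a file
     * exists before downloading it. The chained setters on FileLocation and
     * DownloadRequest are assumed here, mirroring the chained FileMeta setters
     * above; the bucket and file names are hypothetical.
     *
     *   FileLocation location = new FileLocation().setBucket("powerjob").setName("container-1.jar");
     *   if (gridFsService.fetchFileMeta(location).isPresent()) {
     *       DownloadRequest request = new DownloadRequest()
     *               .setFileLocation(location)
     *               .setTarget(new File("/tmp/container-1.jar"));
     *       gridFsService.download(request);
     *   }
     */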
    @Override
    public void cleanExpiredFiles(String bucketName, int days) {
        Stopwatch sw = Stopwatch.createStarted();
        Date date = DateUtils.addDays(new Date(), -days);
        GridFSBucket bucket = getBucket(bucketName);
        Bson filter = Filters.lt("uploadDate", date);

        // Worried that deleting in a loop performs badly? The official implementation does exactly the same: org.springframework.data.mongodb.gridfs.GridFsTemplate.delete
        bucket.find(filter).forEach(gridFSFile -> {
            ObjectId objectId = gridFSFile.getObjectId();
            try {
                bucket.delete(objectId);
                log.info("[GridFsService] deleted {}#{}", bucketName, objectId);
            } catch (Exception e) {
                log.error("[GridFsService] delete {}#{} failed.", bucketName, objectId, e);
            }
        });
        log.info("[GridFsService] clean bucket({}) successfully, deleted all files uploaded before {}, using {}.", bucketName, date, sw.stop());
    }

    private GridFSBucket getBucket(String bucketName) {
        return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName));
    }

    private String parseMongoUri(Environment environment) {
        // Prefer the new configuration key.
        String uri = fetchProperty(environment, TYPE_MONGO, KEY_URI);
        if (StringUtils.isNotEmpty(uri)) {
            return uri;
        }
        // For compatibility with versions before 4.3.3, fall back to the Spring Data MongoDB configuration.
        return environment.getProperty(SPRING_MONGO_DB_CONFIG_KEY);
    }

    void initMongo(String uri) {
        log.info("[GridFsService] mongoDB uri: {}", uri);
        if (StringUtils.isEmpty(uri)) {
            log.warn("[GridFsService] uri is empty, GridFsService is off now!");
            return;
        }
        ConnectionString connectionString = new ConnectionString(uri);
        mongoClient = MongoClients.create(connectionString);
        if (StringUtils.isEmpty(connectionString.getDatabase())) {
            log.warn("[GridFsService] can't find database info from uri, will use [powerjob] as default, please make sure you have created the database 'powerjob'");
        }
        db = mongoClient.getDatabase(Optional.ofNullable(connectionString.getDatabase()).orElse("powerjob"));
        log.info("[GridFsService] initialize MongoDB and GridFS successfully, will use mongodb GridFS as storage layer.");
    }

    @Override
    public void destroy() throws Exception {
        // mongoClient stays null when initMongo received an empty uri, so guard against an NPE on shutdown.
        if (mongoClient != null) {
            mongoClient.close();
        }
    }

    @Override
    protected void init(ApplicationContext applicationContext) {
        String uri = parseMongoUri(applicationContext.getEnvironment());
        initMongo(uri);
        log.info("[GridFsService] initialize successfully, THIS_WILL_BE_THE_STORAGE_LAYER.");
    }

    public static class GridFsCondition extends PropertyAndOneBeanCondition {

        @Override
        protected List<String> anyConfigKey() {
            return Lists.newArrayList("spring.data.mongodb.uri", "oms.storage.dfs.mongodb.uri");
        }

        @Override
        protected Class<?> beanType() {
            return DFsService.class;
        }
    }
}
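/*
 * End-to-end usage sketch (hypothetical caller code; assumes chained setters on
 * StoreRequest and FileLocation, consistent with the FileMeta usage in this class,
 * and illustrative bucket/file names):
 *
 *   // gridFsService is an instance already initialized by Spring via GridFsCondition
 *   FileLocation location = new FileLocation().setBucket("powerjob").setName("job-1.jar");
 *   StoreRequest storeRequest = new StoreRequest()
 *           .setLocalFile(new File("/tmp/job-1.jar"))
 *           .setFileLocation(location);
 *   gridFsService.store(storeRequest);
 *
 *   // Later, remove everything in that bucket uploaded more than 7 days ago:
 *   gridFsService.cleanExpiredFiles("powerjob", 7);
 */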