package tech.powerjob.server.core.scheduler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.extension.LockService;
import tech.powerjob.server.extension.dfs.DFsService;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
import tech.powerjob.server.persistence.storage.Constants;
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;

import java.io.File;
import java.util.Date;

/**
 * CCO (Chief Clean Officer): periodically removes expired local files, remote files
 * and database records.
 *
 * @author tjq
 * @since 2020/5/18
 */
@Slf4j
@Service
public class CleanService {

    private final DFsService dFsService;

    private final InstanceInfoRepository instanceInfoRepository;

    private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;

    private final WorkflowNodeInfoRepository workflowNodeInfoRepository;

    private final LockService lockService;

    private final int instanceInfoRetentionDay;

    private final int localContainerRetentionDay;

    private final int remoteContainerRetentionDay;

    /** Retention (in days) applied to the temporary directory. */
    private static final int TEMPORARY_RETENTION_DAY = 3;

    /** Number of milliseconds in one day, as a {@code long} so retention arithmetic cannot overflow {@code int}. */
    private static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

    /**
     * Scheduled cleanup at 3 a.m. every day.
     */
    private static final String CLEAN_TIME_EXPRESSION = "0 0 3 * * ?";

    private static final String HISTORY_DELETE_LOCK = "history_delete_lock";

    /** How long the cluster-wide cleanup lock is requested for, in milliseconds (10 minutes). */
    private static final long HISTORY_DELETE_LOCK_MILLIS = 10 * 60 * 1000L;

    public CleanService(DFsService dFsService,
                        InstanceInfoRepository instanceInfoRepository,
                        WorkflowInstanceInfoRepository workflowInstanceInfoRepository,
                        WorkflowNodeInfoRepository workflowNodeInfoRepository,
                        LockService lockService,
                        @Value("${oms.instanceinfo.retention}") int instanceInfoRetentionDay,
                        @Value("${oms.container.retention.local}") int localContainerRetentionDay,
                        @Value("${oms.container.retention.remote}") int remoteContainerRetentionDay) {
        this.dFsService = dFsService;
        this.instanceInfoRepository = instanceInfoRepository;
        this.workflowInstanceInfoRepository = workflowInstanceInfoRepository;
        this.workflowNodeInfoRepository = workflowNodeInfoRepository;
        this.lockService = lockService;
        this.instanceInfoRetentionDay = instanceInfoRetentionDay;
        this.localContainerRetentionDay = localContainerRetentionDay;
        this.remoteContainerRetentionDay = remoteContainerRetentionDay;
    }

    /**
     * Entry point of the scheduled cleanup: releases the local cache, cleans the local
     * directories, then runs the cleanup steps that only one server may perform.
     */
    @Async(PJThreadPool.TIMING_POOL)
    @Scheduled(cron = CLEAN_TIME_EXPRESSION)
    public void timingClean() {

        // Release the local cache
        WorkerClusterManagerService.cleanUp();

        // Free disk space
        cleanLocal(OmsFileUtils.genLogDirPath(), instanceInfoRetentionDay);
        cleanLocal(OmsFileUtils.genContainerJarPath(), localContainerRetentionDay);
        cleanLocal(OmsFileUtils.genTemporaryPath(), TEMPORARY_RETENTION_DAY);

        // Delete historical data from the database
        cleanByOneServer();
    }

    /**
     * Operations that must be performed by a single server are gathered here.
     * Returns immediately when the lock cannot be obtained.
     */
    private void cleanByOneServer() {
        // As soon as the first server grabs the lock the other servers return,
        // so holding the lock for 10 minutes should be enough.
        boolean lock = lockService.tryLock(HISTORY_DELETE_LOCK, HISTORY_DELETE_LOCK_MILLIS);
        if (!lock) {
            log.info("[CleanService] clean job is already running, just return.");
            return;
        }
        try {
            // Delete run records from the database
            cleanInstanceLog();
            cleanWorkflowInstanceLog();
            // Delete useless workflow nodes
            cleanWorkflowNodeInfo();
            // Delete expired files in the remote storage buckets
            cleanRemote(Constants.LOG_BUCKET, instanceInfoRetentionDay);
            cleanRemote(Constants.CONTAINER_BUCKET, remoteContainerRetentionDay);
        } finally {
            lockService.unlock(HISTORY_DELETE_LOCK);
        }
    }

    /**
     * Deletes the direct children of {@code path} whose last-modified time is at least
     * {@code day} days in the past.
     *
     * @param path directory to clean; nothing happens if it does not exist or is empty
     * @param day  retention in days; a negative value disables the cleanup, {@code 0}
     *             makes every child eligible for deletion
     */
    @VisibleForTesting
    public void cleanLocal(String path, int day) {
        if (day < 0) {
            // Message fixed to match the guard: only negative values skip the cleanup.
            log.info("[CleanService] won't clean up {} because of offset day < 0.", path);
            return;
        }

        Stopwatch stopwatch = Stopwatch.createStarted();
        File dir = new File(path);
        if (!dir.exists()) {
            return;
        }
        File[] logFiles = dir.listFiles();
        if (logFiles == null || logFiles.length == 0) {
            return;
        }

        // Maximum allowed age in milliseconds. Computed in long arithmetic: the previous
        // int multiplication overflowed for very large retention values, which could
        // yield a negative threshold and delete every file.
        long maxOffset = day * MILLIS_PER_DAY;

        for (File f : logFiles) {
            long offset = System.currentTimeMillis() - f.lastModified();
            if (offset < maxOffset) {
                continue;
            }
            if (f.delete()) {
                log.info("[CleanService] delete file({}) successfully.", f.getName());
            } else {
                log.warn("[CleanService] delete file({}) failed.", f.getName());
            }
        }
        log.info("[CleanService] clean {} successfully, using {}.", path, stopwatch.stop());
    }

    /**
     * Asks the {@link DFsService} to remove expired files from a bucket. Failures are
     * logged and not propagated.
     *
     * @param bucketName bucket to clean
     * @param day        retention in days; a negative value disables the cleanup
     */
    @VisibleForTesting
    public void cleanRemote(String bucketName, int day) {
        if (day < 0) {
            // Message fixed to match the guard: only negative values skip the cleanup.
            log.info("[CleanService] won't clean up bucket({}) because of offset day < 0.", bucketName);
            return;
        }
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            dFsService.cleanExpiredFiles(bucketName, day);
            // Report success only when the call actually completed; previously this was
            // logged even after the failure branch below had run.
            log.info("[CleanService] clean remote bucket({}) successfully, using {}.", bucketName, stopwatch.stop());
        } catch (Exception e) {
            log.warn("[CleanService] clean remote bucket({}) failed.", bucketName, e);
        }
    }

    /**
     * Deletes instance records in a finished status whose modification time is older than
     * the instance retention period. Does nothing when the retention is negative; failures
     * are logged and not propagated.
     */
    @VisibleForTesting
    public void cleanInstanceLog() {
        if (instanceInfoRetentionDay < 0) {
            return;
        }
        try {
            Date t = DateUtils.addDays(new Date(), -instanceInfoRetentionDay);
            int num = instanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(t, InstanceStatus.FINISHED_STATUS);
            log.info("[CleanService] deleted {} instanceInfo records whose modify time before {}.", num, t);
        } catch (Exception e) {
            log.warn("[CleanService] clean instanceInfo failed.", e);
        }
    }

    /**
     * Deletes workflow instance records in a finished status whose modification time is
     * older than the instance retention period. Does nothing when the retention is
     * negative; failures are logged and not propagated.
     */
    @VisibleForTesting
    public void cleanWorkflowInstanceLog() {
        if (instanceInfoRetentionDay < 0) {
            return;
        }
        try {
            Date t = DateUtils.addDays(new Date(), -instanceInfoRetentionDay);
            int num = workflowInstanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(t, WorkflowInstanceStatus.FINISHED_STATUS);
            log.info("[CleanService] deleted {} workflow instanceInfo records whose modify time before {}.", num, t);
        } catch (Exception e) {
            log.warn("[CleanService] clean workflow instanceInfo failed.", e);
        }
    }

    /**
     * Deletes workflow node records that have no workflow ID and were created more than
     * one day ago. Failures are logged and not propagated.
     */
    @VisibleForTesting
    public void cleanWorkflowNodeInfo() {
        try {
            // Clean up node records created more than one day ago that have no workflow ID
            Date t = DateUtils.addDays(new Date(), -1);
            int num = workflowNodeInfoRepository.deleteAllByWorkflowIdIsNullAndGmtCreateBefore(t);
            log.info("[CleanService] deleted {} node records whose create time before {} and workflowId is null.", num, t);
        } catch (Exception e) {
            log.warn("[CleanService] clean workflow node info failed.", e);
        }
    }

}