package tech.powerjob.server.core.instance;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.enums.LogLevel;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.InstanceLogContent;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.common.utils.SegmentLock;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.extension.dfs.*;
import tech.powerjob.server.persistence.StringPage;
import tech.powerjob.server.persistence.local.LocalInstanceLogDO;
import tech.powerjob.server.persistence.local.LocalInstanceLogRepository;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.storage.Constants;
import tech.powerjob.server.remote.server.redirector.DesignateServer;

import javax.annotation.Resource;
import java.io.*;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Runtime log service for job instances.
 *
 * @author tjq
 * @since 2020/4/27
 */
@Slf4j
@Service
public class InstanceLogService {

    @Value("${server.port}")
    private int port;

    @Resource
    private InstanceMetadataService instanceMetadataService;
    @Resource
    private DFsService dFsService;
    /**
     * Local database access bean.
     */
    @Resource(name = "localTransactionTemplate")
    private TransactionTemplate localTransactionTemplate;
    @Resource
    private LocalInstanceLogRepository localInstanceLogRepository;

    /**
     * Job instance IDs whose online logs are maintained locally, mapped to the last report time.
     */
    private final Map<Long, Long> instanceId2LastReportTime = Maps.newConcurrentMap();

    @Resource(name = PJThreadPool.BACKGROUND_POOL)
    private AsyncTaskExecutor powerJobBackgroundPool;

    /**
     * Segment lock.
     */
    private final SegmentLock segmentLock = new SegmentLock(8);

    /**
     * Timestamp formatter.
     */
    private static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance(OmsConstant.TIME_PATTERN_PLUS);
    /**
     * Number of lines displayed per page.
     */
    private static final int MAX_LINE_COUNT = 100;
    /**
     * Cache time (ms) for log files of instances that are still being updated.
     */
    private static final long LOG_CACHE_TIME = 10000;

    /**
     * Submit log records and persist them to the local database.
     *
     * @param workerAddress address of the reporting worker
     * @param logs          runtime logs of the job instance
     */
    @Async(value = PJThreadPool.LOCAL_DB_POOL)
    public void submitLogs(String workerAddress, List<InstanceLogContent> logs) {
        List<LocalInstanceLogDO> logList = logs.stream().map(x -> {
            instanceId2LastReportTime.put(x.getInstanceId(), System.currentTimeMillis());

            LocalInstanceLogDO y = new LocalInstanceLogDO();
            BeanUtils.copyProperties(x, y);
            y.setWorkerAddress(workerAddress);
            return y;
        }).collect(Collectors.toList());

        try {
            CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.saveAll(logList));
        } catch (Exception e) {
            log.warn("[InstanceLogService] persistent instance logs failed, these logs will be dropped: {}.", logs, e);
        }
    }
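
    /*
     * Note: besides persisting the entries, submitLogs refreshes instanceId2LastReportTime, which is
     * what prepareLogFile uses further down to decide between the temporary (still running) and the
     * stable (finished) log file paths.
     */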

    /**
     * Fetch the runtime log of a job instance (the data is stored locally by default, so the
     * request has to be routed and forwarded to the server that generated it).
     *
     * @param appId      appId, used only by the AOP routing aspect
     * @param instanceId job instance ID
     * @param index      page index, starting from 0
     * @return log text
     */
    @DesignateServer
    public StringPage fetchInstanceLog(Long appId, Long instanceId, Long index) {
        try {
            Future<File> fileFuture = prepareLogFile(instanceId);
            // the timeout does not interrupt the task that is still running
            File logFile = fileFuture.get(5, TimeUnit.SECONDS);

            // paginate the data
            long lines = 0;
            StringBuilder sb = new StringBuilder();
            String lineStr;
            long left = index * MAX_LINE_COUNT;
            long right = left + MAX_LINE_COUNT;
            try (LineNumberReader lr = new LineNumberReader(new FileReader(logFile))) {
                while ((lineStr = lr.readLine()) != null) {
                    // only read out lines within the requested range
                    if (lines >= left && lines < right) {
                        sb.append(lineStr).append(System.lineSeparator());
                    }
                    ++lines;
                }
            } catch (Exception e) {
                log.warn("[InstanceLog-{}] read logFile from disk failed for app: {}.", instanceId, appId, e);
                return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e));
            }

            double totalPage = Math.ceil(1.0 * lines / MAX_LINE_COUNT);
            return new StringPage(index, (long) totalPage, sb.toString());

        } catch (TimeoutException te) {
            return StringPage.simple("log file is being prepared, please try again later.");
        } catch (Exception e) {
            log.warn("[InstanceLog-{}] fetch instance log failed.", instanceId, e);
            return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e));
        }
    }

    /**
     * Get the download URL for an instance log.
     *
     * @param appId      appId, used only by the AOP routing aspect
     * @param instanceId job instance ID
     * @return download URL
     */
    @DesignateServer
    public String fetchDownloadUrl(Long appId, Long instanceId) {
        String url = "http://" + NetUtils.getLocalHost() + ":" + port + "/instance/downloadLog?instanceId=" + instanceId;
        log.info("[InstanceLog-{}] downloadURL for appId[{}]: {}", instanceId, appId, url);
        return url;
    }

    /**
     * Download the complete log file of a job instance.
     *
     * @param instanceId job instance ID
     * @return log file
     * @throws Exception on failure
     */
    public File downloadInstanceLog(long instanceId) throws Exception {
        Future<File> fileFuture = prepareLogFile(instanceId);
        return fileFuture.get(1, TimeUnit.MINUTES);
    }

    /**
     * Prepare the log file asynchronously.
     *
     * @param instanceId job instance ID
     * @return future holding the prepared file
     */
    private Future<File> prepareLogFile(long instanceId) {
        return powerJobBackgroundPool.submit(() -> {
            // the online log is still being updated, so build the file from the local database
            if (instanceId2LastReportTime.containsKey(instanceId)) {
                return genTemporaryLogFile(instanceId);
            }
            return genStableLogFile(instanceId);
        });
    }

    /**
     * Sync the local runtime logs of an instance to remote (DFS) storage; executed asynchronously
     * after the instance finishes.
     *
     * @param instanceId job instance ID
     */
    @Async(PJThreadPool.BACKGROUND_POOL)
    public void sync(Long instanceId) {

        Stopwatch sw = Stopwatch.createStarted();
        try {
            // persist to a local file first
            File stableLogFile = genStableLogFile(instanceId);
            // push the file to remote storage (DFS)
            FileLocation dfsFL = new FileLocation().setBucket(Constants.LOG_BUCKET).setName(genMongoFileName(instanceId));
            try {
                dFsService.store(new StoreRequest().setLocalFile(stableLogFile).setFileLocation(dfsFL));
                log.info("[InstanceLog-{}] push local instanceLogs to mongoDB succeed, using: {}.", instanceId, sw.stop());
            } catch (Exception e) {
                log.warn("[InstanceLog-{}] push local instanceLogs to mongoDB failed.", instanceId, e);
            }
        } catch (Exception e) {
            log.warn("[InstanceLog-{}] sync local instanceLogs failed.", instanceId, e);
        }

        // delete the data from the local database
        try {
            instanceId2LastReportTime.remove(instanceId);
            CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId));
            log.info("[InstanceLog-{}] delete local instanceLog successfully.", instanceId);
        } catch (Exception e) {
            log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e);
        }
    }
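
    /*
     * Log file generation strategy (the two methods below):
     *  - genTemporaryLogFile: the instance is still reporting, so the file is rebuilt from the local
     *    database and cached for LOG_CACHE_TIME milliseconds.
     *  - genStableLogFile: the instance has finished; the file is built once, either from the local
     *    database (before sync() has cleaned it up) or by downloading the archived copy from the DFS.
     */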

    private File genTemporaryLogFile(long instanceId) {
        String path = genLogFilePath(instanceId, false);
        int lockId = ("tpFileLock-" + instanceId).hashCode();
        try {
            segmentLock.lockInterruptibleSafe(lockId);

            // the Stream must be consumed inside a transaction
            return localTransactionTemplate.execute(status -> {

                File f = new File(path);

                // if the file exists and is still fresh, do not rebuild it (this check must also stay
                // inside the lock, otherwise a half-built file could be returned)
                if (f.exists() && (System.currentTimeMillis() - f.lastModified()) < LOG_CACHE_TIME) {
                    return f;
                }

                try {
                    // create the parent directory (the file itself is created when the stream is opened)
                    FileUtils.forceMkdirParent(f);

                    // rebuild the file from the local database
                    try (Stream<LocalInstanceLogDO> allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
                        stream2File(allLogStream, f);
                    }
                    return f;
                } catch (Exception e) {
                    CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(f));
                    throw new RuntimeException(e);
                }
            });
        } finally {
            segmentLock.unlock(lockId);
        }
    }

    private File genStableLogFile(long instanceId) {
        String path = genLogFilePath(instanceId, true);
        int lockId = ("stFileLock-" + instanceId).hashCode();
        try {
            segmentLock.lockInterruptibleSafe(lockId);

            return localTransactionTemplate.execute(status -> {

                File f = new File(path);
                if (f.exists()) {
                    return f;
                }

                try {
                    // create the parent directory (the file itself is created when the stream is opened)
                    FileUtils.forceMkdirParent(f);

                    // data still exists locally, persist from the local database (the SYNC case)
                    if (instanceId2LastReportTime.containsKey(instanceId)) {
                        try (Stream<LocalInstanceLogDO> allLogStream = localInstanceLogRepository.findByInstanceIdOrderByLogTime(instanceId)) {
                            stream2File(allLogStream, f);
                        }
                    } else {
                        // otherwise download the archived copy from the DFS
                        FileLocation dfl = new FileLocation().setBucket(Constants.LOG_BUCKET).setName(genMongoFileName(instanceId));
                        Optional<FileMeta> dflMetaOpt = dFsService.fetchFileMeta(dfl);
                        if (!dflMetaOpt.isPresent()) {
                            OmsFileUtils.string2File("SYSTEM: There is no online log for this job instance.", f);
                            return f;
                        }
                        dFsService.download(new DownloadRequest().setTarget(f).setFileLocation(dfl));
                    }
                    return f;
                } catch (Exception e) {
                    CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(f));
                    throw new RuntimeException(e);
                }
            });
        } finally {
            segmentLock.unlock(lockId);
        }
    }

    /**
     * Convert the log stream stored in the database into a log file on disk.
     *
     * @param stream  stream of log records
     * @param logFile target log file
     */
    private void stream2File(Stream<LocalInstanceLogDO> stream, File logFile) {
        try (FileWriter fw = new FileWriter(logFile); BufferedWriter bfw = new BufferedWriter(fw)) {
            stream.forEach(instanceLog -> {
                try {
                    bfw.write(convertLog(instanceLog) + System.lineSeparator());
                } catch (Exception ignore) {
                }
            });
        } catch (IOException ie) {
            ExceptionUtils.rethrow(ie);
        }
    }

    /**
     * Assemble a log line -> 2020-04-29 22:07:10.059 [192.168.1.1:2777] INFO XXX
     *
     * @param instanceLog log record
     * @return formatted string
     */
    private static String convertLog(LocalInstanceLogDO instanceLog) {
        return String.format("%s [%s] %s %s",
                DATE_FORMAT.format(instanceLog.getLogTime()),
                instanceLog.getWorkerAddress(),
                LogLevel.genLogLevelString(instanceLog.getLogLevel()),
                instanceLog.getLogContent());
    }
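
    /**
     * Scheduled housekeeping: for frequent (second-level) job instances that are still reporting,
     * only roughly the last 10 minutes of log records are kept in the local database.
     */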
    @Async(PJThreadPool.TIMING_POOL)
    @Scheduled(fixedDelay = 120000)
    public void timingCheck() {

        // periodically delete the logs of second-level (frequent) jobs
        List<Long> frequentInstanceIds = Lists.newLinkedList();
        instanceId2LastReportTime.keySet().forEach(instanceId -> {
            try {
                JobInfoDO jobInfo = instanceMetadataService.fetchJobInfoByInstanceId(instanceId);
                if (TimeExpressionType.FREQUENT_TYPES.contains(jobInfo.getTimeExpressionType())) {
                    frequentInstanceIds.add(instanceId);
                }
            } catch (Exception ignore) {
            }
        });

        if (!CollectionUtils.isEmpty(frequentInstanceIds)) {
            // keep only the logs of the last 10 minutes
            long time = System.currentTimeMillis() - 10 * 60 * 1000;
            Lists.partition(frequentInstanceIds, 100).forEach(p -> {
                try {
                    localInstanceLogRepository.deleteByInstanceIdInAndLogTimeLessThan(p, time);
                } catch (Exception e) {
                    log.warn("[InstanceLogService] delete expired logs for instance: {} failed.", p, e);
                }
            });
        }

        // delete logs that have not been REPORTed for a long time (necessity still under evaluation...)
    }

    private static String genLogFilePath(long instanceId, boolean stable) {
        if (stable) {
            return OmsFileUtils.genLogDirPath() + String.format("%d-stable.log", instanceId);
        } else {
            return OmsFileUtils.genLogDirPath() + String.format("%d-temporary.log", instanceId);
        }
    }

    private static String genMongoFileName(long instanceId) {
        return String.format("oms-%d.log", instanceId);
    }
}
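
/*
 * Illustrative usage sketch, not part of PowerJob itself: the controller class, request mapping and
 * field names below are assumptions made purely for documentation purposes.
 *
 *   @RestController
 *   public class InstanceLogQueryController {
 *
 *       @Resource
 *       private InstanceLogService instanceLogService;
 *
 *       // each page holds MAX_LINE_COUNT (100) lines; index starts at 0
 *       @GetMapping("/instance/log")
 *       public StringPage queryLog(Long appId, Long instanceId, Long index) {
 *           return instanceLogService.fetchInstanceLog(appId, instanceId, index);
 *       }
 *   }
 */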