package tech.powerjob.worker.core.tracker.task.heavy; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.AlarmConfig; import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.request.ServerQueryInstanceStatusReq; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.utils.LRUCache; import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.core.processor.TaskResult; import tech.powerjob.worker.persistence.TaskDO; import java.util.*; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * 处理秒级任务(FIX_RATE/FIX_DELAY)的TaskTracker * FIX_RATE 直接由 ScheduledExecutorService 实现,精度高,推荐使用 * FIX_DELAY 会有几秒的延迟,精度不是很理想 * * @author tjq * @since 2020/4/8 */ @Slf4j public class FrequentTaskTracker extends HeavyTaskTracker { /** * 时间表达式类型 */ private TimeExpressionType timeExpressionType; private long timeParams; /** * 最大同时运行实例数 */ private int maxInstanceNum; /** * 总运行次数(正常情况不会出现锁竞争,直接用 Atomic 系列,锁竞争严重推荐 LongAdder) */ private AtomicLong triggerTimes; private AtomicLong succeedTimes; private AtomicLong failedTimes; /** * 任务发射器 */ private Launcher launcher; /** * 保存最近10个子任务的信息,供用户查询(user -> server -> worker 传递查询) */ private LRUCache recentSubInstanceInfo; /** * 保存运行中的任务 */ private Map subInstanceId2TimeHolder; private AlertManager alertManager; private static final int HISTORY_SIZE = 10; private static final String LAST_TASK_ID_PREFIX = "L"; private static final int MIN_INTERVAL = 50; protected FrequentTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) { super(req, workerRuntime); } @Override protected void initTaskTracker(ServerScheduleJobReq req) { // 0. 初始化实例变量 timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType()); timeParams = Long.parseLong(req.getTimeExpression()); maxInstanceNum = req.getMaxInstanceNum(); triggerTimes = new AtomicLong(0); succeedTimes = new AtomicLong(0); failedTimes = new AtomicLong(0); recentSubInstanceInfo = new LRUCache<>(HISTORY_SIZE); subInstanceId2TimeHolder = Maps.newConcurrentMap(); // 1. 初始化定时调度线程池 String poolName = String.format("ftttp-%d", req.getInstanceId()) + "-%d"; ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(poolName).build(); this.scheduledPool = Executors.newScheduledThreadPool(4, factory); this.alertManager = constructAlertManager(req); // 2. 启动任务发射器 launcher = new Launcher(); if (timeExpressionType == TimeExpressionType.FIXED_RATE) { // 固定频率需要设置最小间隔 if (timeParams < MIN_INTERVAL) { throw new PowerJobException("time interval too small, please set the timeExpressionInfo >= 1000"); } scheduledPool.scheduleAtFixedRate(launcher, 1, timeParams, TimeUnit.MILLISECONDS); } else { scheduledPool.schedule(launcher, 0, TimeUnit.MILLISECONDS); } // 3. 启动任务分发器(事实上,秒级任务应该都是单机任务,且感觉不需要失败重试机制,那么 Dispatcher 的存在就有点浪费系统资源了...) scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 1, 2, TimeUnit.SECONDS); // 4. 启动状态检查器 scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(Math.max(timeParams, 5000), 15000), TimeUnit.MILLISECONDS); // 5. 启动执行器动态检测装置 scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES); } @Override public InstanceDetail fetchRunningStatus(ServerQueryInstanceStatusReq req) { InstanceDetail detail = new InstanceDetail(); // 填充基础信息 detail.setActualTriggerTime(createTime); detail.setStatus(InstanceStatus.RUNNING.getV()); detail.setTaskTrackerAddress(workerRuntime.getWorkerAddress()); List history = Lists.newLinkedList(); recentSubInstanceInfo.forEach((subId, subInstanceInfo) -> { InstanceDetail.SubInstanceDetail subDetail = new InstanceDetail.SubInstanceDetail(); subDetail.setSubInstanceId(subId); subDetail.setStartTime(subInstanceInfo.getStartTime()); subDetail.setFinishedTime(subInstanceInfo.getFinishedTime()); subDetail.setResult(subInstanceInfo.getResult()); InstanceStatus status = InstanceStatus.of(subInstanceInfo.status); subDetail.setStatus(status.getV()); history.add(subDetail); }); // 按 subInstanceId 排序 issue#63 history.sort((o1, o2) -> (int) (o2.getSubInstanceId() - o1.getSubInstanceId())); detail.setSubInstanceDetails(history); return detail; } /** * 任务发射器(@Reference 饥荒->雪球发射器) */ private class Launcher implements Runnable { public void innerRun() { if (finished.get()) { return; } // 子任务实例ID Long subInstanceId = triggerTimes.incrementAndGet(); // 执行记录缓存(只做展示,因此可以放在前面) SubInstanceInfo subInstanceInfo = new SubInstanceInfo(); subInstanceInfo.status = TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue(); subInstanceInfo.startTime = System.currentTimeMillis(); recentSubInstanceInfo.put(subInstanceId, subInstanceInfo); String myAddress = workerRuntime.getWorkerAddress(); String taskId = String.valueOf(subInstanceId); TaskDO newRootTask = new TaskDO(); newRootTask.setInstanceId(instanceId); newRootTask.setSubInstanceId(subInstanceId); newRootTask.setTaskId(taskId); newRootTask.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue()); newRootTask.setFailedCnt(0); // 根任务总是默认本机执行 newRootTask.setAddress(myAddress); newRootTask.setTaskName(TaskConstant.ROOT_TASK_NAME); newRootTask.setCreatedTime(System.currentTimeMillis()); newRootTask.setLastModifiedTime(System.currentTimeMillis()); newRootTask.setLastReportTime(-1L); // 判断是否超出最大执行实例数 if (maxInstanceNum > 0) { if (timeExpressionType == TimeExpressionType.FIXED_RATE) { if (subInstanceId2TimeHolder.size() >= maxInstanceNum) { log.warn("[FQTaskTracker-{}] cancel to launch the subInstance({}) due to too much subInstance is running.", instanceId, subInstanceId); processFinishedSubInstance(subInstanceId, false, "TOO_MUCH_INSTANCE"); return; } } } // 必须先持久化,持久化成功才能 dispatch,否则会导致后续报错(因为DB中没有这个taskId对应的记录,会各种报错) if (!taskPersistenceService.batchSave(Lists.newArrayList(newRootTask))) { log.error("[FQTaskTracker-{}] Launcher create new root task failed.", instanceId); processFinishedSubInstance(subInstanceId, false, "LAUNCH_FAILED"); return; } // 生成记录信息(必须保证持久化成功才能生成该记录,否则会导致 LAUNCH_FAILED 错误) SubInstanceTimeHolder timeHolder = new SubInstanceTimeHolder(); timeHolder.startTime = System.currentTimeMillis(); subInstanceId2TimeHolder.put(subInstanceId, timeHolder); dispatchTask(newRootTask, myAddress); } @Override public void run() { try { innerRun(); } catch (Exception e) { log.error("[FQTaskTracker-{}] launch task failed.", instanceId, e); } } } /** * 检查各个SubInstance的完成情况 */ private class Checker implements Runnable { @Override public void run() { if (finished.get()) { return; } try { checkStatus(); reportStatus(); } catch (Exception e) { log.warn("[FQTaskTracker-{}] check and report status failed.", instanceId, e); } } private void checkStatus() { Stopwatch stopwatch = Stopwatch.createStarted(); // worker 挂掉的任务直接置为失败 List disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers(); if (!disconnectedPTs.isEmpty()) { log.warn("[FQTaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs); if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, false)) { ptStatusHolder.remove(disconnectedPTs); log.warn("[FQTaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs); } } ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); long instanceTimeoutMS = instanceInfo.getInstanceTimeoutMS(); long nowTS = System.currentTimeMillis(); Iterator> iterator = subInstanceId2TimeHolder.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); Long subInstanceId = entry.getKey(); SubInstanceTimeHolder timeHolder = entry.getValue(); long executeTimeout = nowTS - timeHolder.startTime; // 超时(包含总运行时间超时和心跳包超时),直接判定为失败 if (executeTimeout > instanceTimeoutMS) { onFinished(subInstanceId, false, "RUNNING_TIMEOUT", iterator); continue; } // 查看执行情况 InstanceStatisticsHolder holder = getInstanceStatisticsHolder(subInstanceId); long finishedNum = holder.succeedNum + holder.failedNum; long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum; if (unfinishedNum == 0) { // 数据库中没有该 subInstanceId 的记录,说明任务发射器写入DB失败,直接视为执行失败,删除数据 if (finishedNum == 0) { onFinished(subInstanceId, false, "LAUNCH_FAILED", iterator); continue; } switch (executeType) { // STANDALONE 代表任务确实已经执行完毕了 case STANDALONE: // 查询数据库获取结果(STANDALONE每个SubInstance只会有一条Task记录) TaskResult resultTask = taskPersistenceService.getAllTaskResult(instanceId, subInstanceId).get(0); onFinished(subInstanceId, resultTask.isSuccess(), resultTask.getResult(), iterator); continue; // MAP 不关心结果,最简单 case MAP: String result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum); onFinished(subInstanceId, holder.failedNum == 0, result, iterator); continue; // MapReduce 和 BroadCast 需要根据是否有 LAST_TASK 来判断结束与否 default: Optional lastTaskOptional = taskPersistenceService.getLastTask(instanceId, subInstanceId); if (lastTaskOptional.isPresent()) { TaskStatus lastTaskStatus = TaskStatus.of(lastTaskOptional.get().getStatus()); if (lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) { onFinished(subInstanceId, lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS, lastTaskOptional.get().getResult(), iterator); } } else { // 创建最终任务并提交执行 TaskDO newLastTask = new TaskDO(); newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME); newLastTask.setTaskId(LAST_TASK_ID_PREFIX + subInstanceId); newLastTask.setSubInstanceId(subInstanceId); newLastTask.setAddress(workerRuntime.getWorkerAddress()); submitTask(Lists.newArrayList(newLastTask)); } } } // 舍去一切重试机制,反正超时就失败 } log.debug("[FQTaskTracker-{}] check status using {}.", instanceId, stopwatch); } private void reportStatus() { String currentServerAddress = workerRuntime.getServerDiscoveryService().getCurrentServerAddress(); if (StringUtils.isEmpty(currentServerAddress)) { return; } TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq(); req.setAppId(workerRuntime.getAppId()); req.setJobId(instanceInfo.getJobId()); req.setInstanceId(instanceId); req.setReportTime(System.currentTimeMillis()); req.setStartTime(createTime); req.setInstanceStatus(InstanceStatus.RUNNING.getV()); req.setTotalTaskNum(triggerTimes.get()); req.setSucceedTaskNum(succeedTimes.get()); req.setFailedTaskNum(failedTimes.get()); req.setSourceAddress(workerRuntime.getWorkerAddress()); // alert if (alertManager.alert()) { req.setNeedAlert(true); req.setAlertContent(alertManager.getAlertContent()); log.warn("[FQTaskTracker-{}] report alert req,time:{}", instanceId, req.getReportTime()); } // 非可靠通知,Server挂掉后任务的kill工作交由其他线程去做 TransportUtils.ttReportInstanceStatus(req, currentServerAddress, workerRuntime.getTransporter()); } /** * 处理任务完成的情况,删除内存 & 数据库数据 */ private void onFinished(Long subInstanceId, boolean success, String result, Iterator iterator) { iterator.remove(); processFinishedSubInstance(subInstanceId, success, result); } } private void processFinishedSubInstance(long subInstanceId, boolean success, String result) { long currentTime = System.currentTimeMillis(); if (success) { succeedTimes.incrementAndGet(); } else { failedTimes.incrementAndGet(); alertManager.update(currentTime, result); } // 从运行中任务列表移除 subInstanceId2TimeHolder.remove(subInstanceId); // 更新缓存数据 SubInstanceInfo subInstanceInfo = recentSubInstanceInfo.get(subInstanceId); if (subInstanceInfo != null) { subInstanceInfo.status = success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV(); subInstanceInfo.result = result; subInstanceInfo.finishedTime = currentTime; } // 删除数据库相关数据 taskPersistenceService.deleteAllSubInstanceTasks(instanceId, subInstanceId); // FIX_DELAY 则调度下次任务 if (timeExpressionType == TimeExpressionType.FIXED_DELAY) { scheduledPool.schedule(launcher, timeParams, TimeUnit.MILLISECONDS); } } private AlertManager constructAlertManager(ServerScheduleJobReq req) { long rate = Long.parseLong(req.getTimeExpression()); if (!StringUtils.isEmpty(req.getAlarmConfig())) { try { log.debug("[FQTaskTracker-{}] alert config:{}", instanceId, req.getAlarmConfig()); AlarmConfig alarmConfig = JsonUtils.parseObject(req.getAlarmConfig(), AlarmConfig.class); return new AlertManager(alarmConfig); } catch (Exception e) { log.warn("[FQTaskTracker-{}] constructAlertManager failed, will use default config!", instanceId); } } // 默认配置,失败一次就告警,沉默窗口 5 分钟 int statisticWindowLen = Math.max((int) (2 * rate / 1000), 1); return new AlertManager(new AlarmConfig(1, statisticWindowLen, 300)); } private class AlertManager { /** * 记录执行失败的时间 */ private final LinkedList failedRecordList = new LinkedList<>(); /** * 告警配置 */ private final AlarmConfig config; /** * 告警的激活时间 */ private long alarmActiveTime = 0L; /** * 告警的内容(记录最后一次失败的任务执行结果) */ private String content; /** * 是否处于激活状态 */ private boolean active; public AlertManager(AlarmConfig config) { log.info("[FQTaskTracker-{}] create alert manager,alertThreshold:{},statisticWindowLen:{} s,silenceWindowLen:{} s", instanceId, config.getAlertThreshold(), config.getStatisticWindowLen(), config.getSilenceWindowLen()); this.config = config; } public synchronized void update(long currentTime, String result) { log.debug("[FQTaskTracker-{}] update alert statistic info,currentTime:{}", instanceId, currentTime); if (currentTime < alarmActiveTime + config.getSilenceWindowLen() * 1000) { // 处于沉默窗口内 return; } // 当前统计窗口允许的最小值 long minTime = currentTime - config.getStatisticWindowLen() * 1000; while (!failedRecordList.isEmpty() && failedRecordList.peekFirst() < minTime) { failedRecordList.removeFirst(); } failedRecordList.add(currentTime); if (failedRecordList.size() >= config.getAlertThreshold()) { // 标记需要告警 active = true; alarmActiveTime = currentTime; content = result; } } public synchronized boolean alert() { if (active) { active = false; return true; } return false; } public String getAlertContent() { return content; } } @Data private static class SubInstanceInfo { private int status; private long startTime; private long finishedTime; private String result; } private static class SubInstanceTimeHolder { private long startTime; } }