package tech.powerjob.worker.core.tracker.task.light; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.StringUtils; import tech.powerjob.common.PowerJobDKey; import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.request.ServerQueryInstanceStatusReq; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import tech.powerjob.common.enhance.SafeRunnableWrapper; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.core.processor.*; import tech.powerjob.worker.core.tracker.manager.LightTaskTrackerManager; import tech.powerjob.worker.core.tracker.task.TaskTracker; import tech.powerjob.worker.extension.processor.ProcessorBean; import tech.powerjob.worker.extension.processor.ProcessorDefinition; import tech.powerjob.worker.log.OmsLoggerFactory; import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** * @author Echo009 * @since 2022/9/19 */ @Slf4j public class LightTaskTracker extends TaskTracker { /** * statusReportScheduledFuture */ private final ScheduledFuture statusReportScheduledFuture; /** * timeoutCheckScheduledFuture */ private final ScheduledFuture timeoutCheckScheduledFuture; /** * processFuture */ private final Future processFuture; /** * 执行线程 */ private final AtomicReference executeThread; /** * 处理器信息 */ private final ProcessorBean processorBean; /** * 上下文 */ private final TaskContext taskContext; /** * 任务状态 */ private TaskStatus status; /** * 任务开始执行的时间 */ private Long taskStartTime; /** * 任务执行结束的时间 或者 任务被 kill 掉的时间 */ private Long taskEndTime; /** * 任务处理结果 */ private ProcessResult result; private final AtomicBoolean timeoutFlag = new AtomicBoolean(false); protected final AtomicBoolean stopFlag = new AtomicBoolean(false); protected final AtomicBoolean destroyFlag = new AtomicBoolean(false); public LightTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) { super(req, workerRuntime); try { taskContext = constructTaskContext(req, workerRuntime); // 等待处理 status = TaskStatus.WORKER_RECEIVED; // 加载 Processor processorBean = workerRuntime.getProcessorLoader().load(new ProcessorDefinition().setProcessorType(req.getProcessorType()).setProcessorInfo(req.getProcessorInfo())); executeThread = new AtomicReference<>(); long delay = Integer.parseInt(System.getProperty(PowerJobDKey.WORKER_STATUS_CHECK_PERIOD, "15")) * 1000L; // 初始延迟加入随机值,避免在高并发场景下所有请求集中在一个时间段 long initDelay = RandomUtils.nextInt(5000, 10000); // 上报任务状态 statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(new SafeRunnableWrapper(this::checkAndReportStatus), initDelay, delay, TimeUnit.MILLISECONDS); // 超时控制 if (instanceInfo.getInstanceTimeoutMS() != Integer.MAX_VALUE) { if (instanceInfo.getInstanceTimeoutMS() < 1000L) { timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new SafeRunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS); } else { // 执行时间超过 1 s 的任务,超时检测最小颗粒度为 1 s timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new SafeRunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS); } } else { timeoutCheckScheduledFuture = null; } // 提交任务到线程池 processFuture = workerRuntime.getExecutorManager().getLightweightTaskExecutorService().submit(this::processTask); } catch (Exception e) { log.error("[TaskTracker-{}] fail to create TaskTracker for req:{} ", instanceId, req); destroy(); throw e; } } /** * 静态方法创建 TaskTracker * * @param req 服务端调度任务请求 * @return LightTaskTracker */ public static LightTaskTracker create(ServerScheduleJobReq req, WorkerRuntime workerRuntime) { try { return new LightTaskTracker(req, workerRuntime); } catch (Exception e) { reportCreateErrorToServer(req, workerRuntime, e); } return null; } @Override public void destroy() { if (!destroyFlag.compareAndSet(false, true)) { log.warn("[TaskTracker-{}] This TaskTracker has been destroyed!", instanceId); return; } if (statusReportScheduledFuture != null) { statusReportScheduledFuture.cancel(true); } if (timeoutCheckScheduledFuture != null) { timeoutCheckScheduledFuture.cancel(true); } if (processFuture != null) { processFuture.cancel(true); } LightTaskTrackerManager.removeTaskTracker(instanceId); // 最后一列为总耗时(即占用资源的耗时,当前时间减去创建时间) log.info("[TaskTracker-{}] remove TaskTracker,task status {},start time:{},end time:{},real cost:{},total time:{}", instanceId, status, taskStartTime, taskEndTime, taskEndTime != null ? taskEndTime - taskStartTime : "unknown", System.currentTimeMillis() - createTime); } @Override public void stopTask() { // 已经执行完成,忽略 if (finished.get()) { log.warn("[TaskTracker-{}] fail to stop task,task is finished!result:{}", instanceId, result); return; } if (!stopFlag.compareAndSet(false, true)) { log.warn("[TaskTracker-{}] task has been mark as stopped,ignore this request!", instanceId); return; } // 当前任务尚未执行 if (status == TaskStatus.WORKER_RECEIVED) { log.warn("[TaskTracker-{}] task is not started,destroy this taskTracker directly!", instanceId); destroy(); return; } // 正在执行 if (processFuture != null) { // 尝试打断 log.info("[TaskTracker-{}] try to interrupt task!", instanceId); processFuture.cancel(true); } } @Override public InstanceDetail fetchRunningStatus(ServerQueryInstanceStatusReq req) { InstanceDetail detail = new InstanceDetail(); // 填充基础信息 detail.setActualTriggerTime(createTime); detail.setStatus(InstanceStatus.RUNNING.getV()); detail.setTaskTrackerAddress(workerRuntime.getWorkerAddress()); // 填充详细信息 InstanceDetail.TaskDetail taskDetail = new InstanceDetail.TaskDetail(); taskDetail.setSucceedTaskNum(0); taskDetail.setFailedTaskNum(0); taskDetail.setTotalTaskNum(1); detail.setTaskDetail(taskDetail); return detail; } private ProcessResult processTask() { executeThread.set(Thread.currentThread()); // 设置任务开始执行的时间 taskStartTime = System.currentTimeMillis(); status = TaskStatus.WORKER_PROCESSING; // 开始执行时,提交任务判断是否超时 ProcessResult res = null; do { Thread.currentThread().setContextClassLoader(processorBean.getClassLoader()); if (res != null && !res.isSuccess()) { // 重试 taskContext.setCurrentRetryTimes(taskContext.getCurrentRetryTimes() + 1); log.warn("[TaskTracker-{}] process failed, TaskTracker will have a retry,current retryTimes : {}", instanceId, taskContext.getCurrentRetryTimes()); } try { res = processorBean.getProcessor().process(taskContext); } catch (InterruptedException e) { log.warn("[TaskTracker-{}] task has been interrupted !", instanceId, e); Thread.currentThread().interrupt(); if (timeoutFlag.get()) { res = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_INTERRUPTED); } else if (stopFlag.get()) { res = new ProcessResult(false, SystemInstanceResult.USER_STOP_INSTANCE_INTERRUPTED); } else { res = new ProcessResult(false, e.toString()); } } catch (Exception e) { log.warn("[TaskTracker-{}] process failed !", instanceId, e); res = new ProcessResult(false, e.toString()); } if (res == null) { log.warn("[TaskTracker-{}] processor return null !", instanceId); res = new ProcessResult(false, "Processor return null"); } } while (!res.isSuccess() && taskContext.getCurrentRetryTimes() < taskContext.getMaxRetryTimes() && !timeoutFlag.get() && !stopFlag.get()); executeThread.set(null); taskEndTime = System.currentTimeMillis(); finished.set(true); result = res; status = result.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED; // 取消超时检查任务 if (timeoutCheckScheduledFuture != null) { timeoutCheckScheduledFuture.cancel(true); } log.info("[TaskTracker-{}] task complete ! create time:{},queue time:{},use time:{},result:{}", instanceId, createTime, taskStartTime - createTime, System.currentTimeMillis() - taskStartTime, result); // 执行完成后立即上报一次 checkAndReportStatus(); return result; } private synchronized void checkAndReportStatus() { if (destroyFlag.get()) { // 已经被销毁,不需要上报状态 log.info("[TaskTracker-{}] has been destroyed,final status is {},needn't to report status!", instanceId, status); return; } TaskTrackerReportInstanceStatusReq reportInstanceStatusReq = new TaskTrackerReportInstanceStatusReq(); reportInstanceStatusReq.setAppId(workerRuntime.getAppId()); reportInstanceStatusReq.setJobId(instanceInfo.getJobId()); reportInstanceStatusReq.setInstanceId(instanceId); reportInstanceStatusReq.setWfInstanceId(instanceInfo.getWfInstanceId()); reportInstanceStatusReq.setTotalTaskNum(1); reportInstanceStatusReq.setReportTime(System.currentTimeMillis()); reportInstanceStatusReq.setStartTime(createTime); reportInstanceStatusReq.setSourceAddress(workerRuntime.getWorkerAddress()); reportInstanceStatusReq.setSucceedTaskNum(0); reportInstanceStatusReq.setFailedTaskNum(0); if (stopFlag.get()) { if (finished.get()) { // 已经被成功打断 destroy(); return; } final Thread workerThread = executeThread.get(); if (!finished.get() && workerThread != null) { // 未能成功打断任务,强制停止 try { if (tryForceStopThread(workerThread)) { finished.set(true); taskEndTime = System.currentTimeMillis(); result = new ProcessResult(false, SystemInstanceResult.USER_STOP_INSTANCE_FORCE_STOP); log.warn("[TaskTracker-{}] task need stop, force stop thread {} success!", instanceId, workerThread.getName()); // 被终止的任务不需要上报状态 destroy(); return; } } catch (Exception e) { log.warn("[TaskTracker-{}] task need stop,fail to stop thread {}", instanceId, workerThread.getName(), e); } } } if (finished.get()) { if (result.isSuccess()) { reportInstanceStatusReq.setSucceedTaskNum(1); reportInstanceStatusReq.setInstanceStatus(InstanceStatus.SUCCEED.getV()); } else { reportInstanceStatusReq.setFailedTaskNum(1); reportInstanceStatusReq.setInstanceStatus(InstanceStatus.FAILED.getV()); } // 处理工作流上下文 if (taskContext.getWorkflowContext().getWfInstanceId() != null) { reportInstanceStatusReq.setAppendedWfContext(taskContext.getWorkflowContext().getAppendedContextData()); } reportInstanceStatusReq.setResult(suit(result.getMsg())); reportInstanceStatusReq.setEndTime(taskEndTime); // 微操一下,上报最终状态时重新设置下时间,并且增加一小段偏移,保证在并发上报运行中状态以及最终状态时,最终状态的上报时间晚于运行中的状态 reportInstanceStatusReq.setReportTime(System.currentTimeMillis() + 1); reportFinalStatusThenDestroy(workerRuntime, reportInstanceStatusReq); return; } // 未完成的任务,只需要上报状态 reportInstanceStatusReq.setInstanceStatus(InstanceStatus.RUNNING.getV()); log.info("[TaskTracker-{}] report status({}) success,real status is {}", instanceId, reportInstanceStatusReq, status); TransportUtils.ttReportInstanceStatus(reportInstanceStatusReq, workerRuntime.getServerDiscoveryService().getCurrentServerAddress(), workerRuntime.getTransporter()); } private void timeoutCheck() { if (taskStartTime == null || System.currentTimeMillis() - taskStartTime < instanceInfo.getInstanceTimeoutMS()) { return; } if (finished.get() && result != null) { timeoutCheckScheduledFuture.cancel(true); return; } // 首次判断超时 if (timeoutFlag.compareAndSet(false, true)) { // 超时,仅尝试打断任务 log.warn("[TaskTracker-{}] task timeout,taskStarTime:{},currentTime:{},runningTimeLimit:{}, try to interrupt it.", instanceId, taskStartTime, System.currentTimeMillis(), instanceInfo.getInstanceTimeoutMS()); processFuture.cancel(true); return; } if (finished.get()) { // 已经成功被打断 log.warn("[TaskTracker-{}] task timeout,taskStarTime:{},endTime:{}, interrupt success.", instanceId, taskStartTime, taskEndTime); return; } Thread workerThread = executeThread.get(); if (workerThread == null) { return; } // 未能成功打断任务,强制终止 try { if (tryForceStopThread(workerThread)) { finished.set(true); taskEndTime = System.currentTimeMillis(); result = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_FORCE_STOP); log.warn("[TaskTracker-{}] task timeout, force stop thread {} success!", instanceId, workerThread.getName()); } } catch (Exception e) { log.warn("[TaskTracker-{}] task timeout,fail to stop thread {}", instanceId, workerThread.getName(), e); } } private TaskContext constructTaskContext(ServerScheduleJobReq req, WorkerRuntime workerRuntime) { final TaskContext context = new TaskContext(); context.setTaskId(req.getJobId() + "#" + req.getInstanceId()); context.setJobId(req.getJobId()); context.setJobParams(req.getJobParams()); context.setInstanceId(req.getInstanceId()); context.setInstanceParams(req.getInstanceParams()); context.setWorkflowContext(new WorkflowContext(req.getWfInstanceId(), req.getInstanceParams())); context.setOmsLogger(OmsLoggerFactory.build(req.getInstanceId(), req.getLogConfig(), workerRuntime)); context.setTaskName(TaskConstant.ROOT_TASK_NAME); context.setMaxRetryTimes(req.getTaskRetryNum()); context.setCurrentRetryTimes(0); context.setUserContext(workerRuntime.getWorkerConfig().getUserContext()); // 轻量级任务不会涉及到任务分片的处理,不需要处理子任务相关的信息 return context; } private String suit(String result) { if (StringUtils.isEmpty(result)) { return ""; } final int maxLength = workerRuntime.getWorkerConfig().getMaxResultLength(); if (result.length() <= maxLength) { return result; } log.warn("[TaskTracker-{}] task's result is too large({}>{}), a part will be discarded.", instanceId, result.length(), maxLength); return result.substring(0, maxLength).concat("..."); } /** * try force stop thread * * @param thread thread * @return stop result */ private boolean tryForceStopThread(Thread thread) { String threadName = thread.getName(); String allowStopThread = System.getProperty(PowerJobDKey.WORKER_ALLOWED_FORCE_STOP_THREAD); if (!StringUtils.equalsIgnoreCase(allowStopThread, Boolean.TRUE.toString())) { log.warn("[TaskTracker-{}] PowerJob not allowed to force stop a thread by config", instanceId); return false; } log.warn("[TaskTracker-{}] fail to interrupt the thread[{}], try to force stop.", instanceId, threadName); try { thread.stop(); return true; } catch (Throwable t) { log.warn("[TaskTracker-{}] stop thread[{}] failed, msg: {}", instanceId, threadName, t.getMessage()); } return false; } }