package tech.powerjob.worker.core.tracker.task; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.model.JobAdvancedRuntimeConfig; import tech.powerjob.common.request.ServerQueryInstanceStatusReq; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.pojo.model.InstanceInfo; import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; /** * @author Echo009 * @since 2022/9/19 */ @Slf4j public abstract class TaskTracker { /** * TaskTracker创建时间 */ protected final long createTime; /** * 任务实例ID,使用频率过高,从 InstanceInfo 提取出来单独保存一份 */ protected final long instanceId; /** * 任务实例信息 */ protected final InstanceInfo instanceInfo; protected final ExecuteType executeType; protected final JobAdvancedRuntimeConfig advancedRuntimeConfig; /** * 追加的工作流上下文数据 * * @since 2021/02/05 */ protected final Map appendedWfContext; /** * worker 运行时元数据 */ protected final WorkerRuntime workerRuntime; /** * 是否结束 */ protected final AtomicBoolean finished; /** * 连续上报多次失败后放弃上报,视为结果不可达,TaskTracker down */ protected int reportFailedCnt = 0; protected static final int MAX_REPORT_FAILED_THRESHOLD = 5; protected TaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) { this.createTime = System.currentTimeMillis(); this.workerRuntime = workerRuntime; this.instanceId = req.getInstanceId(); this.instanceInfo = new InstanceInfo(); // PowerJob 值拷贝场景不多,引入三方值拷贝类库可能引入类冲突等问题,综合评估手写 ROI 最高 instanceInfo.setJobId(req.getJobId()); instanceInfo.setInstanceId(req.getInstanceId()); instanceInfo.setWfInstanceId(req.getWfInstanceId()); instanceInfo.setExecuteType(req.getExecuteType()); instanceInfo.setProcessorType(req.getProcessorType()); instanceInfo.setProcessorInfo(req.getProcessorInfo()); instanceInfo.setJobParams(req.getJobParams()); instanceInfo.setInstanceParams(req.getInstanceParams()); instanceInfo.setThreadConcurrency(req.getThreadConcurrency()); instanceInfo.setTaskRetryNum(req.getTaskRetryNum()); instanceInfo.setLogConfig(req.getLogConfig()); instanceInfo.setInstanceTimeoutMS(req.getInstanceTimeoutMS()); instanceInfo.setAdvancedRuntimeConfig(req.getAdvancedRuntimeConfig()); // 常用变量初始化 executeType = ExecuteType.valueOf(req.getExecuteType()); advancedRuntimeConfig = Optional.ofNullable(req.getAdvancedRuntimeConfig()).map(x -> JsonUtils.parseObjectIgnoreException(x, JobAdvancedRuntimeConfig.class)).orElse(new JobAdvancedRuntimeConfig()); // 特殊处理超时时间 if (instanceInfo.getInstanceTimeoutMS() <= 0) { instanceInfo.setInstanceTimeoutMS(Integer.MAX_VALUE); } // 只有工作流中的任务允许向工作流中追加上下文数据 this.appendedWfContext = req.getWfInstanceId() == null ? Collections.emptyMap() : Maps.newConcurrentMap(); this.finished = new AtomicBoolean(false); } /** * 销毁 */ public abstract void destroy(); /** * 停止任务 */ public abstract void stopTask(); /** * 查询任务实例的详细运行状态 * * @return 任务实例的详细运行状态 */ public abstract InstanceDetail fetchRunningStatus(ServerQueryInstanceStatusReq req); public static void reportCreateErrorToServer(ServerScheduleJobReq req, WorkerRuntime workerRuntime, Exception e) { log.warn("[TaskTracker-{}] create TaskTracker from request({}) failed.", req.getInstanceId(), req, e); // 直接发送失败请求 TaskTrackerReportInstanceStatusReq response = new TaskTrackerReportInstanceStatusReq(); response.setAppId(workerRuntime.getAppId()); response.setJobId(req.getJobId()); response.setInstanceId(req.getInstanceId()); response.setWfInstanceId(req.getWfInstanceId()); response.setInstanceStatus(InstanceStatus.FAILED.getV()); response.setResult(String.format("init TaskTracker failed, reason: %s", e.toString())); response.setReportTime(System.currentTimeMillis()); response.setStartTime(System.currentTimeMillis()); response.setSourceAddress(workerRuntime.getWorkerAddress()); TransportUtils.ttReportInstanceStatus(response, workerRuntime.getServerDiscoveryService().getCurrentServerAddress(), workerRuntime.getTransporter()); } protected void reportFinalStatusThenDestroy(WorkerRuntime workerRuntime, TaskTrackerReportInstanceStatusReq reportInstanceStatusReq) { String currentServerAddress = workerRuntime.getServerDiscoveryService().getCurrentServerAddress(); // 最终状态需要可靠上报 boolean serverAccepted = false; try { serverAccepted = TransportUtils.reliableTtReportInstanceStatus(reportInstanceStatusReq, currentServerAddress, workerRuntime.getTransporter()); } catch (Exception e) { log.warn("[TaskTracker-{}] report finished status failed, req={}.", instanceId, reportInstanceStatusReq, e); } if (!serverAccepted) { if (++reportFailedCnt > MAX_REPORT_FAILED_THRESHOLD) { log.error("[TaskTracker-{}] try to report finished status(detail={}) lots of times but all failed, it's time to give up, so the process result will be dropped", instanceId, reportInstanceStatusReq); destroy(); } return; } log.info("[TaskTracker-{}] report finished status(detail={}) success", instanceId, reportInstanceStatusReq); destroy(); } }