package tech.powerjob.worker.actors; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.request.ServerQueryInstanceStatusReq; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.ServerStopInstanceReq; import tech.powerjob.common.response.AskResponse; import tech.powerjob.remote.framework.actor.Actor; import tech.powerjob.remote.framework.actor.Handler; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager; import tech.powerjob.worker.core.tracker.manager.LightTaskTrackerManager; import tech.powerjob.worker.core.tracker.task.TaskTracker; import tech.powerjob.worker.core.tracker.task.heavy.HeavyTaskTracker; import tech.powerjob.worker.core.tracker.task.light.LightTaskTracker; import tech.powerjob.worker.persistence.TaskDO; import tech.powerjob.worker.pojo.request.ProcessorMapTaskRequest; import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq; import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import java.util.List; import static tech.powerjob.common.RemoteConstant.*; /** * worker 的 master 节点,处理来自 server 的 jobInstance 请求和来自 worker 的task 请求 * * @author tjq * @since 2020/3/17 */ @Slf4j @Actor(path = WTT_PATH) public class TaskTrackerActor { private final WorkerRuntime workerRuntime; public TaskTrackerActor(WorkerRuntime workerRuntime) { this.workerRuntime = workerRuntime; } /** * 子任务状态上报 处理器 */ @Handler(path = WTT_HANDLER_REPORT_TASK_STATUS) public AskResponse onReceiveProcessorReportTaskStatusReq(ProcessorReportTaskStatusReq req) { int taskStatus = req.getStatus(); // 只有重量级任务才会有两级任务状态上报的机制 HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId()); // 手动停止 TaskTracker 的情况下会出现这种情况 if (taskTracker == null) { log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req); return null; } if (ProcessorReportTaskStatusReq.BROADCAST.equals(req.getCmd())) { taskTracker.broadcast(taskStatus == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), req.getSubInstanceId(), req.getTaskId(), req.getResult()); } taskTracker.updateTaskStatus(req.getSubInstanceId(), req.getTaskId(), taskStatus, req.getReportTime(), req.getResult()); // 更新工作流上下文 taskTracker.updateAppendedWfContext(req.getAppendedWfContext()); // 结束状态需要回复接受成功 if (TaskStatus.FINISHED_STATUS.contains(taskStatus)) { return AskResponse.succeed(null); } return null; } /** * 子任务 map 处理器 */ @Handler(path = WTT_HANDLER_MAP_TASK) public AskResponse onReceiveProcessorMapTaskRequest(ProcessorMapTaskRequest req) { HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId()); if (taskTracker == null) { log.warn("[TaskTrackerActor] receive ProcessorMapTaskRequest({}) but system can't find TaskTracker.", req); return null; } boolean success = false; List subTaskList = Lists.newLinkedList(); try { req.getSubTasks().forEach(originSubTask -> { TaskDO subTask = new TaskDO(); subTask.setTaskName(req.getTaskName()); subTask.setSubInstanceId(req.getSubInstanceId()); subTask.setTaskId(originSubTask.getTaskId()); subTask.setTaskContent(originSubTask.getTaskContent()); subTaskList.add(subTask); }); success = taskTracker.submitTask(subTaskList); }catch (Exception e) { log.warn("[TaskTrackerActor] process map task(instanceId={}) failed.", req.getInstanceId(), e); } AskResponse response = new AskResponse(); response.setSuccess(success); return response; } /** * 服务器任务调度处理器 */ @Handler(path = WTT_HANDLER_RUN_JOB) public void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) { log.debug("[TaskTrackerActor] server schedule job by request: {}.", req); Long instanceId = req.getInstanceId(); // 区分轻量级任务模型以及重量级任务模型 if (isLightweightTask(req)) { final LightTaskTracker taskTracker = LightTaskTrackerManager.getTaskTracker(instanceId); if (taskTracker != null) { log.warn("[TaskTrackerActor] LightTaskTracker({}) for instance(id={}) already exists.", taskTracker, instanceId); return; } // 判断是否已经 overload if (LightTaskTrackerManager.currentTaskTrackerSize() >= workerRuntime.getWorkerConfig().getMaxLightweightTaskNum() * LightTaskTrackerManager.OVERLOAD_FACTOR) { // ignore this request log.warn("[TaskTrackerActor] this worker is overload,ignore this request(instanceId={}),current size = {}!",instanceId,LightTaskTrackerManager.currentTaskTrackerSize()); return; } if (LightTaskTrackerManager.currentTaskTrackerSize() >= workerRuntime.getWorkerConfig().getMaxLightweightTaskNum()) { log.warn("[TaskTrackerActor] this worker will be overload soon,current size = {}!",LightTaskTrackerManager.currentTaskTrackerSize()); } // 创建轻量级任务 LightTaskTrackerManager.atomicCreateTaskTracker(instanceId, ignore -> LightTaskTracker.create(req, workerRuntime)); } else { HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(instanceId); if (taskTracker != null) { log.warn("[TaskTrackerActor] HeavyTaskTracker({}) for instance(id={}) already exists.", taskTracker, instanceId); return; } // 判断是否已经 overload if (HeavyTaskTrackerManager.currentTaskTrackerSize() >= workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum()) { // ignore this request log.warn("[TaskTrackerActor] this worker is overload,ignore this request(instanceId={})! current size = {},", instanceId, HeavyTaskTrackerManager.currentTaskTrackerSize()); return; } // 原子创建,防止多实例的存在 HeavyTaskTrackerManager.atomicCreateTaskTracker(instanceId, ignore -> HeavyTaskTracker.create(req, workerRuntime)); } } /** * ProcessorTracker 心跳处理器 */ @Handler(path = WTT_HANDLER_REPORT_PROCESSOR_TRACKER_STATUS) public void onReceiveProcessorTrackerStatusReportReq(ProcessorTrackerStatusReportReq req) { HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId()); if (taskTracker == null) { log.warn("[TaskTrackerActor] receive ProcessorTrackerStatusReportReq({}) but system can't find TaskTracker.", req); return; } taskTracker.receiveProcessorTrackerHeartbeat(req); } /** * 停止任务实例 */ @Handler(path = WTT_HANDLER_STOP_INSTANCE) public void onReceiveServerStopInstanceReq(ServerStopInstanceReq req) { log.info("[TaskTrackerActor] receive ServerStopInstanceReq({}).", req); HeavyTaskTracker heavyTaskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId()); if (heavyTaskTracker != null) { heavyTaskTracker.stopTask(); return; } LightTaskTracker lightTaskTracker = LightTaskTrackerManager.getTaskTracker(req.getInstanceId()); if (lightTaskTracker != null) { lightTaskTracker.stopTask(); return; } log.warn("[TaskTrackerActor] receive ServerStopInstanceReq({}) but system can't find TaskTracker.", req); } /** * 查询任务实例运行状态 */ @Handler(path = WTT_HANDLER_QUERY_INSTANCE_STATUS) public AskResponse onReceiveServerQueryInstanceStatusReq(ServerQueryInstanceStatusReq req) { AskResponse askResponse; TaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId()); if (taskTracker == null && (taskTracker = LightTaskTrackerManager.getTaskTracker(req.getInstanceId())) == null) { log.warn("[TaskTrackerActor] receive ServerQueryInstanceStatusReq({}) but system can't find TaskTracker.", req); askResponse = AskResponse.failed("can't find TaskTracker"); } else { InstanceDetail instanceDetail = taskTracker.fetchRunningStatus(req); askResponse = AskResponse.succeed(instanceDetail); } return askResponse; } private boolean isLightweightTask(ServerScheduleJobReq serverScheduleJobReq) { final ExecuteType executeType = ExecuteType.valueOf(serverScheduleJobReq.getExecuteType()); // 非单机执行的一定不是 if (executeType != ExecuteType.STANDALONE){ return false; } TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(serverScheduleJobReq.getTimeExpressionType()); // 固定频率以及固定延迟的也一定不是 return timeExpressionType != TimeExpressionType.FIXED_DELAY && timeExpressionType != TimeExpressionType.FIXED_RATE; } }