package tech.powerjob.server.core.handler; import lombok.extern.slf4j.Slf4j; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import tech.powerjob.common.request.WorkerHeartbeat; import tech.powerjob.common.request.WorkerLogReportReq; import tech.powerjob.common.response.AskResponse; import tech.powerjob.remote.framework.actor.Actor; import tech.powerjob.server.core.instance.InstanceLogService; import tech.powerjob.server.core.instance.InstanceManager; import tech.powerjob.server.core.workflow.WorkflowInstanceManager; import tech.powerjob.server.monitor.MonitorService; import tech.powerjob.server.monitor.events.w2s.TtReportInstanceStatusEvent; import tech.powerjob.server.monitor.events.w2s.WorkerHeartbeatEvent; import tech.powerjob.server.monitor.events.w2s.WorkerLogReportEvent; import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository; import tech.powerjob.server.remote.worker.WorkerClusterManagerService; import tech.powerjob.server.remote.worker.WorkerClusterQueryService; /** * receive and process worker's request * * @author tjq * @since 2022/9/11 */ @Slf4j @Component @Actor(path = RemoteConstant.S4W_PATH) public class WorkerRequestHandlerImpl extends AbWorkerRequestHandler { private final InstanceManager instanceManager; private final WorkflowInstanceManager workflowInstanceManager; private final InstanceLogService instanceLogService; public WorkerRequestHandlerImpl(InstanceManager instanceManager, WorkflowInstanceManager workflowInstanceManager, InstanceLogService instanceLogService, MonitorService monitorService, Environment environment, ContainerInfoRepository containerInfoRepository, WorkerClusterQueryService workerClusterQueryService) { super(monitorService, environment, containerInfoRepository, workerClusterQueryService); this.instanceManager = instanceManager; this.workflowInstanceManager = workflowInstanceManager; this.instanceLogService = instanceLogService; } @Override protected void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event) { WorkerClusterManagerService.updateStatus(heartbeat); } @Override protected AskResponse processTaskTrackerReportInstanceStatus0(TaskTrackerReportInstanceStatusReq req, TtReportInstanceStatusEvent event) throws Exception { // 2021/02/05 如果是工作流中的实例先尝试更新上下文信息,再更新实例状态,这里一定不会有异常 if (req.getWfInstanceId() != null && !CollectionUtils.isEmpty(req.getAppendedWfContext())) { // 更新工作流上下文信息 workflowInstanceManager.updateWorkflowContext(req.getWfInstanceId(),req.getAppendedWfContext()); } instanceManager.updateStatus(req); // 结束状态(成功/失败)需要回复消息 if (InstanceStatus.FINISHED_STATUS.contains(req.getInstanceStatus())) { return AskResponse.succeed(null); } return null; } @Override protected void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event) { // 这个效率应该不会拉垮吧...也就是一些判断 + Map#get 吧... instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents()); } }