package tech.powerjob.server.core.workflow.hanlder.impl; import com.alibaba.fastjson.JSON; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.WorkflowInstanceStatus; import tech.powerjob.common.enums.WorkflowNodeType; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.enums.SwitchableStatus; import tech.powerjob.server.common.utils.SpringUtils; import tech.powerjob.server.core.workflow.WorkflowInstanceManager; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils; import tech.powerjob.server.core.workflow.hanlder.TaskNodeHandler; import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO; import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO; import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository; import java.util.Date; /** * @author Echo009 * @since 2021/12/13 */ @Component @Slf4j @RequiredArgsConstructor public class NestedWorkflowNodeHandler implements TaskNodeHandler { private final WorkflowInfoRepository workflowInfoRepository; private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository; @Override public void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) { // check Long wfId = node.getJobId(); WorkflowInfoDO targetWf = workflowInfoRepository.findById(wfId).orElse(null); if (targetWf == null || targetWf.getStatus() == SwitchableStatus.DELETED.getV()) { if (targetWf == null) { log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow({}) is not exist!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getJobId()); } else { log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow({}) has been deleted!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getJobId()); } throw new PowerJobException("invalid nested workflow node," + node.getNodeId()); } if (node.getInstanceId() != null) { // 处理重试的情形,不需要创建实例,仅需要更改对应实例的状态,以及相应的节点状态 WorkflowInstanceInfoDO wfInstance = workflowInstanceInfoRepository.findByWfInstanceId(node.getInstanceId()).orElse(null); if (wfInstance == null) { log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow instance({}) is not exist!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getInstanceId()); throw new PowerJobException("invalid nested workflow instance id " + node.getInstanceId()); } // 不用考虑状态,只有失败的工作流嵌套节点状态会被重置 // 需要将子工作流中失败的节点状态重置为 等待 派发 try { PEWorkflowDAG nodeDag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); if (!WorkflowDAGUtils.valid(nodeDag)) { throw new PowerJobException(SystemInstanceResult.INVALID_DAG); } WorkflowDAGUtils.resetRetryableNode(nodeDag); wfInstance.setDag(JSON.toJSONString(nodeDag)); wfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV()); wfInstance.setGmtModified(new Date()); workflowInstanceInfoRepository.saveAndFlush(wfInstance); } catch (Exception e) { log.error("[Workflow-{}|{}] invalid nested workflow node({}),target workflow instance({})'s DAG is illegal!", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getInstanceId(),e); throw new PowerJobException("illegal nested workflow instance, id : "+ node.getInstanceId()); } } else { // 透传当前的上下文创建新的工作流实例 String wfContext = wfInstanceInfo.getWfContext(); Long instanceId = SpringUtils.getBean(WorkflowInstanceManager.class).create(targetWf, wfContext, System.currentTimeMillis(), wfInstanceInfo.getWfInstanceId()); node.setInstanceId(instanceId); } node.setStartTime(CommonUtils.formatTime(System.currentTimeMillis())); node.setStatus(InstanceStatus.RUNNING.getV()); } @Override public void startTaskInstance(PEWorkflowDAG.Node node) { Long wfId = node.getJobId(); WorkflowInfoDO targetWf = workflowInfoRepository.findById(wfId).orElse(null); SpringUtils.getBean(WorkflowInstanceManager.class).start(targetWf, node.getInstanceId()); } @Override public WorkflowNodeType matchingType() { return WorkflowNodeType.NESTED_WORKFLOW; } }