package tech.powerjob.server.core.workflow; import com.alibaba.fastjson.JSON; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.WorkflowInstanceStatus; import tech.powerjob.common.enums.WorkflowNodeType; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.common.response.WorkflowInstanceInfoDTO; import tech.powerjob.common.enums.SwitchableStatus; import tech.powerjob.server.common.utils.SpringUtils; import tech.powerjob.server.core.instance.InstanceService; import tech.powerjob.server.core.lock.UseCacheLock; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils; import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO; import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO; import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository; import tech.powerjob.server.remote.server.redirector.DesignateServer; import java.util.Date; import java.util.Objects; import java.util.Optional; /** * 工作流实例服务 * * @author tjq * @author Echo009 * @since 2020/5/31 */ @Slf4j @Service @RequiredArgsConstructor public class WorkflowInstanceService { private final InstanceService instanceService; private final WorkflowInstanceInfoRepository wfInstanceInfoRepository; private final WorkflowInstanceManager workflowInstanceManager; private final WorkflowInfoRepository workflowInfoRepository; /** * 停止工作流实例(入口) * * @param wfInstanceId 工作流实例ID * @param appId 所属应用ID */ public void stopWorkflowInstanceEntrance(Long wfInstanceId, Long appId) { WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) { throw new PowerJobException("workflow instance already stopped"); } // 如果这是一个被嵌套的工作流,则终止父工作流 if (wfInstance.getParentWfInstanceId() != null) { SpringUtils.getBean(this.getClass()).stopWorkflowInstance(wfInstance.getParentWfInstanceId(), appId); return; } SpringUtils.getBean(this.getClass()).stopWorkflowInstance(wfInstanceId, appId); } /** * 停止工作流实例 * * @param wfInstanceId 工作流实例ID * @param appId 所属应用ID */ @DesignateServer @UseCacheLock(type = "processWfInstance", key = "#wfInstanceId", concurrencyLevel = 1024) public void stopWorkflowInstance(Long wfInstanceId, Long appId) { WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) { throw new PowerJobException("workflow instance already stopped"); } // 停止所有已启动且未完成的服务 PEWorkflowDAG dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); // 遍历所有节点,终止正在运行的 dag.getNodes().forEach(node -> { try { if (node.getInstanceId() != null && InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) { log.debug("[WfInstance-{}] instance({}) is running, try to stop it now.", wfInstanceId, node.getInstanceId()); node.setStatus(InstanceStatus.STOPPED.getV()); node.setResult(SystemInstanceResult.STOPPED_BY_USER); // 特殊处理嵌套工作流节点 if (Objects.equals(node.getNodeType(), WorkflowNodeType.NESTED_WORKFLOW.getCode())) { stopWorkflowInstance(node.getInstanceId(), appId); // } else { // 注意,这里并不保证一定能终止正在运行的实例 instanceService.stopInstance(appId, node.getInstanceId()); } } } catch (Exception e) { log.warn("[WfInstance-{}] stop instance({}) failed.", wfInstanceId, JSON.toJSONString(node), e); } }); // 修改数据库状态 wfInstance.setDag(JSON.toJSONString(dag)); wfInstance.setStatus(WorkflowInstanceStatus.STOPPED.getV()); wfInstance.setResult(SystemInstanceResult.STOPPED_BY_USER); wfInstance.setGmtModified(new Date()); wfInstanceInfoRepository.saveAndFlush(wfInstance); log.info("[WfInstance-{}] stop workflow instance successfully~", wfInstanceId); } /** * Add by Echo009 on 2021/02/07 * * @param wfInstanceId 工作流实例ID * @param appId 应用ID */ @DesignateServer @UseCacheLock(type = "processWfInstance", key = "#wfInstanceId", concurrencyLevel = 1024) public void retryWorkflowInstance(Long wfInstanceId, Long appId) { WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); // 仅允许重试 失败的工作流 if (WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) { throw new PowerJobException("workflow instance is running"); } if (wfInstance.getStatus() == WorkflowInstanceStatus.SUCCEED.getV()) { throw new PowerJobException("workflow instance is already successful"); } // 因为 DAG 非法 或者 因任务信息缺失 而失败的工作流实例无法重试 if (SystemInstanceResult.CAN_NOT_FIND_JOB.equals(wfInstance.getResult())) { throw new PowerJobException("you can't retry the workflow instance which is missing job info!"); } // 校验 DAG 信息 PEWorkflowDAG dag; try { dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); if (!WorkflowDAGUtils.valid(dag)) { throw new PowerJobException(SystemInstanceResult.INVALID_DAG); } } catch (Exception e) { throw new PowerJobException("you can't retry the workflow instance whose DAG is illegal!"); } // 检查当前工作流信息 Optional workflowInfo = workflowInfoRepository.findById(wfInstance.getWorkflowId()); if (!workflowInfo.isPresent() || workflowInfo.get().getStatus() == SwitchableStatus.DISABLE.getV()) { throw new PowerJobException("you can't retry the workflow instance whose metadata is unavailable!"); } WorkflowDAGUtils.resetRetryableNode(dag); wfInstance.setDag(JSON.toJSONString(dag)); // 更新工作流实例状态,不覆盖实际触发时间 wfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV()); wfInstance.setGmtModified(new Date()); wfInstanceInfoRepository.saveAndFlush(wfInstance); // 立即开始 workflowInstanceManager.start(workflowInfo.get(), wfInstanceId); } public WorkflowInstanceInfoDTO fetchWorkflowInstanceInfo(Long wfInstanceId, Long appId) { WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); WorkflowInstanceInfoDTO dto = new WorkflowInstanceInfoDTO(); BeanUtils.copyProperties(wfInstance, dto); return dto; } public WorkflowInstanceInfoDO fetchWfInstance(Long wfInstanceId, Long appId) { WorkflowInstanceInfoDO wfInstance = wfInstanceInfoRepository.findByWfInstanceId(wfInstanceId).orElseThrow(() -> new IllegalArgumentException("can't find workflow instance by wfInstanceId: " + wfInstanceId)); if (!Objects.equals(appId, wfInstance.getAppId())) { throw new PowerJobException("Permission Denied!"); } return wfInstance; } /** * Add by Echo009 on 2021/02/20 * 将节点标记成功 * 注意:这里仅能标记真正执行失败的且不允许跳过的节点 * 即处于 [失败且不允许跳过] 的节点 * 而且仅会操作工作流实例 DAG 中的节点信息(状态、result) * 并不会改变对应任务实例中的任何信息 *

* 还是加把锁保平安 ~ * * @param wfInstanceId 工作流实例 ID * @param nodeId 节点 ID */ @DesignateServer @UseCacheLock(type = "processWfInstance", key = "#wfInstanceId", concurrencyLevel = 1024) public void markNodeAsSuccess(Long appId, Long wfInstanceId, Long nodeId) { WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); // 校验工作流实例状态,运行中的不允许处理 if (WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) { throw new PowerJobException("you can't mark the node in a running workflow!"); } // 这里一定能反序列化成功 PEWorkflowDAG dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); PEWorkflowDAG.Node targetNode = null; for (PEWorkflowDAG.Node node : dag.getNodes()) { if (node.getNodeId().equals(nodeId)) { targetNode = node; break; } } if (targetNode == null) { throw new PowerJobException("can't find the node in current DAG!"); } boolean allowSkipWhenFailed = targetNode.getSkipWhenFailed() != null && targetNode.getSkipWhenFailed(); // 仅允许处理 执行失败的且不允许失败跳过的节点 if (targetNode.getInstanceId() != null && targetNode.getStatus() == InstanceStatus.FAILED.getV() // 不允许失败跳过 && !allowSkipWhenFailed) { // 仅处理工作流实例中的节点信息 targetNode.setStatus(InstanceStatus.SUCCEED.getV()) .setResult(SystemInstanceResult.MARK_AS_SUCCESSFUL_NODE); wfInstance.setDag(JSON.toJSONString(dag)); wfInstanceInfoRepository.saveAndFlush(wfInstance); return; } // 其他情况均拒绝处理 throw new PowerJobException("you can only mark the node which is failed and not allow to skip!"); } }