package tech.powerjob.server.core.workflow; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.WorkflowContextConstant; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.WorkflowInstanceStatus; import tech.powerjob.common.enums.WorkflowNodeType; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.enums.SwitchableStatus; import tech.powerjob.server.common.utils.SpringUtils; import tech.powerjob.server.core.alarm.AlarmUtils; import tech.powerjob.server.core.helper.StatusMappingHelper; import tech.powerjob.server.core.lock.UseCacheLock; import tech.powerjob.server.core.service.UserService; import tech.powerjob.server.core.service.WorkflowNodeHandleService; import tech.powerjob.server.core.uid.IdGenerateService; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils; import tech.powerjob.server.core.alarm.AlarmCenter; import tech.powerjob.server.core.alarm.module.WorkflowInstanceAlarm; import tech.powerjob.server.persistence.remote.model.*; import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository; import java.util.*; import java.util.stream.Collectors; import static tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils.isNotAllowSkipWhenFailed; /** * 管理运行中的工作流实例 * * @author tjq * @author Echo009 * @since 2020/5/26 */ @Slf4j @Service @RequiredArgsConstructor @SuppressWarnings("squid:S1192") public class WorkflowInstanceManager { private final AlarmCenter alarmCenter; private final IdGenerateService idGenerateService; private final JobInfoRepository jobInfoRepository; private final UserService userService; private final WorkflowInfoRepository workflowInfoRepository; private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository; private final WorkflowNodeInfoRepository workflowNodeInfoRepository; private final WorkflowNodeHandleService workflowNodeHandleService; /** * 创建工作流任务实例 * ******************************************** * 2021-02-03 modify by Echo009 * 通过 initParams 初始化工作流上下文(wfContext) * ******************************************** * * @param wfInfo 工作流任务元数据(描述信息) * @param initParams 启动参数 * @param expectTriggerTime 预计执行时间 * @return wfInstanceId */ public Long create(WorkflowInfoDO wfInfo, String initParams, Long expectTriggerTime, Long parentWfInstanceId) { Long wfId = wfInfo.getId(); Long wfInstanceId = idGenerateService.allocate(); // 构造实例信息 WorkflowInstanceInfoDO newWfInstance = constructWfInstance(wfInfo, initParams, expectTriggerTime, wfId, wfInstanceId); if (parentWfInstanceId != null) { // 处理子工作流 newWfInstance.setParentWfInstanceId(parentWfInstanceId); // 直接透传上下文 newWfInstance.setWfContext(initParams); } PEWorkflowDAG dag = null; try { dag = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class); // 校验 DAG 信息 if (!WorkflowDAGUtils.valid(dag)) { log.error("[Workflow-{}|{}] DAG of this workflow is illegal! maybe you has modified the DAG info directly in database!", wfId, wfInstanceId); throw new PowerJobException(SystemInstanceResult.INVALID_DAG); } // 初始化节点信息 initNodeInfo(dag); // 最后检查工作流中的任务是否均处于可用状态(没有被删除) Set allJobIds = Sets.newHashSet(); dag.getNodes().forEach(node -> { if (node.getNodeType() == WorkflowNodeType.JOB.getCode()) { allJobIds.add(node.getJobId()); } // 将节点的初始状态置为等待派发 node.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); }); int needNum = allJobIds.size(); long dbNum = jobInfoRepository.countByAppIdAndStatusInAndIdIn(wfInfo.getAppId(), Sets.newHashSet(SwitchableStatus.ENABLE.getV(), SwitchableStatus.DISABLE.getV()), allJobIds); log.debug("[Workflow-{}|{}] contains {} jobs, find {} jobs in database.", wfId, wfInstanceId, needNum, dbNum); if (dbNum < allJobIds.size()) { log.warn("[Workflow-{}|{}] this workflow need {} jobs, but just find {} jobs in database, maybe you delete or disable some job!", wfId, wfInstanceId, needNum, dbNum); throw new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_JOB); } newWfInstance.setDag(JSON.toJSONString(dag)); workflowInstanceInfoRepository.saveAndFlush(newWfInstance); } catch (Exception e) { if (dag != null) { newWfInstance.setDag(JSON.toJSONString(dag)); } handleWfInstanceFinalStatus(newWfInstance, e.getMessage(), WorkflowInstanceStatus.FAILED); } return wfInstanceId; } /** * 初始化节点信息 */ private void initNodeInfo(PEWorkflowDAG dag) { for (PEWorkflowDAG.Node node : dag.getNodes()) { WorkflowNodeInfoDO workflowNodeInfo = workflowNodeInfoRepository.findById(node.getNodeId()).orElseThrow(() -> new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_NODE)); if (workflowNodeInfo.getType() == null) { // 前向兼容 workflowNodeInfo.setType(WorkflowNodeType.JOB.getCode()); } // 填充基础信息 node.setNodeType(workflowNodeInfo.getType()) .setJobId(workflowNodeInfo.getJobId()) .setNodeName(workflowNodeInfo.getNodeName()) .setNodeParams(workflowNodeInfo.getNodeParams()) .setEnable(workflowNodeInfo.getEnable()) .setSkipWhenFailed(workflowNodeInfo.getSkipWhenFailed()); // 任务节点,初始化节点参数时需要特殊处理 if (node.getNodeType() == WorkflowNodeType.JOB.getCode()) { // 任务节点缺失任务信息 if (workflowNodeInfo.getJobId() == null) { throw new PowerJobException(SystemInstanceResult.ILLEGAL_NODE); } JobInfoDO jobInfo = jobInfoRepository.findById(workflowNodeInfo.getJobId()).orElseThrow(() -> new PowerJobException(SystemInstanceResult.CAN_NOT_FIND_JOB)); if (!StringUtils.isBlank(workflowNodeInfo.getNodeParams())) { node.setNodeParams(workflowNodeInfo.getNodeParams()); } else { node.setNodeParams(jobInfo.getJobParams()); } } } } /** * 构造工作流实例,并初始化基础信息(不包括 DAG ) */ private WorkflowInstanceInfoDO constructWfInstance(WorkflowInfoDO wfInfo, String initParams, Long expectTriggerTime, Long wfId, Long wfInstanceId) { Date now = new Date(); WorkflowInstanceInfoDO newWfInstance = new WorkflowInstanceInfoDO(); newWfInstance.setAppId(wfInfo.getAppId()); newWfInstance.setWfInstanceId(wfInstanceId); newWfInstance.setWorkflowId(wfId); newWfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV()); newWfInstance.setExpectedTriggerTime(expectTriggerTime); newWfInstance.setActualTriggerTime(System.currentTimeMillis()); newWfInstance.setWfInitParams(initParams); // 如果 initParams 是个合法的 Map JSON 串则直接将其注入 wfContext boolean injectDirect = false; try { Map parseRes = JSON.parseObject(initParams, new TypeReference>() { }); if (parseRes != null && !parseRes.isEmpty()) { injectDirect = true; } } catch (Exception e) { // ignore } if (injectDirect) { newWfInstance.setWfContext(initParams); } else { // 初始化上下文 Map wfContextMap = Maps.newHashMap(); wfContextMap.put(WorkflowContextConstant.CONTEXT_INIT_PARAMS_KEY, initParams); newWfInstance.setWfContext(JsonUtils.toJSONString(wfContextMap)); } newWfInstance.setGmtCreate(now); newWfInstance.setGmtModified(now); return newWfInstance; } /** * 开始任务 * ******************************************** * 2021-02-03 modify by Echo009 * 1、工作流支持配置重复的任务节点 * 2、移除参数 initParams,改为统一从工作流实例中获取 * 传递工作流实例的 wfContext 作为 初始启动参数 * 3、通过 {@link WorkflowDAGUtils#listReadyNodes} 兼容原地重试逻辑 * ******************************************** * * @param wfInfo 工作流任务信息 * @param wfInstanceId 工作流任务实例ID */ @UseCacheLock(type = "processWfInstance", key = "#wfInfo.getMaxWfInstanceNum() > 0 ? #wfInfo.getId() : #wfInstanceId", concurrencyLevel = 1024) public void start(WorkflowInfoDO wfInfo, Long wfInstanceId) { Optional wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId); if (!wfInstanceInfoOpt.isPresent()) { log.error("[WorkflowInstanceManager] can't find metadata by workflowInstanceId({}).", wfInstanceId); return; } WorkflowInstanceInfoDO wfInstanceInfo = wfInstanceInfoOpt.get(); // 不是等待中,不再继续执行(可能上一流程已经失败) if (wfInstanceInfo.getStatus() != WorkflowInstanceStatus.WAITING.getV()) { log.info("[Workflow-{}|{}] workflowInstance({}) needn't running any more.", wfInfo.getId(), wfInstanceId, wfInstanceInfo); return; } // 最大实例数量 <= 0 表示不限制 if (wfInfo.getMaxWfInstanceNum() > 0) { // 并发度控制 int instanceConcurrency = workflowInstanceInfoRepository.countByWorkflowIdAndStatusIn(wfInfo.getId(), WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS); if (instanceConcurrency > wfInfo.getMaxWfInstanceNum()) { handleWfInstanceFinalStatus(wfInstanceInfo, String.format(SystemInstanceResult.TOO_MANY_INSTANCES, instanceConcurrency, wfInfo.getMaxWfInstanceNum()), WorkflowInstanceStatus.FAILED); return; } } try { // 从实例中读取工作流信息 PEWorkflowDAG dag = JSON.parseObject(wfInstanceInfo.getDag(), PEWorkflowDAG.class); // 根节点有可能被 disable List readyNodes = WorkflowDAGUtils.listReadyNodes(dag); // 先处理其中的控制节点 List controlNodes = findControlNodes(readyNodes); while (!controlNodes.isEmpty()) { workflowNodeHandleService.handleControlNodes(controlNodes, dag, wfInstanceInfo); readyNodes = WorkflowDAGUtils.listReadyNodes(dag); controlNodes = findControlNodes(readyNodes); } if (readyNodes.isEmpty()) { // 没有就绪的节点(所有节点都被禁用) wfInstanceInfo.setFinishedTime(System.currentTimeMillis()); wfInstanceInfo.setDag(JSON.toJSONString(dag)); log.warn("[Workflow-{}|{}] workflowInstance({}) needn't running ", wfInfo.getId(), wfInstanceId, wfInstanceInfo); handleWfInstanceFinalStatus(wfInstanceInfo, SystemInstanceResult.NO_ENABLED_NODES, WorkflowInstanceStatus.SUCCEED); return; } // 需要更新工作流实例状态 wfInstanceInfo.setStatus(WorkflowInstanceStatus.RUNNING.getV()); // 处理任务节点 workflowNodeHandleService.handleTaskNodes(readyNodes, dag, wfInstanceInfo); log.info("[Workflow-{}|{}] start workflow successfully", wfInfo.getId(), wfInstanceId); } catch (Exception e) { log.error("[Workflow-{}|{}] start workflow: {} failed.", wfInfo.getId(), wfInstanceId, wfInfo, e); handleWfInstanceFinalStatus(wfInstanceInfo, e.getMessage(), WorkflowInstanceStatus.FAILED); } } /** * 下一步(当工作流的某个任务完成时调用该方法) * ******************************************** * 2021-02-03 modify by Echo009 * 1、工作流支持配置重复的任务节点 * 2、不再获取上游任务的结果作为实例参数而是传递工作流 * 实例的 wfContext 作为 实例参数 * 3、通过 {@link WorkflowDAGUtils#listReadyNodes} 支持跳过禁用的节点 * ******************************************** * * @param wfInstanceId 工作流任务实例ID * @param instanceId 具体完成任务的某个任务实例ID * @param status 完成任务的任务实例状态(SUCCEED/FAILED/STOPPED) * @param result 完成任务的任务实例结果 */ @SuppressWarnings({"squid:S3776", "squid:S2142", "squid:S1141"}) @UseCacheLock(type = "processWfInstance", key = "#wfInstanceId", concurrencyLevel = 1024) public void move(Long wfInstanceId, Long instanceId, InstanceStatus status, String result) { Optional wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId); if (!wfInstanceInfoOpt.isPresent()) { log.error("[WorkflowInstanceManager] can't find metadata by workflowInstanceId({}).", wfInstanceId); return; } WorkflowInstanceInfoDO wfInstance = wfInstanceInfoOpt.get(); Long wfId = wfInstance.getWorkflowId(); // 特殊处理手动终止 且 工作流实例已经不在运行状态的情况 if (status == InstanceStatus.STOPPED && !WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) { // 由用户手动停止工作流实例导致,不需要任何操作 return; } try { PEWorkflowDAG dag = JSON.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); // 更新完成节点状态 boolean allFinished = true; PEWorkflowDAG.Node instanceNode = null; for (PEWorkflowDAG.Node node : dag.getNodes()) { if (instanceId.equals(node.getInstanceId())) { node.setStatus(status.getV()); node.setResult(result); node.setFinishedTime(CommonUtils.formatTime(System.currentTimeMillis())); instanceNode = node; log.info("[Workflow-{}|{}] node(nodeId={},jobId={},instanceId={}) finished in workflowInstance, status={},result={}", wfId, wfInstanceId, node.getNodeId(), node.getJobId(), instanceId, status.name(), result); } if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) { allFinished = false; } } if (instanceNode == null) { // DAG 中的节点实例已经被覆盖(原地重试,生成了新的实例信息),直接忽略 log.warn("[Workflow-{}|{}] current job instance(instanceId={}) is dissociative! it will be ignore! ", wfId, wfInstanceId, instanceId); return; } wfInstance.setGmtModified(new Date()); wfInstance.setDag(JSON.toJSONString(dag)); // 工作流已经结束(某个节点失败导致工作流整体已经失败),仅更新最新的 DAG 图 if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) { workflowInstanceInfoRepository.saveAndFlush(wfInstance); log.info("[Workflow-{}|{}] workflow already finished(status={}), just update the dag info.", wfId, wfInstanceId, wfInstance.getStatus()); return; } // 任务失败 && 不允许失败跳过,DAG 流程被打断,整体失败 if (status == InstanceStatus.FAILED && isNotAllowSkipWhenFailed(instanceNode)) { log.warn("[Workflow-{}|{}] workflow instance process failed because middle task(instanceId={}) failed", wfId, wfInstanceId, instanceId); handleWfInstanceFinalStatus(wfInstance, SystemInstanceResult.MIDDLE_JOB_FAILED, WorkflowInstanceStatus.FAILED); return; } // 子任务被手动停止 if (status == InstanceStatus.STOPPED) { handleWfInstanceFinalStatus(wfInstance, SystemInstanceResult.MIDDLE_JOB_STOPPED, WorkflowInstanceStatus.STOPPED); log.warn("[Workflow-{}|{}] workflow instance stopped because middle task(instanceId={}) stopped by user", wfId, wfInstanceId, instanceId); return; } // 注意:这里会直接跳过 disable 的节点 List readyNodes = WorkflowDAGUtils.listReadyNodes(dag); // 如果没有就绪的节点,需要再次判断是否已经全部完成 if (readyNodes.isEmpty() && isFinish(dag)) { allFinished = true; } // 工作流执行完毕(能执行到这里代表该工作流内所有子任务都执行成功了) if (allFinished) { // 这里得重新更新一下,因为 WorkflowDAGUtils#listReadyNodes 可能会更新节点状态 wfInstance.setDag(JSON.toJSONString(dag)); // 最终任务的结果作为整个 workflow 的结果 handleWfInstanceFinalStatus(wfInstance, result, WorkflowInstanceStatus.SUCCEED); log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId); return; } // 先处理其中的控制节点 List controlNodes = findControlNodes(readyNodes); while (!controlNodes.isEmpty()) { workflowNodeHandleService.handleControlNodes(controlNodes, dag, wfInstance); readyNodes = WorkflowDAGUtils.listReadyNodes(dag); controlNodes = findControlNodes(readyNodes); } // 再次判断是否已完成 (允许控制节点出现在末尾) if (readyNodes.isEmpty()) { if (isFinish(dag)) { wfInstance.setDag(JSON.toJSONString(dag)); handleWfInstanceFinalStatus(wfInstance, result, WorkflowInstanceStatus.SUCCEED); log.info("[Workflow-{}|{}] process successfully.", wfId, wfInstanceId); return; } // 没有就绪的节点 但 还没执行完成,仅更新 DAG wfInstance.setDag(JSON.toJSONString(dag)); workflowInstanceInfoRepository.saveAndFlush(wfInstance); return; } // 处理任务节点 workflowNodeHandleService.handleTaskNodes(readyNodes, dag, wfInstance); } catch (Exception e) { handleWfInstanceFinalStatus(wfInstance, "MOVE NEXT STEP FAILED: " + e.getMessage(), WorkflowInstanceStatus.FAILED); log.error("[Workflow-{}|{}] update failed.", wfId, wfInstanceId, e); } } /** * 更新工作流上下文 * fix : 得和其他操作工作流实例的方法用同一把锁才行,不然有并发问题,会导致节点状态被覆盖 * * @param wfInstanceId 工作流实例 * @param appendedWfContextData 追加的上下文数据 * @since 2021/02/05 */ @UseCacheLock(type = "processWfInstance", key = "#wfInstanceId", concurrencyLevel = 1024) public void updateWorkflowContext(Long wfInstanceId, Map appendedWfContextData) { try { Optional wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId); if (!wfInstanceInfoOpt.isPresent()) { log.error("[WorkflowInstanceManager] can't find metadata by workflowInstanceId({}).", wfInstanceId); return; } WorkflowInstanceInfoDO wfInstance = wfInstanceInfoOpt.get(); HashMap wfContext = JSON.parseObject(wfInstance.getWfContext(), new TypeReference>() { }); for (Map.Entry entry : appendedWfContextData.entrySet()) { String key = entry.getKey(); String originValue = wfContext.put(key, entry.getValue()); log.info("[Workflow-{}|{}] update workflow context {} : {} -> {}", wfInstance.getWorkflowId(), wfInstance.getWfInstanceId(), key, originValue, entry.getValue()); } wfInstance.setWfContext(JSON.toJSONString(wfContext)); workflowInstanceInfoRepository.saveAndFlush(wfInstance); } catch (Exception e) { log.error("[WorkflowInstanceManager] update workflow(workflowInstanceId={}) context failed.", wfInstanceId, e); } } private void handleWfInstanceFinalStatus(WorkflowInstanceInfoDO wfInstance, String result, WorkflowInstanceStatus workflowInstanceStatus) { wfInstance.setStatus(workflowInstanceStatus.getV()); wfInstance.setResult(result); wfInstance.setFinishedTime(System.currentTimeMillis()); wfInstance.setGmtModified(new Date()); workflowInstanceInfoRepository.saveAndFlush(wfInstance); // 处理子工作流 if (wfInstance.getParentWfInstanceId() != null) { // 先处理上下文 if (workflowInstanceStatus == WorkflowInstanceStatus.SUCCEED){ HashMap wfContext = JSON.parseObject(wfInstance.getWfContext(), new TypeReference>() { }); SpringUtils.getBean(this.getClass()).updateWorkflowContext(wfInstance.getParentWfInstanceId(), wfContext); } // 处理父工作流, fix https://github.com/PowerJob/PowerJob/issues/465 SpringUtils.getBean(this.getClass()).move(wfInstance.getParentWfInstanceId(), wfInstance.getWfInstanceId(), StatusMappingHelper.toInstanceStatus(workflowInstanceStatus), result); } // 报警 if (workflowInstanceStatus == WorkflowInstanceStatus.FAILED) { try { workflowInfoRepository.findById(wfInstance.getWorkflowId()).ifPresent(wfInfo -> { WorkflowInstanceAlarm content = new WorkflowInstanceAlarm(); BeanUtils.copyProperties(wfInfo, content); BeanUtils.copyProperties(wfInstance, content); content.setResult(result); List userList = userService.fetchNotifyUserList(wfInfo.getNotifyUserIds()); alarmCenter.alarmFailed(content, AlarmUtils.convertUserInfoList2AlarmTargetList(userList)); }); } catch (Exception ignore) { // ignore } } } private List findControlNodes(List readyNodes) { return readyNodes.stream().filter(node -> { WorkflowNodeType nodeType = WorkflowNodeType.of(node.getNodeType()); return nodeType.isControlNode(); }).collect(Collectors.toList()); } private boolean isFinish(PEWorkflowDAG dag) { for (PEWorkflowDAG.Node node : dag.getNodes()) { if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(node.getStatus())) { return false; } } return true; } }