package tech.powerjob.server.core.workflow; import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import org.apache.commons.lang3.StringUtils; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.common.request.http.SaveWorkflowNodeRequest; import tech.powerjob.common.request.http.SaveWorkflowRequest; import tech.powerjob.server.common.SJ; import tech.powerjob.common.enums.SwitchableStatus; import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService; import tech.powerjob.server.core.scheduler.TimingStrategyService; import tech.powerjob.server.core.service.NodeValidateService; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils; import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO; import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO; import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository; import tech.powerjob.server.remote.server.redirector.DesignateServer; import javax.annotation.Resource; import javax.transaction.Transactional; import java.util.*; /** * Workflow 服务 * * @author tjq * @author zenggonggu * @author Echo009 * @since 2020/5/26 */ @Slf4j @Service public class WorkflowService { @Resource private WorkflowInstanceManager workflowInstanceManager; @Resource private WorkflowInfoRepository workflowInfoRepository; @Resource private WorkflowNodeInfoRepository workflowNodeInfoRepository; @Resource private NodeValidateService nodeValidateService; @Resource private TimingStrategyService timingStrategyService; /** * 保存/修改工作流信息 *

* 注意这里不会保存 DAG 信息 * * @param req 请求 * @return 工作流ID */ @Transactional(rollbackOn = Exception.class) public Long saveWorkflow(SaveWorkflowRequest req) { req.valid(); Long wfId = req.getId(); WorkflowInfoDO wf; if (wfId == null) { wf = new WorkflowInfoDO(); wf.setGmtCreate(new Date()); } else { Long finalWfId = wfId; wf = workflowInfoRepository.findById(wfId).orElseThrow(() -> new IllegalArgumentException("can't find workflow by id:" + finalWfId)); } BeanUtils.copyProperties(req, wf); wf.setGmtModified(new Date()); wf.setStatus(req.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV()); wf.setTimeExpressionType(req.getTimeExpressionType().getV()); if (req.getNotifyUserIds() != null) { wf.setNotifyUserIds(SJ.COMMA_JOINER.join(req.getNotifyUserIds())); } if (req.getLifeCycle() != null) { wf.setLifecycle(JSON.toJSONString(req.getLifeCycle())); } if (TimeExpressionType.FREQUENT_TYPES.contains(req.getTimeExpressionType().getV())) { // 固定频率类型的任务不计算 wf.setTimeExpression(null); } else { LifeCycle lifeCycle = Optional.ofNullable(req.getLifeCycle()).orElse(LifeCycle.EMPTY_LIFE_CYCLE); Long nextValidTime = timingStrategyService.calculateNextTriggerTimeWithInspection(TimeExpressionType.of(wf.getTimeExpressionType()), wf.getTimeExpression(), lifeCycle.getStart(), lifeCycle.getEnd()); wf.setNextTriggerTime(nextValidTime); } // 新增工作流,需要先 save 一下获取 ID if (wfId == null) { wf = workflowInfoRepository.saveAndFlush(wf); wfId = wf.getId(); } wf.setPeDAG(validateAndConvert2String(wfId, req.getDag())); workflowInfoRepository.saveAndFlush(wf); return wfId; } /** * 保存 DAG 信息 * 这里会物理删除游离的节点信息 */ private String validateAndConvert2String(Long wfId, PEWorkflowDAG dag) { if (dag == null || !WorkflowDAGUtils.valid(dag)) { throw new PowerJobException("illegal DAG"); } // 注意:这里只会保存图相关的基础信息,nodeId,jobId,jobName(nodeAlias) // 其中 jobId,jobName 均以节点中的信息为准 List nodeIdList = Lists.newArrayList(); List newNodes = Lists.newArrayList(); WorkflowDAG complexDag = WorkflowDAGUtils.convert(dag); for (PEWorkflowDAG.Node node : dag.getNodes()) { WorkflowNodeInfoDO nodeInfo = workflowNodeInfoRepository.findById(node.getNodeId()).orElseThrow(() -> new PowerJobException("can't find node info by id :" + node.getNodeId())); // 更新工作流 ID if (nodeInfo.getWorkflowId() == null) { nodeInfo.setWorkflowId(wfId); nodeInfo.setGmtModified(new Date()); workflowNodeInfoRepository.saveAndFlush(nodeInfo); } if (!wfId.equals(nodeInfo.getWorkflowId())) { throw new PowerJobException("can't use another workflow's node"); } nodeValidateService.complexValidate(nodeInfo, complexDag); // 只保存节点的 ID 信息,清空其他信息 newNodes.add(new PEWorkflowDAG.Node(node.getNodeId())); nodeIdList.add(node.getNodeId()); } dag.setNodes(newNodes); int deleteCount = workflowNodeInfoRepository.deleteByWorkflowIdAndIdNotIn(wfId, nodeIdList); log.warn("[WorkflowService-{}] delete {} dissociative nodes of workflow", wfId, deleteCount); return JSON.toJSONString(dag); } /** * 深度复制工作流 * * @param wfId 工作流 ID * @param appId APP ID * @return 生成的工作流 ID */ @Transactional(rollbackOn = Exception.class) public long copyWorkflow(Long wfId, Long appId) { WorkflowInfoDO originWorkflow = permissionCheck(wfId, appId); if (originWorkflow.getStatus() == SwitchableStatus.DELETED.getV()) { throw new IllegalStateException("can't copy the workflow which has been deleted!"); } // 拷贝基础信息 WorkflowInfoDO copyWorkflow = new WorkflowInfoDO(); BeanUtils.copyProperties(originWorkflow, copyWorkflow); copyWorkflow.setId(null); copyWorkflow.setGmtCreate(new Date()); copyWorkflow.setGmtModified(new Date()); copyWorkflow.setWfName(copyWorkflow.getWfName() + "_COPY"); // 先 save 获取 id copyWorkflow = workflowInfoRepository.saveAndFlush(copyWorkflow); if (StringUtils.isEmpty(copyWorkflow.getPeDAG())) { return copyWorkflow.getId(); } PEWorkflowDAG dag = JSON.parseObject(copyWorkflow.getPeDAG(), PEWorkflowDAG.class); // 拷贝节点信息,并且更新 DAG 中的节点信息 if (!CollectionUtils.isEmpty(dag.getNodes())) { // originNodeId => copyNodeId HashMap nodeIdMap = new HashMap<>(dag.getNodes().size(), 1); // 校正 节点信息 for (PEWorkflowDAG.Node node : dag.getNodes()) { WorkflowNodeInfoDO originNode = workflowNodeInfoRepository.findById(node.getNodeId()).orElseThrow(() -> new IllegalArgumentException("can't find workflow Node by id: " + node.getNodeId())); WorkflowNodeInfoDO copyNode = new WorkflowNodeInfoDO(); BeanUtils.copyProperties(originNode, copyNode); copyNode.setId(null); copyNode.setWorkflowId(copyWorkflow.getId()); copyNode.setGmtCreate(new Date()); copyNode.setGmtModified(new Date()); copyNode = workflowNodeInfoRepository.saveAndFlush(copyNode); nodeIdMap.put(originNode.getId(), copyNode.getId()); node.setNodeId(copyNode.getId()); } // 校正 边信息 for (PEWorkflowDAG.Edge edge : dag.getEdges()) { edge.setFrom(nodeIdMap.get(edge.getFrom())); edge.setTo(nodeIdMap.get(edge.getTo())); } } copyWorkflow.setPeDAG(JSON.toJSONString(dag)); workflowInfoRepository.saveAndFlush(copyWorkflow); return copyWorkflow.getId(); } /** * 获取工作流元信息,这里获取到的 DAG 包含节点的完整信息(是否启用、是否允许失败跳过) * * @param wfId 工作流ID * @param appId 应用ID * @return 对外输出对象 */ public WorkflowInfoDO fetchWorkflow(Long wfId, Long appId) { WorkflowInfoDO wfInfo = permissionCheck(wfId, appId); fillWorkflow(wfInfo); return wfInfo; } /** * 删除工作流(软删除) * * @param wfId 工作流ID * @param appId 所属应用ID */ public void deleteWorkflow(Long wfId, Long appId) { WorkflowInfoDO wfInfo = permissionCheck(wfId, appId); wfInfo.setStatus(SwitchableStatus.DELETED.getV()); wfInfo.setGmtModified(new Date()); workflowInfoRepository.saveAndFlush(wfInfo); } /** * 禁用工作流 * * @param wfId 工作流ID * @param appId 所属应用ID */ public void disableWorkflow(Long wfId, Long appId) { WorkflowInfoDO wfInfo = permissionCheck(wfId, appId); wfInfo.setStatus(SwitchableStatus.DISABLE.getV()); wfInfo.setGmtModified(new Date()); workflowInfoRepository.saveAndFlush(wfInfo); } /** * 启用工作流 * * @param wfId 工作流ID * @param appId 所属应用ID */ public void enableWorkflow(Long wfId, Long appId) { WorkflowInfoDO wfInfo = permissionCheck(wfId, appId); wfInfo.setStatus(SwitchableStatus.ENABLE.getV()); wfInfo.setGmtModified(new Date()); workflowInfoRepository.saveAndFlush(wfInfo); } /** * 立即运行工作流 * * @param wfId 工作流ID * @param appId 所属应用ID * @param initParams 启动参数 * @param delay 延迟时间 * @return 该 workflow 实例的 instanceId(wfInstanceId) */ @DesignateServer public Long runWorkflow(Long wfId, Long appId, String initParams, Long delay) { delay = delay == null ? 0 : delay; WorkflowInfoDO wfInfo = permissionCheck(wfId, appId); log.info("[WorkflowService-{}] try to run workflow, initParams={},delay={} ms.", wfInfo.getId(), initParams, delay); Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams, System.currentTimeMillis() + delay, null); if (delay <= 0) { workflowInstanceManager.start(wfInfo, wfInstanceId); } else { InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId)); } return wfInstanceId; } /** * 保存工作流节点(新增 或者 保存) * * @param workflowNodeRequestList 工作流节点 * @return 更新 或者 创建后的工作流节点信息 */ @Transactional(rollbackOn = Exception.class) public List saveWorkflowNode(List workflowNodeRequestList) { if (CollectionUtils.isEmpty(workflowNodeRequestList)) { return Collections.emptyList(); } final Long appId = workflowNodeRequestList.get(0).getAppId(); List res = new ArrayList<>(workflowNodeRequestList.size()); for (SaveWorkflowNodeRequest req : workflowNodeRequestList) { req.valid(); // 必须位于同一个 APP 下 if (!appId.equals(req.getAppId())) { throw new PowerJobException("node list must are in the same app"); } WorkflowNodeInfoDO workflowNodeInfo; if (req.getId() != null) { workflowNodeInfo = workflowNodeInfoRepository.findById(req.getId()).orElseThrow(() -> new IllegalArgumentException("can't find workflow Node by id: " + req.getId())); } else { workflowNodeInfo = new WorkflowNodeInfoDO(); workflowNodeInfo.setGmtCreate(new Date()); } BeanUtils.copyProperties(req, workflowNodeInfo); workflowNodeInfo.setType(req.getType()); nodeValidateService.simpleValidate(workflowNodeInfo); workflowNodeInfo.setGmtModified(new Date()); workflowNodeInfo = workflowNodeInfoRepository.saveAndFlush(workflowNodeInfo); res.add(workflowNodeInfo); } return res; } private void fillWorkflow(WorkflowInfoDO wfInfo) { PEWorkflowDAG dagInfo = null; try { dagInfo = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class); } catch (Exception e) { log.warn("[WorkflowService-{}]illegal DAG : {}", wfInfo.getId(), wfInfo.getPeDAG()); } if (dagInfo == null) { return; } Map nodeIdNodInfoMap = Maps.newHashMap(); workflowNodeInfoRepository.findByWorkflowId(wfInfo.getId()).forEach( e -> nodeIdNodInfoMap.put(e.getId(), e) ); // 填充节点信息 if (!CollectionUtils.isEmpty(dagInfo.getNodes())) { for (PEWorkflowDAG.Node node : dagInfo.getNodes()) { WorkflowNodeInfoDO nodeInfo = nodeIdNodInfoMap.get(node.getNodeId()); if (nodeInfo != null) { node.setNodeType(nodeInfo.getType()) .setJobId(nodeInfo.getJobId()) .setEnable(nodeInfo.getEnable()) .setSkipWhenFailed(nodeInfo.getSkipWhenFailed()) .setNodeName(nodeInfo.getNodeName()) .setNodeParams(nodeInfo.getNodeParams()); } } } wfInfo.setPeDAG(JSON.toJSONString(dagInfo)); } private WorkflowInfoDO permissionCheck(Long wfId, Long appId) { WorkflowInfoDO wfInfo = workflowInfoRepository.findById(wfId).orElseThrow(() -> new IllegalArgumentException("can't find workflow by id: " + wfId)); if (!wfInfo.getAppId().equals(appId)) { throw new PowerJobException("Permission Denied! can't operate other app's workflow!"); } return wfInfo; } }