package tech.powerjob.server.core.workflow.hanlder.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.WorkflowNodeType; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.server.core.evaluator.GroovyEvaluator; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils; import tech.powerjob.server.core.workflow.hanlder.ControlNodeHandler; import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO; import java.util.*; /** * @author Echo009 * @since 2021/12/9 */ @Slf4j @Component public class DecisionNodeHandler implements ControlNodeHandler { private final GroovyEvaluator groovyEvaluator = new GroovyEvaluator(); /** * 处理判断节点 * 1. 执行脚本 * 2. 根据返回值 disable 掉相应的边以及节点 */ @Override public void handle(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) { String script = node.getNodeParams(); if (StringUtils.isBlank(script)) { log.error("[Workflow-{}|{}]decision node's param is blank! nodeId:{}", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId()); throw new PowerJobException("decision node's param is blank!"); } // wfContext must be a map HashMap wfContext = JSON.parseObject(wfInstanceInfo.getWfContext(), new TypeReference>() { }); Object result; try { result = groovyEvaluator.evaluate(script, wfContext); } catch (Exception e) { log.error("[Workflow-{}|{}]failed to evaluate decision node,nodeId:{}", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), e); throw new PowerJobException("can't evaluate decision node!"); } boolean finalRes; if (result instanceof Boolean) { finalRes = ((Boolean) result); } else if (result instanceof Number) { finalRes = ((Number) result).doubleValue() > 0; } else { log.error("[Workflow-{}|{}]decision node's return value is illegal,nodeId:{},result:{}", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), JsonUtils.toJSONString(result)); throw new PowerJobException("decision node's return value is illegal!"); } handleDag(finalRes, node, dag); } private void handleDag(boolean res, PEWorkflowDAG.Node node, PEWorkflowDAG peDag) { // 更新判断节点的状态为成功 node.setResult(String.valueOf(res)); node.setStatus(InstanceStatus.SUCCEED.getV()); WorkflowDAG dag = WorkflowDAGUtils.convert(peDag); // 根据节点的计算结果,将相应的边 disable WorkflowDAG.Node targetNode = dag.getNode(node.getNodeId()); Collection edges = targetNode.getSuccessorEdgeMap().values(); if (edges.isEmpty()) { return; } List disableEdges = new ArrayList<>(edges.size()); for (PEWorkflowDAG.Edge edge : edges) { // 这里一定不会出现异常 boolean property = Boolean.parseBoolean(edge.getProperty()); if (res != property) { // disable edge.setEnable(false); disableEdges.add(edge); } } WorkflowDAGUtils.handleDisableEdges(disableEdges,dag); } @Override public WorkflowNodeType matchingType() { return WorkflowNodeType.DECISION; } }