WangHan
2024-09-12 d5855a4926926698b740bc6c7ba489de47adb68b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
package tech.powerjob.server.core.workflow.algorithm;
 
import com.google.common.collect.*;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.serialize.JsonUtils;
 
import java.util.*;
 
/**
 * DAG 工具类
 *
 * @author tjq
 * @author Echo009
 * @since 2020/5/26
 */
public class WorkflowDAGUtils {
 
    private WorkflowDAGUtils() {
 
    }
 
    /**
     * 重置可重试节点的状态信息
     * @param dag 合法的有向无环图
     */
    public static void resetRetryableNode(PEWorkflowDAG dag){
        // 将需要重试的节点状态重置(失败且不允许跳过的 或者 手动终止的)
        for (PEWorkflowDAG.Node node : dag.getNodes()) {
            boolean realFailed = node.getStatus() == InstanceStatus.FAILED.getV() && isNotAllowSkipWhenFailed(node);
            if (realFailed || node.getStatus() == InstanceStatus.STOPPED.getV()) {
                node.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
                // 仅重置任务节点的实例 id 信息
                if (node.getNodeType() == null || node.getNodeType() == WorkflowNodeType.JOB.getCode()){
                    node.setInstanceId(null);
                }
            }
        }
    }
 
    /**
     * 获取所有根节点
     *
     * @param peWorkflowDAG 点线表示法的DAG图
     * @return 根节点列表
     */
    public static List<PEWorkflowDAG.Node> listRoots(PEWorkflowDAG peWorkflowDAG) {
 
        Map<Long, PEWorkflowDAG.Node> nodeId2Node = Maps.newHashMap();
        peWorkflowDAG.getNodes().forEach(node -> nodeId2Node.put(node.getNodeId(), node));
        peWorkflowDAG.getEdges().forEach(edge -> nodeId2Node.remove(edge.getTo()));
 
        return Lists.newLinkedList(nodeId2Node.values());
    }
 
    /**
     * 校验 DAG 是否有效
     *
     * @param peWorkflowDAG 点线表示法的 DAG 图
     * @return true/false
     */
    public static boolean valid(PEWorkflowDAG peWorkflowDAG) {
 
        // 校验节点 ID 是否重复
        Set<Long> nodeIds = Sets.newHashSet();
        // 不允许为空
        if (peWorkflowDAG.getNodes() == null || peWorkflowDAG.getNodes().isEmpty()){
            return false;
        }
        for (PEWorkflowDAG.Node n : peWorkflowDAG.getNodes()) {
            if (nodeIds.contains(n.getNodeId())) {
                return false;
            }
            nodeIds.add(n.getNodeId());
        }
 
        try {
            // 记录遍历过的所有节点 ID
            HashSet<Long> traversalNodeIds = Sets.newHashSet();
            WorkflowDAG dag = convert(peWorkflowDAG);
            // 检查所有顶点的路径
            for (WorkflowDAG.Node root : dag.getRoots()) {
                if (invalidPath(root, Sets.newHashSet(), traversalNodeIds)) {
                    return false;
                }
            }
            // 理论上应该遍历过图中的所有节点,如果不相等则说明有环 (孤立的环)
            return traversalNodeIds.size() == nodeIds.size();
 
        } catch (Exception ignore) {
            // ignore
        }
        return false;
    }
 
 
    /**
     * Add by Echo009 on 2021/02/08
     * 获取准备好的节点(非完成状态的节点且,前置依赖节点为空或者均处于完成状态)
     * 注意,这里会直接将当前 disable(enable = false)的节点的状态置为完成
     *
     * @param dag 点线表示法的DAG图
     * @return 当前可执行的节点
     */
    public static List<PEWorkflowDAG.Node> listReadyNodes(PEWorkflowDAG dag) {
        // 保存 nodeId -> Node 的映射关系
        Map<Long, PEWorkflowDAG.Node> nodeId2Node = Maps.newHashMap();
 
        List<PEWorkflowDAG.Node> dagNodes = dag.getNodes();
        for (PEWorkflowDAG.Node node : dagNodes) {
            nodeId2Node.put(node.getNodeId(), node);
        }
        // 构建依赖树(下游任务需要哪些上游任务完成才能执行)
        Multimap<Long, Long> relyMap = LinkedListMultimap.create();
        // 后继节点 Map
        Multimap<Long, Long> successorMap = LinkedListMultimap.create();
        dag.getEdges().forEach(edge -> {
            relyMap.put(edge.getTo(), edge.getFrom());
            successorMap.put(edge.getFrom(), edge.getTo());
        });
        List<PEWorkflowDAG.Node> readyNodes = Lists.newArrayList();
        List<PEWorkflowDAG.Node> skipNodes = Lists.newArrayList();
 
        for (PEWorkflowDAG.Node currentNode : dagNodes) {
            if (!isReadyNode(currentNode.getNodeId(), nodeId2Node, relyMap)) {
                continue;
            }
            // 需要直接跳过的节点
            if (currentNode.getEnable() != null && !currentNode.getEnable()) {
                skipNodes.add(currentNode);
            } else {
                readyNodes.add(currentNode);
            }
        }
        // 当前直接跳过的节点不为空
        if (!skipNodes.isEmpty()) {
            for (PEWorkflowDAG.Node skipNode : skipNodes) {
                // move
                readyNodes.addAll(moveAndObtainReadySuccessor(skipNode, nodeId2Node, relyMap, successorMap));
            }
        }
        return readyNodes;
    }
 
    /**
     * 移动并获取就绪的后继节点
     *
     * @param skippedNode  当前需要跳过的节点
     * @param nodeId2Node  nodeId -> Node
     * @param relyMap      to-node id  -> list of from-node id
     * @param successorMap from-node id -> list of to-node id
     * @return 就绪的后继节点
     */
    private static List<PEWorkflowDAG.Node> moveAndObtainReadySuccessor(PEWorkflowDAG.Node skippedNode, Map<Long, PEWorkflowDAG.Node> nodeId2Node, Multimap<Long, Long> relyMap, Multimap<Long, Long> successorMap) {
 
        // 更新当前跳过节点的状态
        skippedNode.setStatus(InstanceStatus.SUCCEED.getV());
        skippedNode.setResult(SystemInstanceResult.DISABLE_NODE);
        // 有可能出现需要连续移动的情况
        List<PEWorkflowDAG.Node> readyNodes = Lists.newArrayList();
        List<PEWorkflowDAG.Node> skipNodes = Lists.newArrayList();
        // 获取当前跳过节点的后继节点
        Collection<Long> successors = successorMap.get(skippedNode.getNodeId());
        for (Long successor : successors) {
            // 判断后继节点是否处于 Ready 状态(前驱节点均处于完成状态)
            if (isReadyNode(successor, nodeId2Node, relyMap)) {
                PEWorkflowDAG.Node node = nodeId2Node.get(successor);
                if (node.getEnable() != null && !node.getEnable()) {
                    // 需要跳过
                    skipNodes.add(node);
                    continue;
                }
                readyNodes.add(node);
            }
        }
        // 深度优先,继续移动
        if (!skipNodes.isEmpty()) {
            for (PEWorkflowDAG.Node node : skipNodes) {
                readyNodes.addAll(moveAndObtainReadySuccessor(node, nodeId2Node, relyMap, successorMap));
            }
        }
        return readyNodes;
    }
 
    /**
     * 判断当前节点是否准备就绪
     *
     * @param nodeId      Node id
     * @param nodeId2Node Node id -> Node
     * @param relyMap     to-node id  -> list of from-node id
     * @return true if current node is ready
     */
    private static boolean isReadyNode(long nodeId, Map<Long, PEWorkflowDAG.Node> nodeId2Node, Multimap<Long, Long> relyMap) {
        PEWorkflowDAG.Node currentNode = nodeId2Node.get(nodeId);
        int currentNodeStatus = currentNode.getStatus() == null ? InstanceStatus.WAITING_DISPATCH.getV() : currentNode.getStatus();
        // 跳过已完成节点(处理成功 或者 处理失败)和已派发节点( 状态为运行中 )
        if (InstanceStatus.FINISHED_STATUS.contains(currentNodeStatus)
                || currentNodeStatus == InstanceStatus.RUNNING.getV()) {
            return false;
        }
        Collection<Long> relyNodeIds = relyMap.get(nodeId);
        for (Long relyNodeId : relyNodeIds) {
            PEWorkflowDAG.Node relyNode = nodeId2Node.get(relyNodeId);
            int relyNodeStatus = relyNode.getStatus() == null ? InstanceStatus.WAITING_DISPATCH.getV() : relyNode.getStatus();
            // 只要依赖的节点有一个未完成,那么就不是就绪状态
            // 注意,这里允许失败的原因是有允许失败跳过节点的存在,对于不允许跳过的失败节点,一定走不到这里(工作流会被打断)
            if (InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(relyNodeStatus)) {
                return false;
            }
        }
        return true;
    }
 
    public static boolean isNotAllowSkipWhenFailed(PEWorkflowDAG.Node node) {
        // 默认不允许跳过
        return node.getSkipWhenFailed() == null || !node.getSkipWhenFailed();
    }
 
    /**
     * 处理被 disable 掉的边
     * 1. 将仅能通过被 disable 掉的边可达的节点标记为 disable(disableByControlNode),将状态更新为已取消
     * 2. 将这些被 disable 掉的节点的出口边都标记为 disable
     * 3. 递归调用自身,继续处理被 disable 的边
     */
    @SuppressWarnings("squid:S3776")
    public static void handleDisableEdges(List<PEWorkflowDAG.Edge> disableEdges, WorkflowDAG dag) {
        if (disableEdges.isEmpty()) {
            return;
        }
        List<PEWorkflowDAG.Node> disableNodes = Lists.newArrayList();
        // 处理边上的节点,如果该节点仅能通过被 disable 掉的边可达,那么将该节点标记为 disable ,disableByControlNode ,并且将状态更新为 已取消
        for (PEWorkflowDAG.Edge disableEdge : disableEdges) {
            WorkflowDAG.Node toNode = dag.getNode(disableEdge.getTo());
            // 判断是否仅能通过被 disable 掉的边可达
            Collection<PEWorkflowDAG.Edge> dependenceEdges = toNode.getDependenceEdgeMap().values();
            boolean shouldBeDisable = true;
            for (PEWorkflowDAG.Edge dependenceEdge : dependenceEdges) {
                if (dependenceEdge.getEnable() == null || dependenceEdge.getEnable()) {
                    shouldBeDisable = false;
                    break;
                }
            }
            if (shouldBeDisable) {
                // disable
                PEWorkflowDAG.Node node = toNode.getHolder();
                node.setEnable(false)
                        .setDisableByControlNode(true)
                        .setStatus(InstanceStatus.CANCELED.getV());
                disableNodes.add(node);
            }
        }
        if (!disableNodes.isEmpty()) {
            // 被 disable 掉的节点的出口边都会被标记为 disable
            List<PEWorkflowDAG.Edge> targetEdges = Lists.newArrayList();
            for (PEWorkflowDAG.Node disableNode : disableNodes) {
                WorkflowDAG.Node node = dag.getNode(disableNode.getNodeId());
                Collection<PEWorkflowDAG.Edge> edges = node.getSuccessorEdgeMap().values();
                for (PEWorkflowDAG.Edge edge : edges) {
                    edge.setEnable(false);
                    targetEdges.add(edge);
                }
            }
            // 广度优先 继续处理被 disable 掉的边
            handleDisableEdges(targetEdges, dag);
        }
    }
 
    /**
     * 将点线表示法的DAG图转化为引用表达法的DAG图
     *
     * @param peWorkflowDAG 点线表示法的DAG图
     * @return 引用表示法的DAG图
     */
    public static WorkflowDAG convert(PEWorkflowDAG peWorkflowDAG) {
        Set<Long> rootIds = Sets.newHashSet();
        Map<Long, WorkflowDAG.Node> id2Node = Maps.newHashMap();
 
        if (peWorkflowDAG.getNodes() == null || peWorkflowDAG.getNodes().isEmpty()) {
            throw new PowerJobException("empty graph");
        }
 
        // 创建节点
        peWorkflowDAG.getNodes().forEach(node -> {
            Long nodeId = node.getNodeId();
            WorkflowDAG.Node n = new WorkflowDAG.Node(node);
            id2Node.put(nodeId, n);
            // 初始阶段,每一个点都设为顶点
            rootIds.add(nodeId);
        });
 
        // 连接图像
        peWorkflowDAG.getEdges().forEach(edge -> {
            WorkflowDAG.Node from = id2Node.get(edge.getFrom());
            WorkflowDAG.Node to = id2Node.get(edge.getTo());
 
            if (from == null || to == null) {
                throw new PowerJobException("Illegal Edge: " + JsonUtils.toJSONString(edge));
            }
 
            from.getSuccessors().add(to);
            from.getSuccessorEdgeMap().put(to, edge);
            to.getDependencies().add(from);
            to.getDependenceEdgeMap().put(from, edge);
            // 被连接的点不可能成为 root,移除
            rootIds.remove(to.getNodeId());
        });
 
        // 合法性校验(至少存在一个顶点)
        if (rootIds.isEmpty()) {
            throw new PowerJobException("Illegal DAG: " + JsonUtils.toJSONString(peWorkflowDAG));
        }
 
        List<WorkflowDAG.Node> roots = Lists.newLinkedList();
        rootIds.forEach(id -> roots.add(id2Node.get(id)));
        return new WorkflowDAG(roots, id2Node);
    }
 
 
    private static boolean invalidPath(WorkflowDAG.Node root, Set<Long> ids, Set<Long> nodeIdContainer) {
 
        // 递归出口(出现之前的节点则代表有环,失败;出现无后继者节点,则说明该路径成功)
        if (ids.contains(root.getNodeId())) {
            return true;
        }
        nodeIdContainer.add(root.getNodeId());
        if (root.getSuccessors().isEmpty()) {
            return false;
        }
        ids.add(root.getNodeId());
        for (WorkflowDAG.Node node : root.getSuccessors()) {
            if (invalidPath(node, Sets.newHashSet(ids), nodeIdContainer)) {
                return true;
            }
        }
        return false;
    }
}