package tech.powerjob.server.core.workflow;
 
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.request.http.SaveWorkflowNodeRequest;
import tech.powerjob.common.request.http.SaveWorkflowRequest;
import tech.powerjob.server.common.SJ;
import tech.powerjob.common.enums.SwitchableStatus;
import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
import tech.powerjob.server.core.scheduler.TimingStrategyService;
import tech.powerjob.server.core.service.NodeValidateService;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAG;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
 
import javax.annotation.Resource;
import javax.transaction.Transactional;
import java.util.*;
 
/**
 * Workflow service
 *
 * @author tjq
 * @author zenggonggu
 * @author Echo009
 * @since 2020/5/26
 */
@Slf4j
@Service
public class WorkflowService {
 
    @Resource
    private WorkflowInstanceManager workflowInstanceManager;
    @Resource
    private WorkflowInfoRepository workflowInfoRepository;
    @Resource
    private WorkflowNodeInfoRepository workflowNodeInfoRepository;
    @Resource
    private NodeValidateService nodeValidateService;
    @Resource
    private TimingStrategyService timingStrategyService;
 
    /**
     * Save or update workflow meta info.
     * <p>
     * Note: the referenced workflow nodes must be created beforehand via {@link #saveWorkflowNode(List)};
     * this method validates the DAG against those nodes and persists it together with the workflow.
     *
     * @param req save request
     * @return workflow ID
     */
    @Transactional(rollbackOn = Exception.class)
    public Long saveWorkflow(SaveWorkflowRequest req) {
 
        req.valid();
 
        Long wfId = req.getId();
        WorkflowInfoDO wf;
        if (wfId == null) {
            wf = new WorkflowInfoDO();
            wf.setGmtCreate(new Date());
        } else {
            Long finalWfId = wfId;
            wf = workflowInfoRepository.findById(wfId).orElseThrow(() -> new IllegalArgumentException("can't find workflow by id:" + finalWfId));
        }
 
        BeanUtils.copyProperties(req, wf);
        wf.setGmtModified(new Date());
        wf.setStatus(req.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV());
        wf.setTimeExpressionType(req.getTimeExpressionType().getV());
 
        if (req.getNotifyUserIds() != null) {
            wf.setNotifyUserIds(SJ.COMMA_JOINER.join(req.getNotifyUserIds()));
        }
        if (req.getLifeCycle() != null) {
            wf.setLifecycle(JSON.toJSONString(req.getLifeCycle()));
        }
        if (TimeExpressionType.FREQUENT_TYPES.contains(req.getTimeExpressionType().getV())) {
            // frequent (fixed rate / fixed delay) workflows do not pre-compute the next trigger time
            wf.setTimeExpression(null);
        } else {
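            // compute the next trigger time, constrained by the workflow's life cycle window (start / end)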
            LifeCycle lifeCycle = Optional.ofNullable(req.getLifeCycle()).orElse(LifeCycle.EMPTY_LIFE_CYCLE);
            Long nextValidTime = timingStrategyService.calculateNextTriggerTimeWithInspection(TimeExpressionType.of(wf.getTimeExpressionType()), wf.getTimeExpression(), lifeCycle.getStart(), lifeCycle.getEnd());
            wf.setNextTriggerTime(nextValidTime);
        }
        // for a new workflow, save first to obtain the generated ID
        if (wfId == null) {
            wf = workflowInfoRepository.saveAndFlush(wf);
            wfId = wf.getId();
        }
        wf.setPeDAG(validateAndConvert2String(wfId, req.getDag()));
        workflowInfoRepository.saveAndFlush(wf);
        return wfId;
    }
 
    /**
     * Validate the DAG and convert it to its JSON string representation.
     * Note: nodes that are no longer referenced by the DAG (dissociated nodes) are physically deleted here.
     */
    private String validateAndConvert2String(Long wfId, PEWorkflowDAG dag) {
        if (dag == null || !WorkflowDAGUtils.valid(dag)) {
            throw new PowerJobException("illegal DAG");
        }
        // Note: only the basic graph info is saved here: nodeId, jobId, jobName (nodeAlias);
        // jobId and jobName are always taken from the node info.
        List<Long> nodeIdList = Lists.newArrayList();
        List<PEWorkflowDAG.Node> newNodes = Lists.newArrayList();
        WorkflowDAG complexDag = WorkflowDAGUtils.convert(dag);
        for (PEWorkflowDAG.Node node : dag.getNodes()) {
            WorkflowNodeInfoDO nodeInfo = workflowNodeInfoRepository.findById(node.getNodeId()).orElseThrow(() -> new PowerJobException("can't find node info by id :" + node.getNodeId()));
            // bind the node to this workflow if it has not been bound yet
            if (nodeInfo.getWorkflowId() == null) {
                nodeInfo.setWorkflowId(wfId);
                nodeInfo.setGmtModified(new Date());
                workflowNodeInfoRepository.saveAndFlush(nodeInfo);
            }
            if (!wfId.equals(nodeInfo.getWorkflowId())) {
                throw new PowerJobException("can't use another workflow's node");
            }
            nodeValidateService.complexValidate(nodeInfo, complexDag);
            // keep only the node ID; all other info is cleared
            newNodes.add(new PEWorkflowDAG.Node(node.getNodeId()));
            nodeIdList.add(node.getNodeId());
        }
        dag.setNodes(newNodes);
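        // physically delete nodes that belong to this workflow but are no longer referenced by the DAG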
        int deleteCount = workflowNodeInfoRepository.deleteByWorkflowIdAndIdNotIn(wfId, nodeIdList);
        log.warn("[WorkflowService-{}] delete {} dissociated nodes of workflow", wfId, deleteCount);
        return JSON.toJSONString(dag);
    }
 
 
    /**
     * Deep-copy a workflow, including its nodes.
     *
     * @param wfId  workflow ID
     * @param appId app ID
     * @return ID of the newly created workflow
     */
    @Transactional(rollbackOn = Exception.class)
    public long copyWorkflow(Long wfId, Long appId) {
 
        WorkflowInfoDO originWorkflow = permissionCheck(wfId, appId);
        if (originWorkflow.getStatus() == SwitchableStatus.DELETED.getV()) {
            throw new IllegalStateException("can't copy the workflow which has been deleted!");
        }
        // copy the basic workflow info
        WorkflowInfoDO copyWorkflow = new WorkflowInfoDO();
        BeanUtils.copyProperties(originWorkflow, copyWorkflow);
        copyWorkflow.setId(null);
        copyWorkflow.setGmtCreate(new Date());
        copyWorkflow.setGmtModified(new Date());
        copyWorkflow.setWfName(copyWorkflow.getWfName() + "_COPY");
        // save first to obtain the generated ID
        copyWorkflow = workflowInfoRepository.saveAndFlush(copyWorkflow);
 
        if (StringUtils.isEmpty(copyWorkflow.getPeDAG())) {
            return copyWorkflow.getId();
        }
 
        PEWorkflowDAG dag = JSON.parseObject(copyWorkflow.getPeDAG(), PEWorkflowDAG.class);
 
        // copy node info and update the node references in the DAG
        if (!CollectionUtils.isEmpty(dag.getNodes())) {
            // originNodeId => copyNodeId
            HashMap<Long, Long> nodeIdMap = new HashMap<>(dag.getNodes().size(), 1);
            // fix up node info
            for (PEWorkflowDAG.Node node : dag.getNodes()) {
 
                WorkflowNodeInfoDO originNode = workflowNodeInfoRepository.findById(node.getNodeId()).orElseThrow(() -> new IllegalArgumentException("can't find workflow Node by id: " + node.getNodeId()));
 
                WorkflowNodeInfoDO copyNode = new WorkflowNodeInfoDO();
                BeanUtils.copyProperties(originNode, copyNode);
                copyNode.setId(null);
                copyNode.setWorkflowId(copyWorkflow.getId());
                copyNode.setGmtCreate(new Date());
                copyNode.setGmtModified(new Date());
 
                copyNode = workflowNodeInfoRepository.saveAndFlush(copyNode);
                nodeIdMap.put(originNode.getId(), copyNode.getId());
 
                node.setNodeId(copyNode.getId());
            }
            // fix up edge info: remap endpoints to the copied node IDs
            for (PEWorkflowDAG.Edge edge : dag.getEdges()) {
                edge.setFrom(nodeIdMap.get(edge.getFrom()));
                edge.setTo(nodeIdMap.get(edge.getTo()));
            }
        }
        copyWorkflow.setPeDAG(JSON.toJSONString(dag));
        workflowInfoRepository.saveAndFlush(copyWorkflow);
        return copyWorkflow.getId();
    }
 
 
    /**
     * Fetch workflow meta info; the returned DAG contains the complete info of each node
     * (whether it is enabled, whether it may be skipped on failure, etc.).
     *
     * @param wfId  workflow ID
     * @param appId app ID
     * @return workflow info for external output
     */
    public WorkflowInfoDO fetchWorkflow(Long wfId, Long appId) {
        WorkflowInfoDO wfInfo = permissionCheck(wfId, appId);
        fillWorkflow(wfInfo);
        return wfInfo;
    }
 
    /**
     * Delete a workflow (soft delete).
     *
     * @param wfId  workflow ID
     * @param appId owning app ID
     */
    public void deleteWorkflow(Long wfId, Long appId) {
        WorkflowInfoDO wfInfo = permissionCheck(wfId, appId);
        wfInfo.setStatus(SwitchableStatus.DELETED.getV());
        wfInfo.setGmtModified(new Date());
        workflowInfoRepository.saveAndFlush(wfInfo);
    }
 
    /**
     * Disable a workflow.
     *
     * @param wfId  workflow ID
     * @param appId owning app ID
     */
    public void disableWorkflow(Long wfId, Long appId) {
        WorkflowInfoDO wfInfo = permissionCheck(wfId, appId);
        wfInfo.setStatus(SwitchableStatus.DISABLE.getV());
        wfInfo.setGmtModified(new Date());
        workflowInfoRepository.saveAndFlush(wfInfo);
    }
 
    /**
     * Enable a workflow.
     *
     * @param wfId  workflow ID
     * @param appId owning app ID
     */
    public void enableWorkflow(Long wfId, Long appId) {
        WorkflowInfoDO wfInfo = permissionCheck(wfId, appId);
        wfInfo.setStatus(SwitchableStatus.ENABLE.getV());
        wfInfo.setGmtModified(new Date());
        workflowInfoRepository.saveAndFlush(wfInfo);
    }
 
    /**
     * Run a workflow immediately (or after the given delay).
     *
     * @param wfId       workflow ID
     * @param appId      owning app ID
     * @param initParams launch parameters
     * @param delay      delay in milliseconds
     * @return instanceId of this workflow instance (wfInstanceId)
     */
    @DesignateServer
    public Long runWorkflow(Long wfId, Long appId, String initParams, Long delay) {
 
        delay = delay == null ? 0 : delay;
        WorkflowInfoDO wfInfo = permissionCheck(wfId, appId);
 
        log.info("[WorkflowService-{}] try to run workflow, initParams={},delay={} ms.", wfInfo.getId(), initParams, delay);
        Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams, System.currentTimeMillis() + delay, null);
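        // start right away when there is no delay, otherwise schedule the start on the server-side time wheel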
        if (delay <= 0) {
            workflowInstanceManager.start(wfInfo, wfInstanceId);
        } else {
            InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId));
        }
        return wfInstanceId;
    }
 
 
    /**
     * Save workflow nodes (create or update).
     *
     * @param workflowNodeRequestList workflow node requests
     * @return the created or updated workflow node info
     */
    @Transactional(rollbackOn = Exception.class)
    public List<WorkflowNodeInfoDO> saveWorkflowNode(List<SaveWorkflowNodeRequest> workflowNodeRequestList) {
        if (CollectionUtils.isEmpty(workflowNodeRequestList)) {
            return Collections.emptyList();
        }
        final Long appId = workflowNodeRequestList.get(0).getAppId();
        List<WorkflowNodeInfoDO> res = new ArrayList<>(workflowNodeRequestList.size());
        for (SaveWorkflowNodeRequest req : workflowNodeRequestList) {
            req.valid();
            // all nodes must belong to the same app
            if (!appId.equals(req.getAppId())) {
                throw new PowerJobException("all nodes in the list must belong to the same app");
            }
            }
            WorkflowNodeInfoDO workflowNodeInfo;
            if (req.getId() != null) {
                workflowNodeInfo = workflowNodeInfoRepository.findById(req.getId()).orElseThrow(() -> new IllegalArgumentException("can't find workflow Node by id: " + req.getId()));
            } else {
                workflowNodeInfo = new WorkflowNodeInfoDO();
                workflowNodeInfo.setGmtCreate(new Date());
            }
            BeanUtils.copyProperties(req, workflowNodeInfo);
            workflowNodeInfo.setType(req.getType());
            nodeValidateService.simpleValidate(workflowNodeInfo);
            workflowNodeInfo.setGmtModified(new Date());
            workflowNodeInfo = workflowNodeInfoRepository.saveAndFlush(workflowNodeInfo);
            res.add(workflowNodeInfo);
        }
        return res;
    }
 
 
    private void fillWorkflow(WorkflowInfoDO wfInfo) {
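        // enrich the DAG (stored as a JSON string) with the complete info of each node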
 
        PEWorkflowDAG dagInfo = null;
        try {
            dagInfo = JSON.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class);
        } catch (Exception e) {
            log.warn("[WorkflowService-{}] illegal DAG : {}", wfInfo.getId(), wfInfo.getPeDAG());
        }
        if (dagInfo == null) {
            return;
        }
 
        Map<Long, WorkflowNodeInfoDO> nodeIdNodeInfoMap = Maps.newHashMap();

        workflowNodeInfoRepository.findByWorkflowId(wfInfo.getId()).forEach(
                e -> nodeIdNodeInfoMap.put(e.getId(), e)
        );
        // fill the node info into the DAG
        if (!CollectionUtils.isEmpty(dagInfo.getNodes())) {
            for (PEWorkflowDAG.Node node : dagInfo.getNodes()) {
                WorkflowNodeInfoDO nodeInfo = nodeIdNodeInfoMap.get(node.getNodeId());
                if (nodeInfo != null) {
                    node.setNodeType(nodeInfo.getType())
                            .setJobId(nodeInfo.getJobId())
                            .setEnable(nodeInfo.getEnable())
                            .setSkipWhenFailed(nodeInfo.getSkipWhenFailed())
                            .setNodeName(nodeInfo.getNodeName())
                            .setNodeParams(nodeInfo.getNodeParams());
                }
            }
        }
        wfInfo.setPeDAG(JSON.toJSONString(dagInfo));
    }
 
 
    private WorkflowInfoDO permissionCheck(Long wfId, Long appId) {
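        // the workflow must exist and must belong to the given app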
        WorkflowInfoDO wfInfo = workflowInfoRepository.findById(wfId).orElseThrow(() -> new IllegalArgumentException("can't find workflow by id: " + wfId));
        if (!wfInfo.getAppId().equals(appId)) {
            throw new PowerJobException("Permission Denied! can't operate other app's workflow!");
        }
        return wfInfo;
    }
}