package tech.powerjob.server.migrate;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.extension.LockService;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import javax.persistence.criteria.Predicate;
import javax.transaction.Transactional;
import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * Helps users upgrade from a lower version of powerjob-server to a higher one.
 *
 * @author tjq
 * @author Echo009
 * @since 2021/3/5
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class V3ToV4MigrateService {

    private static final String MIGRATE_LOCK_TEMPLATE = "v3to4MigrateLock-%s-%s";

    private final LockService lockService;

    private final JobInfoRepository jobInfoRepository;

    private final WorkflowInfoRepository workflowInfoRepository;

    private final WorkflowNodeInfoRepository workflowNodeInfoRepository;

    /* ********************** 3.x => 4.x ********************** */

    /**
     * Fix jobs under this app that use the deprecated processor types
     * {@link ProcessorType#SHELL} and {@link ProcessorType#PYTHON},
     * replacing them with the official processors.
     */
    @Transactional(rollbackOn = Exception.class)
    public JSONObject fixDeprecatedProcessType(Long appId) {

        final String lock = String.format(MIGRATE_LOCK_TEMPLATE, "fixDeprecatedProcessType", appId);
        // lock for 120 s
        boolean getLock = lockService.tryLock(lock, 120000);
        if (!getLock) {
            throw new PowerJobException("get lock failed, maybe another migration job is running");
        }
        try {
            JSONObject resultLog = new JSONObject();
            resultLog.put("docs", "https://www.yuque.com/powerjob/guidence/official_processor");
            resultLog.put("tips", "please add the maven dependency of 'powerjob-official-processors'");

            Set<Long> convertedJobIds = Sets.newHashSet();

            // select all jobs of this app whose processor type is one of the deprecated script types
            Specification<JobInfoDO> specification = (root, query, criteriaBuilder) -> {
                List<Predicate> predicates = Lists.newLinkedList();
                List<Integer> scriptJobTypes = Lists.newArrayList(ProcessorType.SHELL.getV(), ProcessorType.PYTHON.getV());
                predicates.add(criteriaBuilder.equal(root.get("appId"), appId));
                predicates.add(root.get("processorType").in(scriptJobTypes));
                return query.where(predicates.toArray(new Predicate[0])).getRestriction();
            };

            List<JobInfoDO> scriptJobs = jobInfoRepository.findAll(specification);
            resultLog.put("scriptJobsNum", scriptJobs.size());
            Stopwatch stopwatch = Stopwatch.createStarted();
            log.info("[FixDeprecatedProcessType] start to fix the job info whose processor type is deprecated, total number: {}", scriptJobs.size());
            scriptJobs.forEach(job -> {
                ProcessorType oldProcessorType = ProcessorType.of(job.getProcessorType());
                // the old script content (processorInfo) becomes the job params of the official processor
                job.setJobParams(job.getProcessorInfo());
                job.setProcessorType(ProcessorType.BUILT_IN.getV());
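                // Deprecated script types map onto the official processors shipped in the
                // 'powerjob-official-processors' artifact referenced in the tips above.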
                if (oldProcessorType == ProcessorType.PYTHON) {
                    job.setProcessorInfo("tech.powerjob.official.processors.impl.script.PythonProcessor");
                } else {
                    job.setProcessorInfo("tech.powerjob.official.processors.impl.script.ShellProcessor");
                }
                jobInfoRepository.saveAndFlush(job);
                convertedJobIds.add(job.getId());
            });
            resultLog.put("convertedJobIds", convertedJobIds);
            stopwatch.stop();
            log.info("[FixDeprecatedProcessType] fix the job info successfully, used time: {}s", stopwatch.elapsed(TimeUnit.SECONDS));
            return resultLog;
        } catch (Exception e) {
            // log
            log.error("[FixDeprecatedProcessType] fail to fix the job info of app {}", appId, e);
            // rethrow
            throw e;
        } finally {
            lockService.unlock(lock);
        }
    }

    /**
     * Fix the workflow info under this app; partial success is allowed.
     * 1. Auto-generate the corresponding node info {@link WorkflowNodeInfoDO}
     * 2. Fix the DAG info (edges + node IDs)
     */
    @SuppressWarnings("squid:S1141")
    public JSONObject fixWorkflowInfoFromV3ToV4(Long appId) {

        final String lock = String.format(MIGRATE_LOCK_TEMPLATE, "fixWorkflowInfoFromV3ToV4", appId);
        // lock for 180 s
        boolean getLock = lockService.tryLock(lock, 180000);
        if (!getLock) {
            throw new PowerJobException("get lock failed, maybe another migration job is running");
        }
        try {
            JSONObject resultLog = new JSONObject();
            Set<Long> fixedWorkflowIds = Sets.newHashSet();
            List<WorkflowInfoDO> workflowInfoList = workflowInfoRepository.findByAppId(appId);
            resultLog.put("totalNum", workflowInfoList.size());
            Stopwatch stopwatch = Stopwatch.createStarted();
            log.info("[FixWorkflowInfoFromV3ToV4] start to fix the workflow info, total number: {}", workflowInfoList.size());

            Map<Long, Long> jobId2NodeIdMap = new HashMap<>(64);
            Map<Long, String> failureReasonMap = new HashMap<>(workflowInfoList.size() / 2 + 1);
            for (WorkflowInfoDO workflowInfo : workflowInfoList) {
                try {
                    // call through the Spring proxy so that @Transactional on the core method takes effect
                    boolean fixed = SpringUtils.getBean(this.getClass()).fixWorkflowInfoCoreFromV3ToV4(workflowInfo, jobId2NodeIdMap);
                    if (fixed) {
                        fixedWorkflowIds.add(workflowInfo.getId());
                    }
                } catch (Exception e) {
                    // record the failure reason
                    failureReasonMap.put(workflowInfo.getId(), e.toString());
                }
                // clear the jobId -> nodeId mapping before handling the next workflow
                jobId2NodeIdMap.clear();
            }
            stopwatch.stop();
            log.info("[FixWorkflowInfoFromV3ToV4] fix the workflow info successfully, total number: {}, fixed number: {}, used time: {}s", workflowInfoList.size(), fixedWorkflowIds.size(), stopwatch.elapsed(TimeUnit.SECONDS));
            resultLog.put("fixedWorkflowIds", fixedWorkflowIds);
            resultLog.put("failureWorkflowInfo", failureReasonMap);
            return resultLog;
        } catch (Exception e) {
            // log
            log.error("[FixWorkflowInfoFromV3ToV4] fail to fix the workflow info of app {}", appId, e);
            // rethrow
            throw e;
        } finally {
            lockService.unlock(lock);
        }
    }

    /**
     * The fix fails in two cases:
     * 1. The job a node refers to is missing (physically deleted)
     * 2. Some nodes in the DAG have a nodeId while others do not
     */
    @Transactional(rollbackOn = Exception.class)
    public boolean fixWorkflowInfoCoreFromV3ToV4(WorkflowInfoDO workflowInfo, Map<Long, Long> jobId2NodeIdMap) {
        String dag = workflowInfo.getPeDAG();
        PEWorkflowDAG peDag;
        try {
            peDag = JSON.parseObject(dag, PEWorkflowDAG.class);
        } catch (Exception e) {
            throw new PowerJobException("invalid DAG!");
        }
        if (peDag == null || CollectionUtils.isEmpty(peDag.getNodes())) {
            // nothing to fix
            return false;
        }
        // As long as any node already has a nodeId, no fix is needed.
        // Unless the data was modified directly in the DB, it is impossible for only some nodes to have an ID.
        boolean needFix = false;
        boolean existNodeId = false;
        for (PEWorkflowDAG.Node node : peDag.getNodes()) {
            if (node.getNodeId() == null) {
                needFix = true;
            } else {
                existNodeId = true;
            }
        }
        // corrupt data (some nodes have an ID, some don't); the user has to repair it manually
        if (needFix && existNodeId) {
            throw new PowerJobException("sorry, we can't automatically fix this workflow info whose node info is wrong! You need to fix it by yourself.");
        }
        // no fix needed, all node IDs exist
        if (!needFix) {
            return false;
        }
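        // Note: saveAndFlush below is deliberate, it persists each node record immediately
        // so that its generated primary key is populated before being written back into the
        // DAG as the nodeId and recorded in the jobId -> nodeId mapping used for the edges.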
        // fix node info
        for (PEWorkflowDAG.Node node : peDag.getNodes()) {
            JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId())
                    .orElseThrow(() -> new PowerJobException("can't find job by id " + node.getJobId()));
            WorkflowNodeInfoDO nodeInfo = new WorkflowNodeInfoDO();
            nodeInfo.setWorkflowId(workflowInfo.getId());
            nodeInfo.setAppId(workflowInfo.getAppId());
            nodeInfo.setJobId(jobInfo.getId());
            // enabled by default, must not skip on failure, params consistent with the job
            nodeInfo.setNodeName(jobInfo.getJobName());
            nodeInfo.setNodeParams(jobInfo.getJobParams());
            nodeInfo.setEnable(true);
            nodeInfo.setSkipWhenFailed(false);
            nodeInfo.setGmtCreate(new Date());
            nodeInfo.setGmtModified(new Date());
            nodeInfo = workflowNodeInfoRepository.saveAndFlush(nodeInfo);
            // update the node ID
            node.setNodeId(nodeInfo.getId());
            node.setNodeName(nodeInfo.getNodeName());
            jobId2NodeIdMap.put(node.getJobId(), node.getNodeId());
        }
        if (!CollectionUtils.isEmpty(peDag.getEdges())) {
            // fix edge info
            for (PEWorkflowDAG.Edge edge : peDag.getEdges()) {
                // convert job IDs to node IDs
                edge.setFrom(jobId2NodeIdMap.get(edge.getFrom()));
                edge.setTo(jobId2NodeIdMap.get(edge.getTo()));
            }
        }
        workflowInfo.setPeDAG(JSON.toJSONString(peDag));
        workflowInfo.setGmtModified(new Date());
        workflowInfoRepository.saveAndFlush(workflowInfo);
        return true;
    }
}
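
/*
 * Minimal usage sketch (hypothetical, not part of PowerJob): shows how a caller might
 * run both fixes for an app, jobs first, then workflows. The class and method names
 * below are assumptions for illustration only.
 */
@Slf4j
@RequiredArgsConstructor
class V3ToV4MigrateUsageSketch {

    private final V3ToV4MigrateService migrateService;

    void migrateApp(Long appId) {
        // fix deprecated SHELL / PYTHON jobs first, then repair the workflow DAGs
        JSONObject jobFixLog = migrateService.fixDeprecatedProcessType(appId);
        JSONObject workflowFixLog = migrateService.fixWorkflowInfoFromV3ToV4(appId);
        log.info("[V3ToV4MigrateUsageSketch] jobFixLog: {}, workflowFixLog: {}", jobFixLog, workflowFixLog);
    }
}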