package tech.powerjob.server.core.scheduler;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.server.common.Holder;
import tech.powerjob.common.enums.SwitchableStatus;
import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.core.instance.InstanceManager;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.brief.BriefInstanceInfo;
import tech.powerjob.server.persistence.remote.repository.*;
import tech.powerjob.server.remote.transporter.TransportService;

import java.util.*;
import java.util.stream.Collectors;

/**
 * Periodic instance status check.
 *
 * @author tjq
 * @since 2020/4/7
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class InstanceStatusCheckService {

    private static final int MAX_BATCH_NUM_APP = 10;

    private static final int MAX_BATCH_NUM_INSTANCE = 3000;

    private static final int MAX_BATCH_UPDATE_NUM = 500;

    private static final long DISPATCH_TIMEOUT_MS = 30000;

    private static final long RECEIVE_TIMEOUT_MS = 60000;

    private static final long RUNNING_TIMEOUT_MS = 60000;

    private static final long WORKFLOW_WAITING_TIMEOUT_MS = 60000;

    public static final long CHECK_INTERVAL = 10000;

    private final TransportService transportService;

    private final DispatchService dispatchService;

    private final InstanceManager instanceManager;

    private final WorkflowInstanceManager workflowInstanceManager;

    private final AppInfoRepository appInfoRepository;

    private final JobInfoRepository jobInfoRepository;

    private final InstanceInfoRepository instanceInfoRepository;

    private final WorkflowInfoRepository workflowInfoRepository;

    private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;

    public void checkWorkflowInstance() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        // query the DB for the app groups this server is responsible for
        List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
        if (CollectionUtils.isEmpty(allAppIds)) {
            log.info("[InstanceStatusChecker] current server has no app's job to check");
            return;
        }
        try {
            checkWorkflowInstance(allAppIds);
        } catch (Exception e) {
            log.error("[InstanceStatusChecker] WorkflowInstance status check failed.", e);
        }
        log.info("[InstanceStatusChecker] WorkflowInstance status check using {}.", stopwatch.stop());
    }

    /**
     * Check instances waiting to be dispatched.
     * WAITING_DISPATCH timeout: the server went down after the instance was written into the time wheel but before it was scheduled.
     */
    public void checkWaitingDispatchInstance() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        // query the DB for the app groups this server is responsible for
        List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
        if (CollectionUtils.isEmpty(allAppIds)) {
            log.info("[InstanceStatusChecker] current server has no app's job to check");
            return;
        }
        try {
            // check instances stuck in WAITING_DISPATCH
            Lists.partition(allAppIds, MAX_BATCH_NUM_APP).forEach(this::handleWaitingDispatchInstance);
        } catch (Exception e) {
            log.error("[InstanceStatusChecker] WaitingDispatchInstance status check failed.", e);
        }
        log.info("[InstanceStatusChecker] WaitingDispatchInstance status check using {}.", stopwatch.stop());
    }

    /**
     * Check instances waiting to be received by a worker.
     * WAITING_WORKER_RECEIVE timeout: the worker failed to receive the task because of a network error.
     */
    public void checkWaitingWorkerReceiveInstance() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        // query the DB for the app groups this server is responsible for
        List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
        if (CollectionUtils.isEmpty(allAppIds)) {
            log.info("[InstanceStatusChecker] current server has no app's job to check");
            return;
        }
        try {
            // check instances stuck in WAITING_WORKER_RECEIVE
            Lists.partition(allAppIds, MAX_BATCH_NUM_APP).forEach(this::handleWaitingWorkerReceiveInstance);
        } catch (Exception e) {
            log.error("[InstanceStatusChecker] WaitingWorkerReceiveInstance status check failed.", e);
        }
        log.info("[InstanceStatusChecker] WaitingWorkerReceiveInstance status check using {}.", stopwatch.stop());
    }

    /**
     * Check running instances.
     * RUNNING timeout: the TaskTracker went down and lost its heartbeat connection to the server.
     */
    public void checkRunningInstance() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        // query the DB for the app groups this server is responsible for
        List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
        if (CollectionUtils.isEmpty(allAppIds)) {
            log.info("[InstanceStatusChecker] current server has no app's job to check");
            return;
        }
        try {
            // check instances stuck in RUNNING (no status report from the TaskTracker for too long is treated as a failure)
            Lists.partition(allAppIds, MAX_BATCH_NUM_APP).forEach(this::handleRunningInstance);
        } catch (Exception e) {
            log.error("[InstanceStatusChecker] RunningInstance status check failed.", e);
        }
        log.info("[InstanceStatusChecker] RunningInstance status check using {}.", stopwatch.stop());
    }

    private void handleWaitingDispatchInstance(List<Long> appIds) {
        // elements are removed below, so a fresh copy is required; otherwise the caller would throw NoSuchElementException: null
        List<Long> partAppIds = Lists.newArrayList(appIds);
        // 1. check instances stuck in WAITING_DISPATCH
        long threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS;
        List<InstanceInfoDO> waitingDispatchInstances = instanceInfoRepository.findAllByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold, PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
        while (!waitingDispatchInstances.isEmpty()) {
            List<Long> overloadAppIdList = new ArrayList<>();
            long startTime = System.currentTimeMillis();
            // group by appId to simplify the overload handling
            Map<Long, List<InstanceInfoDO>> waitingDispatchInstancesMap = waitingDispatchInstances.stream().collect(Collectors.groupingBy(InstanceInfoDO::getAppId));
            for (Map.Entry<Long, List<InstanceInfoDO>> entry : waitingDispatchInstancesMap.entrySet()) {
                final Long currentAppId = entry.getKey();
                final List<InstanceInfoDO> currentAppWaitingDispatchInstances = entry.getValue();
                // collect job id
                Set<Long> jobIds = currentAppWaitingDispatchInstances.stream().map(InstanceInfoDO::getJobId).collect(Collectors.toSet());
                // query job info and map
                Map<Long, JobInfoDO> jobInfoMap = jobInfoRepository.findByIdIn(jobIds).stream().collect(Collectors.toMap(JobInfoDO::getId, e -> e));
                log.warn("[InstanceStatusChecker] find some instance in app({}) which is not triggered as expected: {}", currentAppId, currentAppWaitingDispatchInstances.stream().map(InstanceInfoDO::getInstanceId).collect(Collectors.toList()));
                final Holder<Boolean> overloadFlag = new Holder<>(false);
                // keeping it this simple is fine, this is the only place that uses parallelStream
                currentAppWaitingDispatchInstances.parallelStream().forEach(instance -> {
                    if (overloadFlag.get()) {
                        // skip directly
                        return;
                    }
                    Optional<JobInfoDO> jobInfoOpt = Optional.ofNullable(jobInfoMap.get(instance.getJobId()));
                    if (jobInfoOpt.isPresent()) {
                        // no need to reset the status again for instances waiting to be dispatched, which saves IO
                        dispatchService.dispatch(jobInfoOpt.get(), instance.getInstanceId(), Optional.of(instance), Optional.of(overloadFlag));
                    } else {
                        log.warn("[InstanceStatusChecker] can't find job by jobId[{}], so redispatch failed, failed instance: {}", instance.getJobId(), instance);
                        final Optional<InstanceInfoDO> opt = instanceInfoRepository.findById(instance.getId());
                        opt.ifPresent(instanceInfoDO -> updateFailedInstance(instanceInfoDO, SystemInstanceResult.CAN_NOT_FIND_JOB_INFO));
                    }
                });
                threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS;
                if (overloadFlag.get()) {
                    overloadAppIdList.add(currentAppId);
                }
            }
            log.info("[InstanceStatusChecker] process {} task,use {} ms", waitingDispatchInstances.size(), System.currentTimeMillis() - startTime);
            if (!overloadAppIdList.isEmpty()) {
                log.warn("[InstanceStatusChecker] app[{}] is overload, so skip check waiting dispatch instance", overloadAppIdList);
                partAppIds.removeAll(overloadAppIdList);
            }
            if (partAppIds.isEmpty()) {
                break;
            }
            waitingDispatchInstances = instanceInfoRepository.findAllByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold, PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
        }
    }

    private void handleWaitingWorkerReceiveInstance(List<Long> partAppIds) {
        // 2. check instances stuck in WAITING_WORKER_RECEIVE
        long threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS;
        List<BriefInstanceInfo> waitingWorkerReceiveInstances = instanceInfoRepository.selectBriefInfoByAppIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold, PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
        while (!waitingWorkerReceiveInstances.isEmpty()) {
            log.warn("[InstanceStatusChecker] find some instance didn't receive any reply from worker, try to redispatch: {}", waitingWorkerReceiveInstances.stream().map(BriefInstanceInfo::getInstanceId).collect(Collectors.toList()));
            final List<List<BriefInstanceInfo>> partitions = Lists.partition(waitingWorkerReceiveInstances, MAX_BATCH_UPDATE_NUM);
            for (List<BriefInstanceInfo> partition : partitions) {
                dispatchService.redispatchBatchAsyncLockFree(partition.stream().map(BriefInstanceInfo::getInstanceId).collect(Collectors.toList()), InstanceStatus.WAITING_WORKER_RECEIVE.getV());
            }
            // query again
            threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS;
            waitingWorkerReceiveInstances = instanceInfoRepository.selectBriefInfoByAppIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold, PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
        }
    }

    private void handleRunningInstance(List<Long> partAppIds) {
        // 3. check instances stuck in RUNNING (no status report from the TaskTracker for too long is treated as a failure)
        long threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS;
        List<BriefInstanceInfo> failedInstances = instanceInfoRepository.selectBriefInfoByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold), PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
        while (!failedInstances.isEmpty()) {
            // collect job id
            Set<Long> jobIds = failedInstances.stream().map(BriefInstanceInfo::getJobId).collect(Collectors.toSet());
            // query job info and map
            Map<Long, JobInfoDO> jobInfoMap = jobInfoRepository.findByIdIn(jobIds).stream().collect(Collectors.toMap(JobInfoDO::getId, e -> e));
            log.warn("[InstanceStatusCheckService] find some instances have not received status report for a long time : {}", failedInstances.stream().map(BriefInstanceInfo::getInstanceId).collect(Collectors.toList()));
            failedInstances.forEach(instance -> {
                Optional<JobInfoDO> jobInfoOpt = Optional.ofNullable(jobInfoMap.get(instance.getJobId()));
                if (!jobInfoOpt.isPresent()) {
                    final Optional<InstanceInfoDO> opt = instanceInfoRepository.findById(instance.getId());
                    opt.ifPresent(e -> updateFailedInstance(e, SystemInstanceResult.REPORT_TIMEOUT));
                    return;
                }
                TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoOpt.get().getTimeExpressionType());
                SwitchableStatus switchableStatus = SwitchableStatus.of(jobInfoOpt.get().getStatus());
                // if the job has been disabled, do not retry, just mark the instance as failed; frequent (second-level) jobs are also marked as failed directly and rescheduled by the dispatcher
                if (switchableStatus != SwitchableStatus.ENABLE || TimeExpressionType.FREQUENT_TYPES.contains(timeExpressionType.getV())) {
                    final Optional<InstanceInfoDO> opt = instanceInfoRepository.findById(instance.getId());
                    opt.ifPresent(e -> updateFailedInstance(e, SystemInstanceResult.REPORT_TIMEOUT));
                    return;
                }
                // CRON and API jobs behave the same: the failure count is increased by 1 and the instance is retried according to its retry configuration
                if (instance.getRunningTimes() < jobInfoOpt.get().getInstanceRetryNum()) {
                    dispatchService.redispatchAsync(instance.getInstanceId(), InstanceStatus.RUNNING.getV());
                } else {
                    final Optional<InstanceInfoDO> opt = instanceInfoRepository.findById(instance.getId());
                    opt.ifPresent(e -> updateFailedInstance(e, SystemInstanceResult.REPORT_TIMEOUT));
                }
            });
            threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS;
            failedInstances = instanceInfoRepository.selectBriefInfoByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold), PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
        }
    }
    /**
     * Periodically check workflow instance status.
     * Only workflow instances that have stayed in WAITING for too long are checked and retried here; the rest of the
     * workflow's reliability is backed by its instances, i.e. a failed sub-task is reported back to the WorkflowInstance.
     *
     * @param allAppIds all appIds this server is responsible for
     */
    private void checkWorkflowInstance(List<Long> allAppIds) {
        // retry workflow instances that have stayed in WAITING for too long
        long threshold = System.currentTimeMillis() - WORKFLOW_WAITING_TIMEOUT_MS;
        Lists.partition(allAppIds, MAX_BATCH_NUM_APP).forEach(partAppIds -> {
            List<WorkflowInstanceInfoDO> waitingWfInstanceList = workflowInstanceInfoRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, WorkflowInstanceStatus.WAITING.getV(), threshold);
            if (!CollectionUtils.isEmpty(waitingWfInstanceList)) {
                List<Long> wfInstanceIds = waitingWfInstanceList.stream().map(WorkflowInstanceInfoDO::getWfInstanceId).collect(Collectors.toList());
                log.warn("[WorkflowInstanceChecker] wfInstance({}) is not started as expected, oms try to restart these workflowInstance.", wfInstanceIds);
                waitingWfInstanceList.forEach(wfInstance -> {
                    Optional<WorkflowInfoDO> workflowOpt = workflowInfoRepository.findById(wfInstance.getWorkflowId());
                    workflowOpt.ifPresent(workflowInfo -> {
                        workflowInstanceManager.start(workflowInfo, wfInstance.getWfInstanceId());
                        log.info("[Workflow-{}|{}] restart workflowInstance successfully~", workflowInfo.getId(), wfInstance.getWfInstanceId());
                    });
                });
            }
        });
    }

    /**
     * Mark a task instance as failed and propagate the final state.
     */
    private void updateFailedInstance(InstanceInfoDO instance, String result) {
        log.warn("[InstanceStatusChecker] instance[{}] failed due to {}, instanceInfo: {}", instance.getInstanceId(), result, instance);
        instance.setStatus(InstanceStatus.FAILED.getV());
        instance.setFinishedTime(System.currentTimeMillis());
        instance.setGmtModified(new Date());
        instance.setResult(result);
        instanceInfoRepository.saveAndFlush(instance);
        instanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, result);
    }
}