package tech.powerjob.server.core; import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.remote.framework.base.URL; import tech.powerjob.server.common.Holder; import tech.powerjob.server.common.module.WorkerInfo; import tech.powerjob.server.core.instance.InstanceManager; import tech.powerjob.server.core.instance.InstanceMetadataService; import tech.powerjob.server.core.lock.UseCacheLock; import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; import tech.powerjob.server.remote.transporter.TransportService; import tech.powerjob.server.remote.transporter.impl.ServerURLFactory; import tech.powerjob.server.remote.worker.WorkerClusterQueryService; import tech.powerjob.server.remote.worker.selector.TaskTrackerSelectorService; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import static tech.powerjob.common.enums.InstanceStatus.*; /** * 派送服务(将任务从Server派发到Worker) * * @author tjq * @author Echo009 * @since 2020/4/5 */ @Slf4j @Service @RequiredArgsConstructor public class DispatchService { private final TransportService transportService; private final WorkerClusterQueryService workerClusterQueryService; private final InstanceManager instanceManager; private final InstanceMetadataService instanceMetadataService; private final InstanceInfoRepository instanceInfoRepository; private final TaskTrackerSelectorService taskTrackerSelectorService; /** * 异步重新派发 * * @param instanceId 实例 ID */ @UseCacheLock(type = "processJobInstance", key = "#instanceId", concurrencyLevel = 1024) public void redispatchAsync(Long instanceId, int originStatus) { // 将状态重置为等待派发 instanceInfoRepository.updateStatusAndGmtModifiedByInstanceIdAndOriginStatus(instanceId, originStatus, InstanceStatus.WAITING_DISPATCH.getV(), new Date()); } /** * 异步批量重新派发,不加锁 */ public void redispatchBatchAsyncLockFree(List instanceIdList, int originStatus) { // 将状态重置为等待派发 instanceInfoRepository.updateStatusAndGmtModifiedByInstanceIdListAndOriginStatus(instanceIdList, originStatus, InstanceStatus.WAITING_DISPATCH.getV(), new Date()); } /** * 将任务从Server派发到Worker(TaskTracker) * 只会派发当前状态为等待派发的任务实例 * ************************************************** * 2021-02-03 modify by Echo009 * 1、移除参数 当前运行次数、工作流实例ID、实例参数 * 更改为从当前任务实例中获取获取以上信息 * 2、移除运行次数相关的(runningTimes)处理逻辑 * 迁移至 {@link InstanceManager#updateStatus} 中处理 * ************************************************** * * @param jobInfo 任务的元信息 * @param instanceId 任务实例ID * @param instanceInfoOptional 任务实例信息,可选 * @param overloadOptional 超载信息,可选 */ @UseCacheLock(type = "processJobInstance", key = "#jobInfo.getMaxInstanceNum() > 0 || T(tech.powerjob.common.enums.TimeExpressionType).FREQUENT_TYPES.contains(#jobInfo.getTimeExpressionType()) ? #jobInfo.getId() : #instanceId", concurrencyLevel = 1024) public void dispatch(JobInfoDO jobInfo, Long instanceId, Optional instanceInfoOptional, Optional> overloadOptional) { // 允许从外部传入实例信息,减少 io 次数 // 检查当前任务是否被取消 InstanceInfoDO instanceInfo = instanceInfoOptional.orElseGet(() -> instanceInfoRepository.findByInstanceId(instanceId)); Long jobId = instanceInfo.getJobId(); if (CANCELED.getV() == instanceInfo.getStatus()) { log.info("[Dispatcher-{}|{}] cancel dispatch due to instance has been canceled", jobId, instanceId); return; } // 已经被派发过则不再派发 // fix 并发场景下重复派发的问题 if (instanceInfo.getStatus() != WAITING_DISPATCH.getV()) { log.info("[Dispatcher-{}|{}] cancel dispatch due to instance has been dispatched", jobId, instanceId); return; } // 任务信息已经被删除 if (jobInfo.getId() == null) { log.warn("[Dispatcher-{}|{}] cancel dispatch due to job(id={}) has been deleted!", jobId, instanceId, jobId); instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), FAILED, "can't find job by id " + jobId); return; } Date now = new Date(); String dbInstanceParams = instanceInfo.getInstanceParams() == null ? "" : instanceInfo.getInstanceParams(); log.info("[Dispatcher-{}|{}] start to dispatch job: {};instancePrams: {}.", jobId, instanceId, jobInfo, dbInstanceParams); // 查询当前运行的实例数 long current = System.currentTimeMillis(); Integer maxInstanceNum = jobInfo.getMaxInstanceNum(); // 秒级任务只派发到一台机器,具体的 maxInstanceNum 由 TaskTracker 控制 if (TimeExpressionType.FREQUENT_TYPES.contains(jobInfo.getTimeExpressionType())) { maxInstanceNum = 1; } // 0 代表不限制在线任务,还能省去一次 DB 查询 if (maxInstanceNum > 0) { // 不统计 WAITING_DISPATCH 的状态:使用 OpenAPI 触发的延迟任务不应该统计进去(比如 delay 是 1 天) // 由于不统计 WAITING_DISPATCH,所以这个 runningInstanceCount 不包含本任务自身 long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, Lists.newArrayList(WAITING_WORKER_RECEIVE.getV(), RUNNING.getV())); // 超出最大同时运行限制,不执行调度 if (runningInstanceCount >= maxInstanceNum) { String result = String.format(SystemInstanceResult.TOO_MANY_INSTANCES, runningInstanceCount, maxInstanceNum); log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance is running ({} > {}).", jobId, instanceId, runningInstanceCount, maxInstanceNum); instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), current, current, RemoteConstant.EMPTY_ADDRESS, result, now); instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), FAILED, result); return; } } // 获取当前最合适的 worker 列表 List suitableWorkers = workerClusterQueryService.geAvailableWorkers(jobInfo); if (CollectionUtils.isEmpty(suitableWorkers)) { log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available", jobId, instanceId); instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, now); instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE); return; } // 判断是否超载,在所有可用 worker 超载的情况下直接跳过当前任务 suitableWorkers = filterOverloadWorker(suitableWorkers); if (suitableWorkers.isEmpty()) { // 直接取消派发,减少一次数据库 io overloadOptional.ifPresent(booleanHolder -> booleanHolder.set(true)); log.warn("[Dispatcher-{}|{}] cancel to dispatch job due to all worker is overload", jobId, instanceId); return; } List workerIpList = suitableWorkers.stream().map(WorkerInfo::getAddress).collect(Collectors.toList()); // 构造任务调度请求 ServerScheduleJobReq req = constructServerScheduleJobReq(jobInfo, instanceInfo, workerIpList); // 发送请求(不可靠,需要一个后台线程定期轮询状态) WorkerInfo taskTracker = taskTrackerSelectorService.select(jobInfo, instanceInfo, suitableWorkers); String taskTrackerAddress = taskTracker.getAddress(); URL workerUrl = ServerURLFactory.dispatchJob2Worker(taskTrackerAddress); transportService.tell(taskTracker.getProtocol(), workerUrl, req); log.info("[Dispatcher-{}|{}] send schedule request to TaskTracker[protocol:{},address:{}] successfully: {}.", jobId, instanceId, taskTracker.getProtocol(), taskTrackerAddress, req); // 修改状态 instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), current, taskTrackerAddress, now, instanceInfo.getStatus()); // 装载缓存 instanceMetadataService.loadJobInfo(instanceId, jobInfo); } private List filterOverloadWorker(List suitableWorkers) { List res = new ArrayList<>(suitableWorkers.size()); for (WorkerInfo suitableWorker : suitableWorkers) { if (suitableWorker.overload()){ continue; } res.add(suitableWorker); } return res; } /** * 构造任务调度请求 */ private ServerScheduleJobReq constructServerScheduleJobReq(JobInfoDO jobInfo, InstanceInfoDO instanceInfo, List finalWorkersIpList) { // 构造请求 ServerScheduleJobReq req = new ServerScheduleJobReq(); BeanUtils.copyProperties(jobInfo, req); // 传入 JobId req.setJobId(jobInfo.getId()); // 传入 InstanceParams if (StringUtils.isEmpty(instanceInfo.getInstanceParams())) { req.setInstanceParams(null); } else { req.setInstanceParams(instanceInfo.getInstanceParams()); } // 覆盖静态参数 if (!StringUtils.isEmpty(instanceInfo.getJobParams())) { req.setJobParams(instanceInfo.getJobParams()); } req.setInstanceId(instanceInfo.getInstanceId()); req.setAllWorkerAddress(finalWorkersIpList); req.setMaxWorkerCount(jobInfo.getMaxWorkerCount()); // 设置工作流ID req.setWfInstanceId(instanceInfo.getWfInstanceId()); req.setExecuteType(ExecuteType.of(jobInfo.getExecuteType()).name()); req.setProcessorType(ProcessorType.of(jobInfo.getProcessorType()).name()); req.setTimeExpressionType(TimeExpressionType.of(jobInfo.getTimeExpressionType()).name()); if (jobInfo.getInstanceTimeLimit() != null) { req.setInstanceTimeoutMS(jobInfo.getInstanceTimeLimit()); } req.setThreadConcurrency(jobInfo.getConcurrency()); return req; } }