package tech.powerjob.worker.core.tracker.task.heavy; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Stopwatch; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.TaskTrackerBehavior; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.WorkerQueryExecutorClusterReq; import tech.powerjob.common.response.AskResponse; import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.CollectionUtils; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.SegmentLock; import tech.powerjob.common.enhance.SafeRunnable; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.utils.TransportUtils; import tech.powerjob.worker.common.utils.WorkflowContextUtils; import tech.powerjob.worker.core.ha.ProcessorTrackerStatusHolder; import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager; import tech.powerjob.worker.core.tracker.task.TaskTracker; import tech.powerjob.worker.persistence.TaskDO; import tech.powerjob.worker.persistence.TaskPersistenceService; import tech.powerjob.worker.pojo.model.InstanceInfo; import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq; import tech.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq; import javax.annotation.Nullable; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; /** * 负责管理 JobInstance 的运行,主要包括任务的派发(MR可能存在大量的任务)和状态的更新 * * @author tjq * @since 2020/4/8 */ @Slf4j public abstract class HeavyTaskTracker extends TaskTracker { /** * ProcessTracker 状态管理 */ protected final ProcessorTrackerStatusHolder ptStatusHolder; /** * 数据库持久化服务 */ protected final TaskPersistenceService taskPersistenceService; /** * 定时任务线程池 */ protected ScheduledExecutorService scheduledPool; /** * 任务信息缓存 */ private final Cache taskId2BriefInfo; /** * 分段锁 */ private final SegmentLock segmentLock; private static final int UPDATE_CONCURRENCY = 4; protected HeavyTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) { // 初始化成员变量 super(req,workerRuntime); // 赋予时间表达式类型 instanceInfo.setTimeExpressionType(TimeExpressionType.valueOf(req.getTimeExpressionType()).getV()); // 保护性操作 instanceInfo.setThreadConcurrency(Math.max(1, instanceInfo.getThreadConcurrency())); this.ptStatusHolder = new ProcessorTrackerStatusHolder(instanceId, req.getMaxWorkerCount(), req.getAllWorkerAddress()); this.taskPersistenceService = initTaskPersistenceService(instanceInfo, workerRuntime); // 构建缓存 taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024).softValues().build(); // 构建分段锁 segmentLock = new SegmentLock(UPDATE_CONCURRENCY); // 子类自定义初始化操作 initTaskTracker(req); log.info("[TaskTracker-{}] create TaskTracker successfully.", instanceId); } protected TaskPersistenceService initTaskPersistenceService(InstanceInfo instanceInfo, WorkerRuntime workerRuntime) { return workerRuntime.getTaskPersistenceService(); } /** * 静态方法创建 TaskTracker * * @param req 服务端调度任务请求 * @return API/CRON -> CommonTaskTracker, FIX_RATE/FIX_DELAY -> FrequentTaskTracker */ public static HeavyTaskTracker create(ServerScheduleJobReq req, WorkerRuntime workerRuntime) { try { TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType()); switch (timeExpressionType) { case FIXED_RATE: case FIXED_DELAY: return new FrequentTaskTracker(req, workerRuntime); default: return new CommonTaskTracker(req, workerRuntime); } } catch (Exception e) { reportCreateErrorToServer(req, workerRuntime, e); } return null; } /* *************************** 对外方法区 *************************** */ /** * 更新追加的上下文数据 * * @param newAppendedWfContext 追加的上下文数据 * @since 2021/02/05 */ public void updateAppendedWfContext(Map newAppendedWfContext) { // check if (instanceInfo.getWfInstanceId() == null || CollectionUtils.isEmpty(newAppendedWfContext)) { // 只有工作流中的任务才有存储的必要 return; } // 检查追加的上下文大小是否超出限制 if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext, workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength())) { log.warn("[TaskTracker-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!", instanceInfo.getInstanceId(), workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength()); // ignore appended workflow context data return; } for (Map.Entry entry : newAppendedWfContext.entrySet()) { String originValue = appendedWfContext.put(entry.getKey(), entry.getValue()); log.info("[TaskTracker-{}] update appended workflow context data {} : {} -> {}", instanceInfo.getInstanceId(), entry.getKey(), originValue, entry.getValue()); } } /** * 更新Task状态 * V1.0.0 -> V1.0.1(e405e283ad7f97b0b4e5d369c7de884c0caf9192) 锁方案变更,从 synchronized (taskId.intern()) 修改为分段锁,能大大减少内存占用,损失的只有理论并发度而已 * * @param subInstanceId 子任务实例ID * @param taskId task的ID(task为任务实例的执行单位) * @param newStatus task的新状态 * @param reportTime 上报时间 * @param result task的执行结果,未执行完成时为空 */ @SuppressWarnings({"squid:S3776", "squid:S2142"}) public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) { if (finished.get()) { return; } TaskStatus nTaskStatus = TaskStatus.of(newStatus); int lockId = taskId.hashCode(); try { // 阻塞获取锁 segmentLock.lockInterruptible(lockId); TaskBriefInfo taskBriefInfo = taskId2BriefInfo.getIfPresent(taskId); // 缓存中不存在,从数据库查 if (taskBriefInfo == null) { Optional taskOpt = taskPersistenceService.getTask(instanceId, taskId); if (taskOpt.isPresent()) { TaskDO taskDO = taskOpt.get(); taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.of(taskDO.getStatus()), taskDO.getLastReportTime()); } else { // 理论上不存在这种情况,除非数据库异常 log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", instanceId, subInstanceId, taskId); taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.WAITING_DISPATCH, -1L); } // 写入缓存 taskId2BriefInfo.put(taskId, taskBriefInfo); } // 过滤过期的请求(潜在的集群时间一致性需求,重试跨 Worker 时,时间不一致可能导致问题) if (taskBriefInfo.getLastReportTime() > reportTime) { log.warn("[TaskTracker-{}-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.", instanceId, subInstanceId, taskBriefInfo.getLastReportTime(), reportTime, taskId, newStatus); return; } // 检查状态转移是否合法,fix issue 404(20240306:无排队情况下,receive 和 running 几乎会在同一时间触发,导致这两个请求先后顺序几乎无法保证,大概率有此行输出,因此日志级别降级到 INFO) if (nTaskStatus.getValue() < taskBriefInfo.getStatus().getValue()) { log.info("[TaskTracker-{}-{}] receive invalid task status report(taskId={},currentStatus={},newStatus={}), TaskTracker will drop this report.", instanceId, subInstanceId, taskId, taskBriefInfo.getStatus().getValue(), newStatus); return; } // 此时本次请求已经有效,先更新相关信息 taskBriefInfo.setLastReportTime(reportTime); taskBriefInfo.setStatus(nTaskStatus); // 处理失败的情况 int configTaskRetryNum = instanceInfo.getTaskRetryNum(); if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED && configTaskRetryNum >= 1) { // 失败不是主要的情况,多查一次数据库也问题不大(况且前面有缓存顶着,大部分情况之前不会去查DB) Optional taskOpt = taskPersistenceService.getTask(instanceId, taskId); // 查询DB再失败的话,就不重试了... if (taskOpt.isPresent()) { int failedCnt = taskOpt.get().getFailedCnt(); if (failedCnt < configTaskRetryNum) { TaskDO updateEntity = new TaskDO(); updateEntity.setFailedCnt(failedCnt + 1); /* 地址规则: 1. 当前存储的地址为任务派发的目的地(ProcessorTracker地址) 2. 根任务、最终任务必须由TaskTracker所在机器执行(如果是根任务和最终任务,不应当修改地址) 3. 广播任务每台机器都需要执行,因此不应该重新分配worker(广播任务不应当修改地址) */ String taskName = taskOpt.get().getTaskName(); if (!taskName.equals(TaskConstant.ROOT_TASK_NAME) && !taskName.equals(TaskConstant.LAST_TASK_NAME) && executeType != ExecuteType.BROADCAST) { updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS); } updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); updateEntity.setLastReportTime(reportTime); boolean retryTask = taskPersistenceService.updateTask(instanceId, taskId, updateEntity); if (retryTask) { log.info("[TaskTracker-{}-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, subInstanceId, taskId); return; } } } } // 更新状态(失败重试写入DB失败的,也就不重试了...谁让你那么倒霉呢...)(24.2.4 更新:大规模 MAP 任务追求极限性能,不持久化无用的子任务 result) result = result == null || ExecuteType.MAP.equals(executeType) ? "" : result; boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result); if (!updateResult) { log.warn("[TaskTracker-{}-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, subInstanceId, taskId); } } catch (InterruptedException ignore) { // ignore } catch (Exception e) { log.warn("[TaskTracker-{}-{}] update task status failed.", instanceId, subInstanceId, e); } finally { segmentLock.unlock(lockId); } } /** * 提交Task任务(MapReduce的Map,Broadcast的广播),上层保证 batchSize,同时插入过多数据可能导致失败 * * @param newTaskList 新增的子任务列表 */ public boolean submitTask(List newTaskList) { if (finished.get()) { return true; } if (CollectionUtils.isEmpty(newTaskList)) { return true; } // 基础处理(多循环一次虽然有些浪费,但分布式执行中,这点耗时绝不是主要占比,忽略不计!) newTaskList.forEach(task -> { task.setInstanceId(instanceId); task.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); task.setFailedCnt(0); task.setLastModifiedTime(System.currentTimeMillis()); task.setCreatedTime(System.currentTimeMillis()); task.setLastReportTime(-1L); }); log.debug("[TaskTracker-{}] receive new tasks: {}", instanceId, newTaskList); return taskPersistenceService.batchSave(newTaskList); } /** * 处理 ProcessorTracker 的心跳信息 * * @param heartbeatReq ProcessorTracker(任务的执行管理器)发来的心跳包,包含了其当前状态 */ public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) { log.debug("[TaskTracker-{}] receive heartbeat: {}", instanceId, heartbeatReq); ptStatusHolder.updateStatus(heartbeatReq); // 上报空闲,检查是否已经接收到全部该 ProcessorTracker 负责的任务 if (heartbeatReq.getType() == ProcessorTrackerStatusReportReq.IDLE) { String idlePtAddress = heartbeatReq.getAddress(); // 该 ProcessorTracker 已销毁,重置为初始状态 ptStatusHolder.getProcessorTrackerStatus(idlePtAddress).setDispatched(false); List unfinishedTask = taskPersistenceService.getAllUnFinishedTaskByAddress(instanceId, idlePtAddress); if (!CollectionUtils.isEmpty(unfinishedTask)) { log.warn("[TaskTracker-{}] ProcessorTracker({}) is idle now but have unfinished tasks: {}", instanceId, idlePtAddress, unfinishedTask); unfinishedTask.forEach(task -> updateTaskStatus(task.getSubInstanceId(), task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result")); } } } /** * 生成广播任务 * * @param preExecuteSuccess 预执行广播任务运行状态 * @param subInstanceId 子实例ID * @param preTaskId 预执行广播任务的taskId * @param result 预执行广播任务的结果 */ public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, String result) { if (finished.get()) { return; } log.info("[TaskTracker-{}-{}] finished broadcast's preProcess, preExecuteSuccess:{},preTaskId:{},result:{}", instanceId, subInstanceId, preExecuteSuccess, preTaskId, result); // 生成集群子任务 if (preExecuteSuccess) { List allWorkerAddress = ptStatusHolder.getAllProcessorTrackers(); List subTaskList = Lists.newLinkedList(); for (int i = 0; i < allWorkerAddress.size(); i++) { TaskDO subTask = new TaskDO(); subTask.setSubInstanceId(subInstanceId); subTask.setTaskName(TaskConstant.BROADCAST_TASK_NAME); subTask.setTaskId(preTaskId + "." + i); // 广播任务直接写入派发地址 subTask.setAddress(allWorkerAddress.get(i)); subTaskList.add(subTask); } submitTask(subTaskList); } else { log.warn("[TaskTracker-{}-{}] BroadcastTask failed because of preProcess failed, preProcess result={}.", instanceId, subInstanceId, result); } } /** * 销毁自身,释放资源 */ @Override public void destroy() { finished.set(true); Stopwatch sw = Stopwatch.createStarted(); // 0. 开始关闭线程池,不能使用 shutdownNow(),因为 destroy 方法本身就在 scheduledPool 的线程中执行,强行关闭会打断 destroy 的执行。 scheduledPool.shutdown(); // 1. 通知 ProcessorTracker 释放资源 TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq(); stopRequest.setInstanceId(instanceId); ptStatusHolder.getAllProcessorTrackers().forEach(ptAddress -> { // 不可靠通知,ProcessorTracker 也可以靠自己的定时任务/问询等方式关闭 TransportUtils.ttStopPtInstance(stopRequest, ptAddress, workerRuntime.getTransporter()); }); // 2. 删除所有数据库数据 boolean dbSuccess = taskPersistenceService.deleteAllTasks(instanceId); if (!dbSuccess) { log.error("[TaskTracker-{}] delete tasks from database failed.", instanceId); } else { log.debug("[TaskTracker-{}] delete all tasks from database successfully.", instanceId); } // 3. 移除顶层引用,送去 GC HeavyTaskTrackerManager.removeTaskTracker(instanceId); log.info("[TaskTracker-{}] TaskTracker has left the world(using {}), bye~", instanceId, sw.stop()); // 4. 强制关闭线程池 if (!scheduledPool.isTerminated()) { CommonUtils.executeIgnoreException(() -> scheduledPool.shutdownNow()); } } @Override public void stopTask() { destroy(); } /* *************************** 对内方法区 *************************** */ /** * 派发任务到 ProcessorTracker * * @param task 需要被执行的任务 * @param processorTrackerAddress ProcessorTracker的地址(IP:Port) */ protected void dispatchTask(TaskDO task, String processorTrackerAddress) { // 1. 持久化,更新数据库(如果更新数据库失败,可能导致重复执行,先不处理) TaskDO updateEntity = new TaskDO(); updateEntity.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue()); // 写入处理该任务的 ProcessorTracker updateEntity.setAddress(processorTrackerAddress); boolean success = taskPersistenceService.updateTask(instanceId, task.getTaskId(), updateEntity); if (!success) { log.warn("[TaskTracker-{}] dispatch task(taskId={},taskName={}) failed due to update task status failed.", instanceId, task.getTaskId(), task.getTaskName()); return; } // 2. 更新 ProcessorTrackerStatus 状态 ptStatusHolder.getProcessorTrackerStatus(processorTrackerAddress).setDispatched(true); // 3. 初始化缓存 taskId2BriefInfo.put(task.getTaskId(), new TaskBriefInfo(task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, -1L)); // 4. 任务派发 TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task, workerRuntime.getWorkerAddress()); TransportUtils.ttStartPtTask(startTaskReq, processorTrackerAddress, workerRuntime.getTransporter()); log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", instanceId, task.getTaskId(), task.getTaskName()); } /** * 获取任务实例产生的各个Task状态,用于分析任务实例执行情况 * * @param subInstanceId 子任务实例ID * @return InstanceStatisticsHolder */ protected InstanceStatisticsHolder getInstanceStatisticsHolder(long subInstanceId) { Map status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId, subInstanceId); InstanceStatisticsHolder holder = new InstanceStatisticsHolder(); holder.waitingDispatchNum = status2Num.getOrDefault(TaskStatus.WAITING_DISPATCH, 0L); holder.workerUnreceivedNum = status2Num.getOrDefault(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 0L); holder.receivedNum = status2Num.getOrDefault(TaskStatus.WORKER_RECEIVED, 0L); holder.runningNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESSING, 0L); holder.failedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_FAILED, 0L); holder.succeedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_SUCCESS, 0L); return holder; } /** * 定时扫描数据库中的task(出于内存占用量考虑,每次最多获取100个),并将需要执行的任务派发出去 */ protected class Dispatcher extends SafeRunnable { // 数据库查询限制,每次最多查询几个任务 private static final int DB_QUERY_LIMIT = 100; @Override public void run0() { if (finished.get()) { return; } Stopwatch stopwatch = Stopwatch.createStarted(); // 1. 获取可以派发任务的 ProcessorTracker List availablePtIps = ptStatusHolder.getAvailableProcessorTrackers(); // 2. 没有可用 ProcessorTracker,本次不派发 if (availablePtIps.isEmpty()) { log.warn("[TaskTracker-{}] no available ProcessorTracker now, skip dispatch", instanceId); return; } // 3. 避免大查询,分批派发任务 long currentDispatchNum = 0; long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L; AtomicInteger index = new AtomicInteger(0); // 4. 循环查询数据库,获取需要派发的任务 while (maxDispatchNum > currentDispatchNum) { int dbQueryLimit = Math.min(DB_QUERY_LIMIT, (int) maxDispatchNum); List needDispatchTasks = taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WAITING_DISPATCH, dbQueryLimit); currentDispatchNum += needDispatchTasks.size(); needDispatchTasks.forEach(task -> { // 获取 ProcessorTracker 地址,如果 Task 中自带了 Address,则使用该 Address String ptAddress = task.getAddress(); if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) { if (taskNeedByPassTaskTracker(availablePtIps)) { do { ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size()); } while (workerRuntime.getWorkerAddress().equals(ptAddress)); } else { ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size()); } } dispatchTask(task, ptAddress); }); // 数量不足 或 查询失败,则终止循环 if (needDispatchTasks.size() < dbQueryLimit) { break; } } log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop()); } /** * padding的生效条件: 在map || mapReduce 的情况下, 且是该appId的worker是 非单机运行时,才生效。 * fix: 当该appId的worker是单机运行 且 padding时, 导致Dispatcher分发任务处于死循环中, 致使无法分发任务,状态一直为运行中, * 且该线程不能通过停止任务的方式去停止,只能通过重启该work实例的方式释放该线程。 */ private boolean taskNeedByPassTaskTracker(List availablePtIps) { if (ExecuteType.MAP.equals(executeType) || ExecuteType.MAP_REDUCE.equals(executeType)) { if (availablePtIps.size() <= 1) { return false; } return TaskTrackerBehavior.PADDLING.getV().equals(advancedRuntimeConfig.getTaskTrackerBehavior()); } return false; } } /** * 执行器动态上线(for 秒级任务和 MR 任务) * 原则:server 查询得到的 执行器状态不会干预 worker 自己维护的状态,即只做新增,不做任何修改 */ protected class WorkerDetector extends SafeRunnable { @Override public void run0() { boolean needMoreWorker = ptStatusHolder.checkNeedMoreWorker(); log.info("[TaskTracker-{}] checkNeedMoreWorker: {}", instanceId, needMoreWorker); if (!needMoreWorker) { return; } final String currentServerAddress = workerRuntime.getServerDiscoveryService().getCurrentServerAddress(); if (StringUtils.isEmpty(currentServerAddress)) { log.warn("[TaskTracker-{}] no server available, won't start worker detective!", instanceId); return; } try { WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(workerRuntime.getAppId(), instanceInfo.getJobId()); AskResponse response = TransportUtils.reliableQueryJobCluster(req, currentServerAddress, workerRuntime.getTransporter()); if (!response.isSuccess()) { log.warn("[TaskTracker-{}] detective failed due to ask failed, message is {}", instanceId, response.getMessage()); return; } List workerList = JsonUtils.parseObject(response.getData(), new TypeReference>() {}); ptStatusHolder.register(workerList); } catch (Exception e) { log.warn("[TaskTracker-{}] detective failed, currentServer: {}", instanceId, currentServerAddress, e); } } } @Data @AllArgsConstructor protected static class TaskBriefInfo { private String id; private TaskStatus status; private Long lastReportTime; } /** * 存储任务实例产生的各个Task状态,用于分析任务实例执行情况 */ @Data protected static class InstanceStatisticsHolder { // 等待派发状态(仅存在 TaskTracker 数据库中) protected long waitingDispatchNum; // 已派发,但 ProcessorTracker 未确认,可能由于网络错误请求未送达,也有可能 ProcessorTracker 线程池满,拒绝执行 protected long workerUnreceivedNum; // ProcessorTracker确认接收,存在与线程池队列中,排队执行 protected long receivedNum; // ProcessorTracker正在执行 protected long runningNum; protected long failedNum; protected long succeedNum; public long getTotalTaskNum() { return waitingDispatchNum + workerUnreceivedNum + receivedNum + runningNum + failedNum + succeedNum; } } /** * 初始化 TaskTracker * * @param req 服务器调度任务实例运行请求 */ protected abstract void initTaskTracker(ServerScheduleJobReq req); }