package tech.powerjob.worker.core.tracker.processor;

import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.enhance.SafeRunnable;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.core.processor.runnable.HeavyProcessorRunnable;
import tech.powerjob.worker.core.tracker.manager.ProcessorTrackerManager;
import tech.powerjob.worker.extension.processor.ProcessorBean;
import tech.powerjob.worker.extension.processor.ProcessorDefinition;
import tech.powerjob.worker.log.OmsLogger;
import tech.powerjob.worker.log.OmsLoggerFactory;
import tech.powerjob.worker.persistence.TaskDO;
import tech.powerjob.worker.pojo.model.InstanceInfo;
import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;

import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;

/**
 * Manages the execution of a Processor for a single job instance.
 *
 * <p>A ProcessorTracker owns two executors: a bounded worker pool that runs the submitted tasks,
 * and a single-threaded scheduler that runs {@link CheckerAndReporter} every 10 seconds to check
 * for timeout/idleness, resend failed task reports and send a load heartbeat to the TaskTracker.
 *
 * @author tjq
 * @since 2020/3/20
 */
@Slf4j
public class ProcessorTracker {

    /**
     * Creation timestamp in epoch milliseconds; used for the instance timeout check.
     */
    private long startTime;

    private WorkerRuntime workerRuntime;

    /**
     * Information about the job instance this tracker serves.
     */
    private InstanceInfo instanceInfo;

    /**
     * Redundant copy of the instance id, kept to make logging convenient.
     */
    private Long instanceId;

    private ProcessorBean processorBean;

    /**
     * Online logger handed to every task runnable.
     */
    private OmsLogger omsLogger;

    /**
     * Task status reports whose delivery failed and that must be retried by the periodic job.
     */
    private Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;

    /**
     * Timestamp (epoch ms) at which the pool was first observed idle, or -1 when it is not idle.
     */
    private long lastIdleTime;

    /**
     * Completed-task count seen at the last activity check (used for idle detection).
     */
    private long lastCompletedTaskCount;

    private String taskTrackerAddress;

    private ThreadPoolExecutor threadPool;

    private ScheduledExecutorService timingPool;

    private static final int THREAD_POOL_QUEUE_MAX_SIZE = 128;

    /**
     * A ProcessorTracker that stays idle longer than this many milliseconds requests its own destruction.
     */
    private static final long MAX_IDLE_TIME = 120000;

    /**
     * Set when the ProcessorTracker hit a fundamental error during construction (for example the
     * Processor could not be created); every task submitted afterwards is failed immediately.
     */
    private boolean lethal = false;

    private String lethalReason;

    /**
     * Creates a ProcessorTracker (which essentially means creating the thread pool used for execution).
     *
     * <p>Any {@link Throwable} raised during initialization is caught and logged; the tracker is then
     * marked {@code lethal} and fails every task passed to {@link #submitTask(TaskDO)}.
     *
     * @param request       start request from the TaskTracker; supplies the instance info, the
     *                      TaskTracker address and the log config
     * @param workerRuntime worker runtime, used here for the logger and the processor loader
     */
    @SuppressWarnings("squid:S1181")
    public ProcessorTracker(TaskTrackerStartTaskReq request, WorkerRuntime workerRuntime) {
        try {
            // plain field assignment
            this.startTime = System.currentTimeMillis();
            this.workerRuntime = workerRuntime;
            this.instanceInfo = request.getInstanceInfo();
            this.instanceId = request.getInstanceInfo().getInstanceId();
            this.taskTrackerAddress = request.getTaskTrackerAddress();

            this.omsLogger = OmsLoggerFactory.build(instanceId, request.getLogConfig(), workerRuntime);
            this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
            this.lastIdleTime = -1L;
            this.lastCompletedTaskCount = 0L;

            // The periodic job inspects the worker pool, so the pool must exist first (otherwise NPE).
            initThreadPool();
            // start the periodic check/report job
            initTimingJob();
            // load the Processor
            processorBean = workerRuntime.getProcessorLoader().load(
                    new ProcessorDefinition()
                            .setProcessorType(instanceInfo.getProcessorType())
                            .setProcessorInfo(instanceInfo.getProcessorInfo()));

            log.info("[ProcessorTracker-{}] ProcessorTracker was successfully created!", instanceId);
        } catch (Throwable t) {
            log.warn("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", instanceId, t);
            lethal = true;
            lethalReason = ExceptionUtils.getMessage(t);
        }
    }

    /**
     * Submits a task to the worker pool and, on success, reports WORKER_RECEIVED to the TaskTracker.
     *
     * <p>Design history (from the original author's notes):
     * <ul>
     *   <li>v1.0: the TaskTracker dispatched whenever it had tasks, so tasks could pile up locally and
     *       put pressure on memory. The ProcessorTracker therefore persisted tasks to the DB once the
     *       pool queue grew too large and an async thread periodically re-submitted them. The related
     *       DB SPID design and TaskStatus range design were all removed afterwards
     *       (last commitId: 341953aceceafec0fbe7c3d9a3e26451656b945e).</li>
     *   <li>v2.0: the ProcessorTracker sends periodic heartbeats carrying the current queue size and
     *       the TaskTracker decides from that whether to keep dispatching, so tasks no longer pile up
     *       locally and the persistence mechanism was dropped.</li>
     * </ul>
     *
     * <p>If the pool rejects the task, or submission fails for another reason, the failure is only
     * logged and no status report is sent from here.
     *
     * @param newTask the task to run on the worker pool
     */
    public void submitTask(TaskDO newTask) {

        // Once the tracker is lethal, fail every submitted task right away to avoid a deadlock:
        // the TaskTracker created this ProcessorTracker, creation failed, so no heartbeat is ever
        // sent; the TaskTracker then considers it down and cannot pick a usable tracker to dispatch to.
        if (lethal) {
            ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq()
                    .setInstanceId(instanceId)
                    .setSubInstanceId(newTask.getSubInstanceId())
                    .setTaskId(newTask.getTaskId())
                    .setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue())
                    .setResult(lethalReason)
                    .setReportTime(System.currentTimeMillis());

            TransportUtils.ptReportTask(report, taskTrackerAddress, workerRuntime);
            return;
        }

        boolean success = false;
        // 1. fill in the remaining task fields and hand the task to the pool
        newTask.setInstanceId(instanceInfo.getInstanceId());
        newTask.setAddress(taskTrackerAddress);

        HeavyProcessorRunnable heavyProcessorRunnable = new HeavyProcessorRunnable(
                instanceInfo, taskTrackerAddress, newTask, processorBean, omsLogger, statusReportRetryQueue, workerRuntime);
        try {
            threadPool.submit(heavyProcessorRunnable);
            success = true;
        } catch (RejectedExecutionException ignore) {
            // Thrown by the pool's AbortPolicy when the bounded queue is full.
            log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.",
                    instanceId, newTask.getTaskId(), newTask.getTaskName());
        } catch (Exception e) {
            log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", instanceId, newTask.getTaskId(), newTask.getTaskName(), e);
        }

        // 2. acknowledge receipt
        if (success) {
            ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
            reportReq.setInstanceId(instanceId);
            reportReq.setSubInstanceId(newTask.getSubInstanceId());
            reportReq.setTaskId(newTask.getTaskId());
            reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
            reportReq.setReportTime(System.currentTimeMillis());

            TransportUtils.ptReportTask(reportReq, taskTrackerAddress, workerRuntime);

            log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.",
                    instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size());
        }
    }

    /**
     * Releases all resources held by this tracker: shuts down the worker pool, clears the retry queue,
     * unregisters the tracker from {@link ProcessorTrackerManager} and shuts down the timing pool.
     *
     * <p>The method tolerates a tracker whose construction failed part-way: the executor shutdowns go
     * through {@code CommonUtils.executeIgnoreException} and the retry queue is null-checked.
     */
    public void destroy() {

        // 1. shut down the worker pool
        CommonUtils.executeIgnoreException(() -> {
            List<Runnable> tasks = threadPool.shutdownNow();
            if (!CollectionUtils.isEmpty(tasks)) {
                log.warn("[ProcessorTracker-{}] shutdown threadPool now and stop {} tasks.", instanceId, tasks.size());
            }
        });

        // 2. drop top-level references so everything becomes eligible for GC
        // The constructor swallows every Throwable, so if it failed before the queue was assigned
        // the field is still null; without this guard the NPE would skip the cleanup steps below.
        if (statusReportRetryQueue != null) {
            statusReportRetryQueue.clear();
        }
        ProcessorTrackerManager.removeProcessorTracker(instanceId);

        log.info("[ProcessorTracker-{}] ProcessorTracker destroyed successfully!", instanceId);

        // 3. shut down the timing pool last; destroy() may itself be running on that pool's thread
        CommonUtils.executeIgnoreException(() -> timingPool.shutdownNow());
    }

    /**
     * Initializes the worker thread pool: fixed size from {@link #calThreadPoolSize()}, a bounded
     * queue of {@code THREAD_POOL_QUEUE_MAX_SIZE} entries and an abort-on-reject policy.
     */
    private void initThreadPool() {

        int poolSize = calThreadPoolSize();
        // pending-task queue; kept small so it does not put much pressure on memory
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(THREAD_POOL_QUEUE_MAX_SIZE);
        // custom thread names (PowerJob Processor Pool -> PPP)
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPP-%d").build();
        // rejection policy: throw RejectedExecutionException
        RejectedExecutionHandler rejectionHandler = new ThreadPoolExecutor.AbortPolicy();

        threadPool = new ThreadPoolExecutor(poolSize, poolSize, 60L, TimeUnit.SECONDS, queue, threadFactory, rejectionHandler);

        // let core threads time out when there is nothing to run (the pool may shrink to 0 threads)
        threadPool.allowCoreThreadTimeOut(true);
    }

    /**
     * Starts the periodic job: a single-threaded scheduler that runs {@link CheckerAndReporter}
     * immediately and then at a fixed rate of 10 seconds.
     */
    private void initTimingJob() {

        // PowerJob Processor TimingPool -> PPT
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPT-%d").build();
        timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory);

        timingPool.scheduleAtFixedRate(new CheckerAndReporter(), 0, 10, TimeUnit.SECONDS);
    }

    /**
     * Periodic job that reports to the TaskTracker (a heartbeat carrying task execution info).
     *
     * <p>Each run, in order: destroys the tracker if the instance timed out (skipped for frequent
     * jobs); destroys the tracker after notifying the TaskTracker if the pool has been idle longer
     * than {@code MAX_IDLE_TIME}; resends queued failed status reports; finally sends the load heartbeat.
     */
    private class CheckerAndReporter extends SafeRunnable {

        @Override
        @SuppressWarnings({"squid:S1066","squid:S3776"})
        public void run0() {

            // Timeout check: once the instance timeout has elapsed, this ProcessorTracker destroys itself.
            long interval = System.currentTimeMillis() - startTime;
            // ProcessorTrackers of frequent (second-level) jobs must not be closed by this check
            if (!TimeExpressionType.FREQUENT_TYPES.contains(instanceInfo.getTimeExpressionType())) {
                if (interval > instanceInfo.getInstanceTimeoutMS()) {
                    log.warn("[ProcessorTracker-{}] detected instance timeout, maybe TaskTracker's destroy request missed, so try to kill self now.", instanceId);
                    destroy();
                    return;
                }
            }

            // Activity check: after a long idle period, notify the TaskTracker and destroy self.
            if (threadPool.getActiveCount() > 0 || threadPool.getCompletedTaskCount() > lastCompletedTaskCount) {
                // something is running or finished since the last check -> not idle
                lastIdleTime = -1;
                lastCompletedTaskCount = threadPool.getCompletedTaskCount();
            } else {
                if (lastIdleTime == -1) {
                    // first idle observation: start the idle clock
                    lastIdleTime = System.currentTimeMillis();
                } else {
                    long idleTime = System.currentTimeMillis() - lastIdleTime;
                    if (idleTime > MAX_IDLE_TIME) {
                        log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker and then destroy self.", instanceId, idleTime);

                        // Unreliable notification: if it is lost, the cluster handling this instance
                        // is simply short one ProcessorTracker, which is acceptable.
                        ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(instanceId);
                        statusReportReq.setAddress(workerRuntime.getWorkerAddress());
                        TransportUtils.ptReportSelfStatus(statusReportReq, taskTrackerAddress, workerRuntime);
                        destroy();
                        return;
                    }
                }
            }

            // Before the heartbeat, resend task reports that failed earlier. While results are still
            // piled up no heartbeat is sent; the intent (per the original note) is that the TaskTracker
            // then treats this ProcessorTracker as unreachable and retries the affected tasks.
            while (!statusReportRetryQueue.isEmpty()) {
                ProcessorReportTaskStatusReq req = statusReportRetryQueue.poll();
                if (req != null) {
                    req.setReportTime(System.currentTimeMillis());
                    if (!TransportUtils.reliablePtReportTask(req, taskTrackerAddress, workerRuntime)) {
                        // put it back and give up for this round
                        statusReportRetryQueue.add(req);
                        log.warn("[ProcessorRunnable-{}] retry report finished task status failed: {}", instanceId, req);
                        return;
                    }
                }
            }

            // report the current load (number of tasks waiting in the pool queue)
            long waitingNum = threadPool.getQueue().size();
            ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildLoadReport(instanceId, waitingNum);
            statusReportReq.setAddress(workerRuntime.getWorkerAddress());
            TransportUtils.ptReportSelfStatus(statusReportReq, taskTrackerAddress, workerRuntime);
            log.debug("[ProcessorTracker-{}] send heartbeat to TaskTracker, current waiting task num is {}.", instanceId, waitingNum);
        }
    }

    /**
     * Computes the worker pool size from the instance's processor type, execute type and time
     * expression type.
     *
     * @return 1 for PYTHON/SHELL processors; the configured thread concurrency for MAP / MAP_REDUCE
     *         execution or frequent jobs; 2 otherwise
     */
    private int calThreadPoolSize() {
        ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
        ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType());

        // Script processors bring their own thread pool; one thread is still allocated
        // symbolically to keep the surrounding logic uniform.
        if (processorType == ProcessorType.PYTHON || processorType == ProcessorType.SHELL) {
            return 1;
        }

        if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) {
            return instanceInfo.getThreadConcurrency();
        }
        if (TimeExpressionType.FREQUENT_TYPES.contains(instanceInfo.getTimeExpressionType())) {
            return instanceInfo.getThreadConcurrency();
        }
        return 2;
    }
}