WangHan
2024-09-12 d5855a4926926698b740bc6c7ba489de47adb68b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
package tech.powerjob.worker.core.tracker.processor;
 
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.utils.CollectionUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.enhance.SafeRunnable;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskStatus;
import tech.powerjob.worker.common.utils.TransportUtils;
import tech.powerjob.worker.core.processor.runnable.HeavyProcessorRunnable;
import tech.powerjob.worker.core.tracker.manager.ProcessorTrackerManager;
import tech.powerjob.worker.extension.processor.ProcessorBean;
import tech.powerjob.worker.extension.processor.ProcessorDefinition;
import tech.powerjob.worker.log.OmsLogger;
import tech.powerjob.worker.log.OmsLoggerFactory;
import tech.powerjob.worker.persistence.TaskDO;
import tech.powerjob.worker.pojo.model.InstanceInfo;
import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
 
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
 
/**
 * 负责管理 Processor 的执行
 *
 * @author tjq
 * @since 2020/3/20
 */
@Slf4j
public class ProcessorTracker {
 
    /**
     * 记录创建时间
     */
    private long startTime;
    private WorkerRuntime workerRuntime;
    /**
     * 任务实例信息
     */
    private InstanceInfo instanceInfo;
    /**
     * 冗余 instanceId,方便日志
     */
    private Long instanceId;
 
    private ProcessorBean processorBean;
    /**
     * 在线日志
     */
    private OmsLogger omsLogger;
    /**
     * ProcessResult 上报失败的重试队列
     */
    private Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
    /**
     * 上一次空闲时间(用于闲置判定)
     */
    private long lastIdleTime;
    /**
     * 上次完成任务数量(用于闲置判定)
     */
    private long lastCompletedTaskCount;
 
    private String taskTrackerAddress;
 
    private ThreadPoolExecutor threadPool;
 
    private ScheduledExecutorService timingPool;
 
    private static final int THREAD_POOL_QUEUE_MAX_SIZE = 128;
    /**
     * 长时间空闲的 ProcessorTracker 会发起销毁请求
     */
    private static final long MAX_IDLE_TIME = 120000;
    /**
     * 当 ProcessorTracker 出现根本性错误(比如 Processor 创建失败,所有的任务直接失败)
     */
    private boolean lethal = false;
 
    private String lethalReason;
 
    /**
     * 创建 ProcessorTracker(其实就是创建了个执行用的线程池 T_T)
     */
    @SuppressWarnings("squid:S1181")
    public ProcessorTracker(TaskTrackerStartTaskReq request, WorkerRuntime workerRuntime) {
        try {
            // 赋值
            this.startTime = System.currentTimeMillis();
            this.workerRuntime = workerRuntime;
            this.instanceInfo = request.getInstanceInfo();
            this.instanceId = request.getInstanceInfo().getInstanceId();
            this.taskTrackerAddress = request.getTaskTrackerAddress();
 
            this.omsLogger = OmsLoggerFactory.build(instanceId, request.getLogConfig(), workerRuntime);
            this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
            this.lastIdleTime = -1L;
            this.lastCompletedTaskCount = 0L;
 
            // 初始化 线程池,TimingPool 启动的任务会检查 ThreadPool,所以必须先初始化线程池,否则NPE
            initThreadPool();
            // 初始化定时任务
            initTimingJob();
            // 初始化 Processor
            processorBean = workerRuntime.getProcessorLoader().load(new ProcessorDefinition().setProcessorType(instanceInfo.getProcessorType()).setProcessorInfo(instanceInfo.getProcessorInfo()));
            log.info("[ProcessorTracker-{}] ProcessorTracker was successfully created!", instanceId);
        } catch (Throwable t) {
            log.warn("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", instanceId, t);
            lethal = true;
            lethalReason = ExceptionUtils.getMessage(t);
        }
    }
 
    /**
     * 提交任务到线程池执行
     * 1.0版本:TaskTracker有任务就dispatch,导致 ProcessorTracker 本地可能堆积过多的任务,造成内存压力。为此 ProcessorTracker 在线程
     *         池队列堆积到一定程度时,会将数据持久化到DB,然后通过异步线程定时从数据库中取回任务,重新提交执行。
     *         联动:数据库的SPID设计、TaskStatus段落设计等,全部取消...
     *         last commitId: 341953aceceafec0fbe7c3d9a3e26451656b945e
     * 2.0版本:ProcessorTracker定时向TaskTracker发送心跳消息,心跳消息中包含了当前线程池队列任务个数,TaskTracker根据ProcessorTracker
     *         的状态判断能否继续派发任务。因此,ProcessorTracker本地不会堆积过多任务,故删除 持久化机制 ╥﹏╥...!
     * @param newTask 需要提交到线程池执行的任务
     */
    public void submitTask(TaskDO newTask) {
 
        // 一旦 ProcessorTracker 出现异常,所有提交到此处的任务直接返回失败,防止形成死锁
        // 死锁分析:TT创建PT,PT创建失败,无法定期汇报心跳,TT长时间未收到PT心跳,认为PT宕机(确实宕机了),无法选择可用的PT再次派发任务,死锁形成,GG斯密达 T_T
        if (lethal) {
            ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq()
                    .setInstanceId(instanceId)
                    .setSubInstanceId(newTask.getSubInstanceId())
                    .setTaskId(newTask.getTaskId())
                    .setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue())
                    .setResult(lethalReason)
                    .setReportTime(System.currentTimeMillis());
 
            TransportUtils.ptReportTask(report, taskTrackerAddress, workerRuntime);
            return;
        }
 
        boolean success = false;
        // 1. 设置值并提交执行
        newTask.setInstanceId(instanceInfo.getInstanceId());
        newTask.setAddress(taskTrackerAddress);
 
        HeavyProcessorRunnable heavyProcessorRunnable = new HeavyProcessorRunnable(instanceInfo, taskTrackerAddress, newTask, processorBean, omsLogger, statusReportRetryQueue, workerRuntime);
        try {
            threadPool.submit(heavyProcessorRunnable);
            success = true;
        } catch (RejectedExecutionException ignore) {
            log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.",
                    instanceId, newTask.getTaskId(), newTask.getTaskName());
        } catch (Exception e) {
            log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", instanceId, newTask.getTaskId(), newTask.getTaskName(), e);
        }
 
        // 2. 回复接收成功
        if (success) {
            ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
            reportReq.setInstanceId(instanceId);
            reportReq.setSubInstanceId(newTask.getSubInstanceId());
            reportReq.setTaskId(newTask.getTaskId());
            reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
            reportReq.setReportTime(System.currentTimeMillis());
 
            TransportUtils.ptReportTask(reportReq, taskTrackerAddress, workerRuntime);
 
            log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.",
                    instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size());
        }
    }
 
    /**
     * 释放资源
     */
    public void destroy() {
 
        // 1. 关闭执行执行线程池
        CommonUtils.executeIgnoreException(() -> {
            List<Runnable> tasks = threadPool.shutdownNow();
            if (!CollectionUtils.isEmpty(tasks)) {
                log.warn("[ProcessorTracker-{}] shutdown threadPool now and stop {} tasks.", instanceId, tasks.size());
            }
        });
 
        // 2. 去除顶层引用,送入GC世界
        statusReportRetryQueue.clear();
        ProcessorTrackerManager.removeProcessorTracker(instanceId);
 
        log.info("[ProcessorTracker-{}] ProcessorTracker destroyed successfully!", instanceId);
 
        // 3. 关闭定时线程池
        CommonUtils.executeIgnoreException(() -> timingPool.shutdownNow());
    }
 
 
    /**
     * 初始化线程池
     */
    private void initThreadPool() {
 
        int poolSize = calThreadPoolSize();
        // 待执行队列,为了防止对内存造成较大压力,内存队列不能太大
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(THREAD_POOL_QUEUE_MAX_SIZE);
        // 自定义线程池中线程名称 (PowerJob Processor Pool -> PPP)
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPP-%d").build();
        // 拒绝策略:直接抛出异常
        RejectedExecutionHandler rejectionHandler = new ThreadPoolExecutor.AbortPolicy();
 
        threadPool = new ThreadPoolExecutor(poolSize, poolSize, 60L, TimeUnit.SECONDS, queue, threadFactory, rejectionHandler);
 
        // 当没有任务执行时,允许销毁核心线程(即线程池最终存活线程个数可能为0)
        threadPool.allowCoreThreadTimeOut(true);
    }
 
    /**
     * 初始化定时任务
     */
    private void initTimingJob() {
 
        // PowerJob Processor TimingPool
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPT-%d").build();
        timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory);
 
        timingPool.scheduleAtFixedRate(new CheckerAndReporter(), 0, 10, TimeUnit.SECONDS);
    }
 
 
    /**
     * 定时向 TaskTracker 汇报(携带任务执行信息的心跳)
     */
    private class CheckerAndReporter extends SafeRunnable {
 
        @Override
        @SuppressWarnings({"squid:S1066","squid:S3776"})
        public void run0() {
 
            // 超时检查,如果超时则自动关闭 TaskTracker
            long interval = System.currentTimeMillis() - startTime;
            // 秒级任务的ProcessorTracker不应该关闭
            if (!TimeExpressionType.FREQUENT_TYPES.contains(instanceInfo.getTimeExpressionType())) {
                if (interval > instanceInfo.getInstanceTimeoutMS()) {
                    log.warn("[ProcessorTracker-{}] detected instance timeout, maybe TaskTracker's destroy request missed, so try to kill self now.", instanceId);
                    destroy();
                    return;
                }
            }
 
            // 判断线程池活跃状态,长时间空闲则上报 TaskTracker 请求检查
            if (threadPool.getActiveCount() > 0 || threadPool.getCompletedTaskCount() > lastCompletedTaskCount) {
                lastIdleTime = -1;
                lastCompletedTaskCount = threadPool.getCompletedTaskCount();
            } else {
                if (lastIdleTime == -1) {
                    lastIdleTime = System.currentTimeMillis();
                } else {
                    long idleTime = System.currentTimeMillis() - lastIdleTime;
                    if (idleTime > MAX_IDLE_TIME) {
                        log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker and then destroy self.", instanceId, idleTime);
 
                        // 不可靠通知,如果该请求失败,则整个任务处理集群缺失一个 ProcessorTracker,影响可接受
                        ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(instanceId);
                        statusReportReq.setAddress(workerRuntime.getWorkerAddress());
                        TransportUtils.ptReportSelfStatus(statusReportReq, taskTrackerAddress, workerRuntime);
                        destroy();
                        return;
                    }
                }
            }
 
            // 上报状态之前,先重新发送失败的任务,只要有结果堆积,就不上报状态(让 PT 认为该 TT 失联然后重试相关任务)
            while (!statusReportRetryQueue.isEmpty()) {
                ProcessorReportTaskStatusReq req = statusReportRetryQueue.poll();
                if (req != null) {
                    req.setReportTime(System.currentTimeMillis());
                    if (!TransportUtils.reliablePtReportTask(req, taskTrackerAddress, workerRuntime)) {
                        statusReportRetryQueue.add(req);
                        log.warn("[ProcessorRunnable-{}] retry report finished task status failed: {}", instanceId, req);
                        return;
                    }
                }
            }
 
            // 上报当前 ProcessorTracker 负载
            long waitingNum = threadPool.getQueue().size();
            ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildLoadReport(instanceId, waitingNum);
            statusReportReq.setAddress(workerRuntime.getWorkerAddress());
            TransportUtils.ptReportSelfStatus(statusReportReq, taskTrackerAddress, workerRuntime);
            log.debug("[ProcessorTracker-{}] send heartbeat to TaskTracker, current waiting task num is {}.", instanceId, waitingNum);
        }
 
    }
 
 
    /**
     * 计算线程池大小
     */
    private int calThreadPoolSize() {
        ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
        ProcessorType processorType = ProcessorType.valueOf(instanceInfo.getProcessorType());
 
        // 脚本类自带线程池,不过为了少一点逻辑判断,还是象征性分配一个线程
        if (processorType == ProcessorType.PYTHON || processorType == ProcessorType.SHELL) {
            return 1;
        }
 
        if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) {
            return instanceInfo.getThreadConcurrency();
        }
        if (TimeExpressionType.FREQUENT_TYPES.contains(instanceInfo.getTimeExpressionType())) {
            return instanceInfo.getThreadConcurrency();
        }
        return 2;
    }
 
}