WangHan
2024-09-12 d5855a4926926698b740bc6c7ba489de47adb68b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
package tech.powerjob.server.core.instance;
 
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.common.request.ServerStopInstanceReq;
import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.common.timewheel.holder.HashedWheelTimerHolder;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.core.alarm.AlarmUtils;
import tech.powerjob.server.core.service.UserService;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import tech.powerjob.server.core.alarm.AlarmCenter;
import tech.powerjob.server.core.alarm.module.JobInstanceAlarm;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.remote.aware.TransportServiceAware;
import tech.powerjob.server.remote.transporter.impl.ServerURLFactory;
import tech.powerjob.server.remote.transporter.TransportService;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
 
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
 
/**
 * Manages scheduled job instances (status-update related).
 *
 * @author tjq
 * @since 2020/4/7
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class InstanceManager implements TransportServiceAware {
 
    private final AlarmCenter alarmCenter;
 
    private final InstanceLogService instanceLogService;
 
    private final InstanceMetadataService instanceMetadataService;
 
    private final InstanceInfoRepository instanceInfoRepository;
 
    private final WorkflowInstanceManager workflowInstanceManager;
 
    private final WorkerClusterQueryService workerClusterQueryService;
 
    /**
     * Base component, injected through the aware callback to avoid a circular dependency.
     */
    private TransportService transportService;
 
    /**
     * 更新任务状态
     * ********************************************
     * 2021-02-03 modify by Echo009
     * 实例的执行次数统一在这里管理,对于非固定频率的任务
     * 当 db 中实例的状态为等待派发时,runningTimes + 1
     * ********************************************
     *
     * @param req TaskTracker上报任务实例状态的请求
     */
    public void updateStatus(TaskTrackerReportInstanceStatusReq req) throws ExecutionException {
 
        Long instanceId = req.getInstanceId();
        // 获取相关数据
        JobInfoDO jobInfo = instanceMetadataService.fetchJobInfoByInstanceId(req.getInstanceId());
        InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
        if (instanceInfo == null) {
            log.warn("[InstanceManager-{}] can't find InstanceInfo from database", instanceId);
            return;
        }
 
        // 考虑极端情况:Processor 处理耗时小于 server 写 DB 耗时,会导致状态上报时无 taskTracker 地址,此处等待后重新从DB获取数据 GitHub#620
        if (StringUtils.isEmpty(instanceInfo.getTaskTrackerAddress())) {
            log.warn("[InstanceManager-{}] TaskTrackerAddress is empty, server will wait then acquire again!", instanceId);
            CommonUtils.easySleep(277);
            instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
        }
 
        int originStatus = instanceInfo.getStatus();
        // 丢弃过期的上报数据
        if (req.getReportTime() <= instanceInfo.getLastReportTime()) {
            log.warn("[InstanceManager-{}] receive the expired status report request: {}, this report will be dropped.", instanceId, req);
            return;
        }
        // 丢弃非目标 TaskTracker 的上报数据(脑裂情况)
        if (!req.getSourceAddress().equals(instanceInfo.getTaskTrackerAddress())) {
            log.warn("[InstanceManager-{}] receive the other TaskTracker's report: {}, but current TaskTracker is {}, this report will be dropped.", instanceId, req, instanceInfo.getTaskTrackerAddress());
            return;
        }
 
        InstanceStatus receivedInstanceStatus = InstanceStatus.of(req.getInstanceStatus());
        Integer timeExpressionType = jobInfo.getTimeExpressionType();
        // 更新 最后上报时间 和 修改时间
        instanceInfo.setLastReportTime(req.getReportTime());
        instanceInfo.setGmtModified(new Date());
 
        // FREQUENT 任务没有失败重试机制,TaskTracker一直运行即可,只需要将存活信息同步到DB即可
        // FREQUENT 任务的 newStatus 只有2中情况,一种是 RUNNING,一种是 FAILED(表示该机器 overload,需要重新选一台机器执行)
        // 综上,直接把 status 和 runningNum 同步到DB即可
        if (TimeExpressionType.FREQUENT_TYPES.contains(timeExpressionType)) {
            // 如果实例处于失败状态,则说明该 worker 失联了一段时间,被 server 判定为宕机,而此时该秒级任务有可能已经重新派发了,故需要 Kill 掉该实例
            // fix issue 375
            if (instanceInfo.getStatus() == InstanceStatus.FAILED.getV()) {
                log.warn("[InstanceManager-{}] receive TaskTracker's report: {}, but current instance is already failed, this instance should be killed.", instanceId, req);
                stopInstance(instanceId, instanceInfo);
                return;
            }
            LifeCycle lifeCycle = LifeCycle.parse(jobInfo.getLifecycle());
            // 检查生命周期是否已结束
            if (lifeCycle.getEnd() != null && lifeCycle.getEnd() <= System.currentTimeMillis()) {
                stopInstance(instanceId, instanceInfo);
                instanceInfo.setStatus(InstanceStatus.SUCCEED.getV());
            } else {
                instanceInfo.setStatus(receivedInstanceStatus.getV());
            }
            instanceInfo.setResult(req.getResult());
            instanceInfo.setRunningTimes(req.getTotalTaskNum());
            instanceInfoRepository.saveAndFlush(instanceInfo);
            // 任务需要告警
            if (req.isNeedAlert()) {
                log.info("[InstanceManager-{}] receive frequent task alert req,time:{},content:{}", instanceId, req.getReportTime(), req.getAlertContent());
                alert(instanceId, req.getAlertContent());
            }
            return;
        }
        // 更新运行次数
        if (instanceInfo.getStatus() == InstanceStatus.WAITING_WORKER_RECEIVE.getV()) {
            // 这里不会存在并发问题
            instanceInfo.setRunningTimes(instanceInfo.getRunningTimes() + 1);
        }
        // QAQ ,不能提前变更 status,否则会导致更新运行次数的逻辑不生效继而导致普通任务 无限重试
        instanceInfo.setStatus(receivedInstanceStatus.getV());
 
        boolean finished = false;
        if (receivedInstanceStatus == InstanceStatus.SUCCEED) {
            instanceInfo.setResult(req.getResult());
            instanceInfo.setFinishedTime(req.getEndTime() == null ? System.currentTimeMillis() : req.getEndTime());
            finished = true;
        } else if (receivedInstanceStatus == InstanceStatus.FAILED) {
 
            // 当前重试次数 <= 最大重试次数,进行重试 (第一次运行,runningTimes为1,重试一次,instanceRetryNum也为1,故需要 =)
            if (instanceInfo.getRunningTimes() <= jobInfo.getInstanceRetryNum()) {
 
                log.info("[InstanceManager-{}] instance execute failed but will take the {}th retry.", instanceId, instanceInfo.getRunningTimes());
 
                // 延迟10S重试(由于重试不改变 instanceId,如果派发到同一台机器,上一个 TaskTracker 还处于资源释放阶段,无法创建新的TaskTracker,任务失败)
                instanceInfo.setExpectedTriggerTime(System.currentTimeMillis() + 10000);
 
                // 修改状态为 等待派发,正式开始重试
                // 问题:会丢失以往的调度记录(actualTriggerTime什么的都会被覆盖)
                instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
            } else {
                instanceInfo.setResult(req.getResult());
                instanceInfo.setFinishedTime(req.getEndTime() == null ? System.currentTimeMillis() : req.getEndTime());
                finished = true;
                log.info("[InstanceManager-{}] instance execute failed and have no chance to retry.", instanceId);
            }
        }
        if (finished) {
            // 最终状态允许直接覆盖更新
            instanceInfoRepository.saveAndFlush(instanceInfo);
            // 这里的 InstanceStatus 只有 成功/失败 两种,手动停止不会由 TaskTracker 上报
            processFinishedInstance(instanceId, req.getWfInstanceId(), receivedInstanceStatus, req.getResult());
            return;
        }
        // 带条件更新
        final int i = instanceInfoRepository.updateStatusChangeInfoByInstanceIdAndStatus(instanceInfo.getLastReportTime(), instanceInfo.getGmtModified(), instanceInfo.getRunningTimes(), instanceInfo.getStatus(), instanceInfo.getInstanceId(), originStatus);
        if (i == 0) {
            log.warn("[InstanceManager-{}] update instance status failed, maybe the instance status has been changed by other thread. discard this status change,{}", instanceId, instanceInfo);
        }
    }
 
    private void stopInstance(Long instanceId, InstanceInfoDO instanceInfo) {
        Optional<WorkerInfo> workerInfoOpt = workerClusterQueryService.getWorkerInfoByAddress(instanceInfo.getAppId(), instanceInfo.getTaskTrackerAddress());
        if (workerInfoOpt.isPresent()) {
            ServerStopInstanceReq stopInstanceReq = new ServerStopInstanceReq(instanceId);
            WorkerInfo workerInfo = workerInfoOpt.get();
            final URL url = ServerURLFactory.stopInstance2Worker(workerInfo.getAddress());
            transportService.tell(workerInfo.getProtocol(), url, stopInstanceReq);
        }
    }
 
    /**
     * 收尾完成的任务实例
     *
     * @param instanceId   任务实例ID
     * @param wfInstanceId 工作流实例ID,非必须
     * @param status       任务状态,有 成功/失败/手动停止
     * @param result       执行结果
     */
    public void processFinishedInstance(Long instanceId, Long wfInstanceId, InstanceStatus status, String result) {
 
        log.info("[Instance-{}] process finished, final status is {}.", instanceId, status.name());
 
        // 上报日志数据
        HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> instanceLogService.sync(instanceId), 60, TimeUnit.SECONDS);
 
        // workflow 特殊处理
        if (wfInstanceId != null) {
            // 手动停止在工作流中也认为是失败(理论上不应该发生)
            workflowInstanceManager.move(wfInstanceId, instanceId, status, result);
        }
 
        // 告警
        if (status == InstanceStatus.FAILED) {
            alert(instanceId, result);
        }
        // 主动移除缓存,减小内存占用
        instanceMetadataService.invalidateJobInfo(instanceId);
    }
 
    private void alert(Long instanceId, String alertContent) {
        InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
        JobInfoDO jobInfo;
        try {
            jobInfo = instanceMetadataService.fetchJobInfoByInstanceId(instanceId);
        } catch (Exception e) {
            log.warn("[InstanceManager-{}] can't find jobInfo, alarm failed.", instanceId);
            return;
        }
        JobInstanceAlarm content = new JobInstanceAlarm();
        BeanUtils.copyProperties(jobInfo, content);
        BeanUtils.copyProperties(instanceInfo, content);
        List<UserInfoDO> userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds());
        if (!StringUtils.isEmpty(alertContent)) {
            content.setResult(alertContent);
        }
        alarmCenter.alarmFailed(content, AlarmUtils.convertUserInfoList2AlarmTargetList(userList));
    }
 
    /**
     * Stores the given {@link TransportService} in this manager's {@code transportService} field,
     * which this class uses to send stop-instance requests to workers.
     *
     * @param transportService transport service to keep for later use
     */
    @Override
    public void setTransportService(TransportService transportService) {
        this.transportService = transportService;
    }
}