WangHan
2024-09-12 d5855a4926926698b740bc6c7ba489de47adb68b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package tech.powerjob.worker.core.ha;
 
import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
 
import java.util.List;
import java.util.Map;
 
/**
 * 统一管理 ProcessorTracker 的状态
 *
 * @author tjq
 * @since 2020/3/28
 */
@Slf4j
public class ProcessorTrackerStatusHolder {
 
    private final Long instanceId;
    private final Integer maxWorkerCount;
    // ProcessorTracker的address(IP:Port) -> 状态
    private final Map<String, ProcessorTrackerStatus> address2Status;
 
    public ProcessorTrackerStatusHolder(Long instanceId, Integer maxWorkerCount, List<String> allWorkerAddress) {
 
        this.instanceId = instanceId;
        this.maxWorkerCount = maxWorkerCount;
 
        address2Status = Maps.newConcurrentMap();
        allWorkerAddress.forEach(address -> {
            ProcessorTrackerStatus pts = new ProcessorTrackerStatus();
            pts.init(address);
            address2Status.put(address, pts);
        });
    }
 
    /**
     * 根据地址获取 ProcessorTracker 的状态
     * @param address IP:Port
     * @return status
     */
    public ProcessorTrackerStatus getProcessorTrackerStatus(String address) {
        // remove 前突然收到了 PT 心跳同时立即被派发才可能出现这种情况,0.001% 概率
        return address2Status.computeIfAbsent(address, ignore -> {
            log.warn("[PTStatusHolder-{}] unregistered worker: {}", instanceId, address);
            ProcessorTrackerStatus processorTrackerStatus = new ProcessorTrackerStatus();
            processorTrackerStatus.init(address);
            return processorTrackerStatus;
        });
    }
 
    /**
     * 根据 ProcessorTracker 的心跳更新状态
     */
    public void updateStatus(ProcessorTrackerStatusReportReq heartbeatReq) {
        getProcessorTrackerStatus(heartbeatReq.getAddress()).update(heartbeatReq);
    }
 
    /**
     * 获取可用 ProcessorTracker 的IP地址
     */
    public List<String> getAvailableProcessorTrackers() {
 
        List<String> result = Lists.newLinkedList();
        address2Status.forEach((address, ptStatus) -> {
            if (ptStatus.available()) {
                result.add(address);
            }
        });
        return result;
    }
 
    /**
     * 获取所有 ProcessorTracker 的IP地址(包括不可用状态)
     */
    public List<String> getAllProcessorTrackers() {
        return Lists.newArrayList(address2Status.keySet());
    }
 
    /**
     * 获取所有失联 ProcessorTracker 的IP地址
     */
    public List<String> getAllDisconnectedProcessorTrackers() {
 
        List<String> result = Lists.newLinkedList();
        address2Status.forEach((ip, ptStatus) -> {
            if (ptStatus.isTimeout()) {
                result.add(ip);
            }
        });
        return result;
    }
 
    /**
     * 注册新的执行节点
     * @param address 新的执行节点地址
     * @return true: register successfully / false: already exists
     */
    private boolean registerOne(String address) {
        ProcessorTrackerStatus pts = address2Status.get(address);
        if (pts != null) {
            return false;
        }
        pts = new ProcessorTrackerStatus();
        pts.init(address);
        address2Status.put(address, pts);
        log.info("[PTStatusHolder-{}] register new worker: {}", instanceId, address);
        return true;
    }
 
    public void register(List<String> workerIpList) {
        if (endlessWorkerNum()) {
            workerIpList.forEach(this::registerOne);
            return;
        }
        List<String> availableProcessorTrackers = getAvailableProcessorTrackers();
        int currentWorkerSize = availableProcessorTrackers.size();
        int needMoreNum = maxWorkerCount - currentWorkerSize;
        if (needMoreNum <= 0) {
            return;
        }
 
        log.info("[PTStatusHolder-{}] currentWorkerSize: {}, needMoreNum: {}", instanceId, currentWorkerSize, needMoreNum);
 
        for (String newIp : workerIpList) {
            boolean success = registerOne(newIp);
            if (success) {
                needMoreNum --;
            }
            if (needMoreNum <= 0) {
                return;
            }
        }
    }
 
    /**
     * 检查是否需要动态加载新的执行器
     * @return check need more workers
     */
    public boolean checkNeedMoreWorker() {
        if (endlessWorkerNum()) {
            return true;
        }
        return getAvailableProcessorTrackers().size() < maxWorkerCount;
    }
 
    private boolean endlessWorkerNum() {
        return maxWorkerCount == null || maxWorkerCount == 0;
    }
 
    public void remove(List<String> addressList) {
        addressList.forEach(address2Status::remove);
    }
}