package tech.powerjob.worker.core.ha; import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.Map; /** * 统一管理 ProcessorTracker 的状态 * * @author tjq * @since 2020/3/28 */ @Slf4j public class ProcessorTrackerStatusHolder { private final Long instanceId; private final Integer maxWorkerCount; // ProcessorTracker的address(IP:Port) -> 状态 private final Map address2Status; public ProcessorTrackerStatusHolder(Long instanceId, Integer maxWorkerCount, List allWorkerAddress) { this.instanceId = instanceId; this.maxWorkerCount = maxWorkerCount; address2Status = Maps.newConcurrentMap(); allWorkerAddress.forEach(address -> { ProcessorTrackerStatus pts = new ProcessorTrackerStatus(); pts.init(address); address2Status.put(address, pts); }); } /** * 根据地址获取 ProcessorTracker 的状态 * @param address IP:Port * @return status */ public ProcessorTrackerStatus getProcessorTrackerStatus(String address) { // remove 前突然收到了 PT 心跳同时立即被派发才可能出现这种情况,0.001% 概率 return address2Status.computeIfAbsent(address, ignore -> { log.warn("[PTStatusHolder-{}] unregistered worker: {}", instanceId, address); ProcessorTrackerStatus processorTrackerStatus = new ProcessorTrackerStatus(); processorTrackerStatus.init(address); return processorTrackerStatus; }); } /** * 根据 ProcessorTracker 的心跳更新状态 */ public void updateStatus(ProcessorTrackerStatusReportReq heartbeatReq) { getProcessorTrackerStatus(heartbeatReq.getAddress()).update(heartbeatReq); } /** * 获取可用 ProcessorTracker 的IP地址 */ public List getAvailableProcessorTrackers() { List result = Lists.newLinkedList(); address2Status.forEach((address, ptStatus) -> { if (ptStatus.available()) { result.add(address); } }); return result; } /** * 获取所有 ProcessorTracker 的IP地址(包括不可用状态) */ public List getAllProcessorTrackers() { return Lists.newArrayList(address2Status.keySet()); } /** * 获取所有失联 ProcessorTracker 的IP地址 */ public List getAllDisconnectedProcessorTrackers() { List result = Lists.newLinkedList(); address2Status.forEach((ip, ptStatus) -> { if (ptStatus.isTimeout()) { result.add(ip); } }); return result; } /** * 注册新的执行节点 * @param address 新的执行节点地址 * @return true: register successfully / false: already exists */ private boolean registerOne(String address) { ProcessorTrackerStatus pts = address2Status.get(address); if (pts != null) { return false; } pts = new ProcessorTrackerStatus(); pts.init(address); address2Status.put(address, pts); log.info("[PTStatusHolder-{}] register new worker: {}", instanceId, address); return true; } public void register(List workerIpList) { if (endlessWorkerNum()) { workerIpList.forEach(this::registerOne); return; } List availableProcessorTrackers = getAvailableProcessorTrackers(); int currentWorkerSize = availableProcessorTrackers.size(); int needMoreNum = maxWorkerCount - currentWorkerSize; if (needMoreNum <= 0) { return; } log.info("[PTStatusHolder-{}] currentWorkerSize: {}, needMoreNum: {}", instanceId, currentWorkerSize, needMoreNum); for (String newIp : workerIpList) { boolean success = registerOne(newIp); if (success) { needMoreNum --; } if (needMoreNum <= 0) { return; } } } /** * 检查是否需要动态加载新的执行器 * @return check need more workers */ public boolean checkNeedMoreWorker() { if (endlessWorkerNum()) { return true; } return getAvailableProcessorTrackers().size() < maxWorkerCount; } private boolean endlessWorkerNum() { return maxWorkerCount == null || maxWorkerCount == 0; } public void remove(List addressList) { addressList.forEach(address2Status::remove); } }