package tech.powerjob.worker.core.ha; import tech.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import lombok.Data; import lombok.NoArgsConstructor; /** * ProcessorTracker 的状态 * * @author tjq * @since 2020/3/27 */ @Data @NoArgsConstructor public class ProcessorTrackerStatus { private static final int DISPATCH_THRESHOLD = 20; private static final int HEARTBEAT_TIMEOUT_MS = 60000; // 冗余存储一份 address 地址 private String address; // 上次活跃时间 private long lastActiveTime; // 等待执行任务数 private long remainTaskNum; // 是否被派发过任务 private boolean dispatched; // 是否接收到过来自 ProcessorTracker 的心跳 private boolean connected; /** * 初始化 ProcessorTracker,此时并未持有实际的 ProcessorTracker 状态 */ public void init(String address) { this.address = address; this.lastActiveTime = - 1; this.remainTaskNum = 0; this.dispatched = false; this.connected = false; } /** * 接收到 ProcessorTracker 的心跳信息后,更新状态 * @param req ProcessorTracker的心跳信息 */ public void update(ProcessorTrackerStatusReportReq req) { // 延迟到达的请求,直接忽略 if (req.getTime() <= lastActiveTime) { return; } this.address = req.getAddress(); this.lastActiveTime = req.getTime(); this.remainTaskNum = req.getRemainTaskNum(); this.dispatched = true; this.connected = true; } /** * 是否可用 */ public boolean available() { // 未曾派发过,默认可用 if (!dispatched) { return true; } // 已派发但未收到响应,则不可用 if (!connected) { return false; } // 长时间未收到心跳消息,则不可用 if (isTimeout()) { return false; } // 留有过多待处理任务,则不可用 if (remainTaskNum >= DISPATCH_THRESHOLD) { return false; } // TODO:后续考虑加上机器健康度等信息 return true; } /** * 是否超时(超过一定时间没有收到心跳) */ public boolean isTimeout() { if (dispatched) { return System.currentTimeMillis() - lastActiveTime > HEARTBEAT_TIMEOUT_MS; } // 未曾派发过任务的机器,不用处理 return false; } }