package tech.powerjob.server.common.timewheel; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.server.common.RejectedExecutionHandlerFactory; import com.google.common.collect.Queues; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import java.util.LinkedList; import java.util.Queue; import java.util.Set; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; /** * 时间轮定时器 * 支持的最小精度:1ms(Thread.sleep本身不精确导致精度没法提高) * 最小误差:1ms,理由同上 * * @author tjq * @since 2020/4/2 */ @Slf4j public class HashedWheelTimer implements Timer { private final long tickDuration; private final HashedWheelBucket[] wheel; private final int mask; private final Indicator indicator; private final long startTime; private final Queue waitingTasks = Queues.newLinkedBlockingQueue(); private final Queue canceledTasks = Queues.newLinkedBlockingQueue(); private final ExecutorService taskProcessPool; public HashedWheelTimer(long tickDuration, int ticksPerWheel) { this(tickDuration, ticksPerWheel, 0); } /** * 新建时间轮定时器 * @param tickDuration 时间间隔,单位毫秒(ms) * @param ticksPerWheel 轮盘个数 * @param processThreadNum 处理任务的线程个数,0代表不启用新线程(如果定时任务需要耗时操作,请启用线程池) */ public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processThreadNum) { this.tickDuration = tickDuration; // 初始化轮盘,大小格式化为2的N次,可以使用 & 代替取余 int ticksNum = CommonUtils.formatSize(ticksPerWheel); wheel = new HashedWheelBucket[ticksNum]; for (int i = 0; i < ticksNum; i++) { wheel[i] = new HashedWheelBucket(); } mask = wheel.length - 1; // 初始化执行线程池 if (processThreadNum <= 0) { taskProcessPool = null; }else { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build(); // 这里需要调整一下队列大小 BlockingQueue queue = Queues.newLinkedBlockingQueue(8192); int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum); // 基本都是 io 密集型任务 taskProcessPool = new ThreadPoolExecutor(core, 2 * core, 60, TimeUnit.SECONDS, queue, threadFactory, RejectedExecutionHandlerFactory.newCallerRun("PowerJobTimeWheelPool")); } startTime = System.currentTimeMillis(); // 启动后台线程 indicator = new Indicator(); new Thread(indicator, "HashedWheelTimer-Indicator").start(); } @Override public TimerFuture schedule(TimerTask task, long delay, TimeUnit unit) { long targetTime = System.currentTimeMillis() + unit.toMillis(delay); HashedWheelTimerFuture timerFuture = new HashedWheelTimerFuture(task, targetTime); // 直接运行到期、过期任务 if (delay <= 0) { runTask(timerFuture); return timerFuture; } // 写入阻塞队列,保证并发安全(性能进一步优化可以考虑 Netty 的 Multi-Producer-Single-Consumer队列) waitingTasks.add(timerFuture); return timerFuture; } @Override public Set stop() { indicator.stop.set(true); taskProcessPool.shutdown(); while (!taskProcessPool.isTerminated()) { try { Thread.sleep(100); }catch (Exception ignore) { } } return indicator.getUnprocessedTasks(); } /** * 包装 TimerTask,维护预期执行时间、总圈数等数据 */ private final class HashedWheelTimerFuture implements TimerFuture { // 预期执行时间 private final long targetTime; private final TimerTask timerTask; // 所属的时间格,用于快速删除该任务 private HashedWheelBucket bucket; // 总圈数 private long totalTicks; // 当前状态 0 - 初始化等待中,1 - 运行中,2 - 完成,3 - 已取消 private int status; // 状态枚举值 private static final int WAITING = 0; private static final int RUNNING = 1; private static final int FINISHED = 2; private static final int CANCELED = 3; public HashedWheelTimerFuture(TimerTask timerTask, long targetTime) { this.targetTime = targetTime; this.timerTask = timerTask; this.status = WAITING; } @Override public TimerTask getTask() { return timerTask; } @Override public boolean cancel() { if (status == WAITING) { status = CANCELED; canceledTasks.add(this); return true; } return false; } @Override public boolean isCancelled() { return status == CANCELED; } @Override public boolean isDone() { return status == FINISHED; } } /** * 时间格(本质就是链表,维护了这个时刻可能需要执行的所有任务) */ private final class HashedWheelBucket extends LinkedList { public void expireTimerTasks(long currentTick) { removeIf(timerFuture -> { // processCanceledTasks 后外部操作取消任务会导致 BUCKET 中仍存在 CANCELED 任务的情况 if (timerFuture.status == HashedWheelTimerFuture.CANCELED) { return true; } if (timerFuture.status != HashedWheelTimerFuture.WAITING) { log.warn("[HashedWheelTimer] impossible, please fix the bug"); return true; } // 本轮直接调度 if (timerFuture.totalTicks <= currentTick) { if (timerFuture.totalTicks < currentTick) { log.warn("[HashedWheelTimer] timerFuture.totalTicks < currentTick, please fix the bug"); } try { // 提交执行 runTask(timerFuture); }catch (Exception ignore) { } finally { timerFuture.status = HashedWheelTimerFuture.FINISHED; } return true; } return false; }); } } private void runTask(HashedWheelTimerFuture timerFuture) { timerFuture.status = HashedWheelTimerFuture.RUNNING; if (taskProcessPool == null) { timerFuture.timerTask.run(); }else { taskProcessPool.submit(timerFuture.timerTask); } } /** * 模拟指针转动 */ private class Indicator implements Runnable { private long tick = 0; private final AtomicBoolean stop = new AtomicBoolean(false); private final CountDownLatch latch = new CountDownLatch(1); @Override public void run() { while (!stop.get()) { // 1. 将任务从队列推入时间轮 pushTaskToBucket(); // 2. 处理取消的任务 processCanceledTasks(); // 3. 等待指针跳向下一刻 tickTack(); // 4. 执行定时任务 int currentIndex = (int) (tick & mask); HashedWheelBucket bucket = wheel[currentIndex]; bucket.expireTimerTasks(tick); tick ++; } latch.countDown(); } /** * 模拟指针转动,当返回时指针已经转到了下一个刻度 */ private void tickTack() { // 下一次调度的绝对时间 long nextTime = startTime + (tick + 1) * tickDuration; long sleepTime = nextTime - System.currentTimeMillis(); if (sleepTime > 0) { try { Thread.sleep(sleepTime); }catch (Exception ignore) { } } } /** * 处理被取消的任务 */ private void processCanceledTasks() { while (true) { HashedWheelTimerFuture canceledTask = canceledTasks.poll(); if (canceledTask == null) { return; } // 从链表中删除该任务(bucket为null说明还没被正式推入时间格中,不需要处理) if (canceledTask.bucket != null) { canceledTask.bucket.remove(canceledTask); } } } /** * 将队列中的任务推入时间轮中 */ private void pushTaskToBucket() { while (true) { HashedWheelTimerFuture timerTask = waitingTasks.poll(); if (timerTask == null) { return; } // 总共的偏移量 long offset = timerTask.targetTime - startTime; // 总共需要走的指针步数 timerTask.totalTicks = offset / tickDuration; // 取余计算 bucket index int index = (int) (timerTask.totalTicks & mask); HashedWheelBucket bucket = wheel[index]; // TimerTask 维护 Bucket 引用,用于删除该任务 timerTask.bucket = bucket; if (timerTask.status == HashedWheelTimerFuture.WAITING) { bucket.add(timerTask); } } } public Set getUnprocessedTasks() { try { latch.await(); }catch (Exception ignore) { } Set tasks = Sets.newHashSet(); Consumer consumer = timerFuture -> { if (timerFuture.status == HashedWheelTimerFuture.WAITING) { tasks.add(timerFuture.timerTask); } }; waitingTasks.forEach(consumer); for (HashedWheelBucket bucket : wheel) { bucket.forEach(consumer); } return tasks; } } }