package tech.powerjob.server.common.timewheel.holder; import tech.powerjob.server.common.timewheel.HashedWheelTimer; import tech.powerjob.server.common.timewheel.Timer; import tech.powerjob.server.common.timewheel.TimerFuture; import tech.powerjob.server.common.timewheel.TimerTask; import com.google.common.collect.Maps; import java.util.Map; import java.util.concurrent.TimeUnit; /** * 定时调度任务实例 * * @author tjq * @since 2020/7/25 */ public class InstanceTimeWheelService { private static final Map CARGO = Maps.newConcurrentMap(); /** * 精确调度时间轮,每 1MS 走一格 */ private static final Timer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4); /** * 非精确调度时间轮,用于处理高延迟任务,每 10S 走一格 */ private static final Timer SLOW_TIMER = new HashedWheelTimer(10000, 12, 0); /** * 支持取消的时间间隔,低于该阈值则不会放进 CARGO */ private static final long MIN_INTERVAL_MS = 1000; /** * 长延迟阈值 */ private static final long LONG_DELAY_THRESHOLD_MS = 60000; /** * 定时调度 * @param uniqueId 唯一 ID,必须是 snowflake 算法生成的 ID * @param delayMS 延迟毫秒数 * @param timerTask 需要执行的目标方法 */ public static void schedule(Long uniqueId, Long delayMS, TimerTask timerTask) { if (delayMS <= LONG_DELAY_THRESHOLD_MS) { realSchedule(uniqueId, delayMS, timerTask); return; } long expectTriggerTime = System.currentTimeMillis() + delayMS; TimerFuture longDelayTask = SLOW_TIMER.schedule(() -> { CARGO.remove(uniqueId); realSchedule(uniqueId, expectTriggerTime - System.currentTimeMillis(), timerTask); }, delayMS - LONG_DELAY_THRESHOLD_MS, TimeUnit.MILLISECONDS); CARGO.put(uniqueId, longDelayTask); } /** * 获取 TimerFuture * @param uniqueId 唯一 ID * @return TimerFuture */ public static TimerFuture fetchTimerFuture(Long uniqueId) { return CARGO.get(uniqueId); } private static void realSchedule(Long uniqueId, Long delayMS, TimerTask timerTask) { TimerFuture timerFuture = TIMER.schedule(() -> { CARGO.remove(uniqueId); timerTask.run(); }, delayMS, TimeUnit.MILLISECONDS); if (delayMS > MIN_INTERVAL_MS) { CARGO.put(uniqueId, timerFuture); } } }