package tech.powerjob.server.core.scheduler; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Service; import tech.powerjob.common.enums.TimeExpressionType; import java.util.ArrayList; import java.util.List; /** * @author Echo009 * @since 2022/10/12 */ @Service @Slf4j @RequiredArgsConstructor public class CoreScheduleTaskManager implements InitializingBean, DisposableBean { private final PowerScheduleService powerScheduleService; private final InstanceStatusCheckService instanceStatusCheckService; private final List coreThreadContainer = new ArrayList<>(); @SuppressWarnings("AlibabaAvoidManuallyCreateThread") @Override public void afterPropertiesSet() { // 定时调度 coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronJob", PowerScheduleService.SCHEDULE_RATE, () -> powerScheduleService.scheduleNormalJob(TimeExpressionType.CRON)), "Thread-ScheduleCronJob")); coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleDailyTimeIntervalJob", PowerScheduleService.SCHEDULE_RATE, () -> powerScheduleService.scheduleNormalJob(TimeExpressionType.DAILY_TIME_INTERVAL)), "Thread-ScheduleDailyTimeIntervalJob")); coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronWorkflow", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleCronWorkflow), "Thread-ScheduleCronWorkflow")); coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleFrequentJob", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleFrequentJob), "Thread-ScheduleFrequentJob")); // 数据清理 coreThreadContainer.add(new Thread(new LoopRunnable("CleanWorkerData", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::cleanData), "Thread-CleanWorkerData")); // 状态检查 coreThreadContainer.add(new Thread(new LoopRunnable("CheckRunningInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkRunningInstance), "Thread-CheckRunningInstance")); coreThreadContainer.add(new Thread(new LoopRunnable("CheckWaitingDispatchInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWaitingDispatchInstance), "Thread-CheckWaitingDispatchInstance")); coreThreadContainer.add(new Thread(new LoopRunnable("CheckWaitingWorkerReceiveInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWaitingWorkerReceiveInstance), "Thread-CheckWaitingWorkerReceiveInstance")); coreThreadContainer.add(new Thread(new LoopRunnable("CheckWorkflowInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWorkflowInstance), "Thread-CheckWorkflowInstance")); coreThreadContainer.forEach(Thread::start); } @Override public void destroy() { coreThreadContainer.forEach(Thread::interrupt); } @RequiredArgsConstructor private static class LoopRunnable implements Runnable { private final String taskName; private final Long runningInterval; private final Runnable innerRunnable; @SuppressWarnings("BusyWait") @Override public void run() { log.info("start task : {}.", taskName); while (true) { try { // 倒置顺序为 先 sleep 再执行,解决异常情况 while true 打日志的问题 https://github.com/PowerJob/PowerJob/issues/769 Thread.sleep(runningInterval); innerRunnable.run(); } catch (InterruptedException e) { log.warn("[{}] task has been interrupted!", taskName, e); break; } catch (Exception e) { log.error("[{}] task failed!", taskName, e); } } } } }