package com.iplatform.gather; import com.walker.infrastructure.utils.StringUtils; import com.walker.scheduler.impl.TimedScheduler; import com.walker.scheduler.util.OptionUtils; import com.walker.store.AbstractStore; import com.walker.store.task.GatherTask; /** * 抽象的采集调度器默认实现,提供了创建调度的模板。 * 所有采集数据任务都应继承该模板使用。 * @author 时克英 * @date 2022-09-09 */ public abstract class GatherScheduler extends TimedScheduler { private AbstractStore store; private GatherTask gatherTask; // 检测采集存储的目标表是否存在,2022-09-12 private boolean checkDestTableExist = true; public GatherScheduler(int id, String name, AbstractStore store){ super(id, name); // if(id <= 0){ // throw new IllegalArgumentException("GatherScheduler: id is required!"); // } // if(StringUtils.isEmpty(name)){ // throw new IllegalArgumentException("GatherScheduler: name is required!"); // } if(store == null){ throw new IllegalArgumentException("GatherScheduler: store is required!"); } this.store = store; // 设置时间选项为永远,同:ForeverScheduler this.setOption(OptionUtils.combineEveryDay24HourOption()); } @Override public void stop() { super.stop(); this.store.destroy(); } @Override protected Object onProcess(Object[] inputParams) throws Exception { if(this.gatherTask == null){ this.gatherTask = this.providerTask(this.store); if(this.gatherTask == null){ throw new IllegalArgumentException("未提供任务对象,调度无法执行:" + this.getName()); } } // srcName, createTableSQL, parameter, params[] // 1因为无法更新私服,因此这个抛出异常先不粗粝,注释掉,回单位要加上 return this.gatherTask.run((String)inputParams[0], (String)inputParams[1], inputParams[2], (Object[])inputParams[3]); } @Override public String getStoreId(){ return this.store.getId(); } protected AbstractStore getStore(){ return this.store; } /** * 获取一个特定的采集任务,采集任务主要处理数据获取的具体操作。 * @param store * @return */ protected abstract GatherTask providerTask(AbstractStore store); /** * 终止调度器运行,在重要检查中,例如:参数配置或采集数据等设置不正确的时候无法启动调度器,此时需要终止。 *

该方法会调用调度管理器,更新调度器状态。

* @param schedulerId * @date 2022-09-13 */ protected abstract void terminateGatherScheduler(int schedulerId); /** * 是否检查目标表是否存在 * @return */ public boolean isCheckDestTableExist() { return checkDestTableExist; } /** * 设置是否检查目标表是否存在 * @param checkDestTableExist */ public void setCheckDestTableExist(boolean checkDestTableExist) { this.checkDestTableExist = checkDestTableExist; } }