package com.iplatform.recvideo.scheduler; import com.iplatform.core.BeanContextAware; import com.iplatform.gather.LocalAddress; import com.iplatform.recvideo.VideoScheduler; import com.walker.store.AbstractStore; import com.walker.store.task.GatherTask; /** * 视频信息存储、搜索相似度处理的调度任务。

* 该任务会持续运行: *
 *     1)
 * 
* @author 时克英
* @date 2022-09-26
*/
public class VideoSearchScheduler extends VideoScheduler {

    /*
     * Review notes:
     * - `logger`, `SUCCESS` and `failedCount` are used below but not declared in this class;
     *   presumably they are inherited from VideoScheduler (TODO confirm).
     * - onProcess() returns SUCCESS to be invoked again, or null so that the scheduler thread
     *   can sleep for a while before the next attempt.
     */

    /** Task bound to the batch in progress; null before the first run and after a batch completes. */
    private VideoSearchTask videoSearchTask = null;

    /** Batch record returned by the latest status query; null when the query found nothing. */
    private VideoSearchMeta currentVideoSearchMeta = null;

    /**
     * Creates the scheduler.
     *
     * @param id   scheduler id, passed straight to the superclass
     * @param name scheduler name, passed straight to the superclass
     */
    public VideoSearchScheduler(int id, String name) {
        super(id, name);
    }

    /**
     * Builds a new {@link VideoSearchTask} using the {@link LocalAddress} bean looked up
     * through {@link BeanContextAware}.
     *
     * @param store not used by this implementation
     * @return a freshly created search task
     */
    @Override
    protected GatherTask providerTask(AbstractStore store) {
        LocalAddress address = BeanContextAware.getBeanByType(LocalAddress.class);
        return new VideoSearchTask("视频相似度检索任务", address);
    }

    /**
     * Runs one scheduling step: makes sure a task exists, re-reads the batch status record,
     * executes a single search and maps its result code to the scheduler's return value.
     *
     * @param inputParams not used by this implementation
     * @return {@code SUCCESS} when a step finished (or failed fewer than four times in a row of
     *         counted failures), {@code null} when there is nothing to process, when the failure
     *         counter exceeded 3, or when the result code is not one of 0 / -1 / 1
     * @throws Exception if creating the task or querying the status record fails
     */
    @Override
    protected Object onProcess(Object[] inputParams) throws Exception {
        // Remember whether this call creates the task: the two paths log differently.
        final boolean firstRun = (this.videoSearchTask == null);
        if (firstRun) {
            logger.debug("调度任务初始化,需要先查询任务状态表");
            this.videoSearchTask = (VideoSearchTask) this.providerTask(this.getStore());
        }

        // The status record is re-read on every invocation, whether or not the task is new.
        this.currentVideoSearchMeta = this.queryCurrentMeta();
        if (this.currentVideoSearchMeta == null) {
            if (!firstRun) {
                // A task already existed, but the query returned no usable batch record.
                logger.debug(".........没有查询到要采集的记录:currentVideoSearchMeta");
            }
            return null;
        }
        if (firstRun) {
            logger.debug("开始启动批次查询:" + this.currentVideoSearchMeta.getBatchId());
        }

        // Hand the current batch record to the task before searching.
        // NOTE(review): checkExecutor is defined in VideoSearchTask; its exact effect is not visible here.
        this.videoSearchTask.checkExecutor(this.currentVideoSearchMeta);

        int result;
        try {
            result = this.videoSearchTask.executeOneSearch();
        } catch (Exception ex) {
            logger.error("executeOneSearch(): 执行异常:" + ex.getMessage(), ex);
            // An exception is handled exactly like a -1 result code.
            result = -1;
        }

        switch (result) {
            case 0:
                return SUCCESS;
            case -1:
                this.failedCount++;
                if (this.failedCount > 3) {
                    // Too many errors: reset the counter and return null so the thread sleeps for a while.
                    this.failedCount = 0;
                    return null;
                }
                return SUCCESS;
            case 1:
                // Batch finished: drop the state so the next call looks up the next batch record.
                logger.info("完成批次,准备查找下一个批次记录。batchId = " + this.currentVideoSearchMeta.getBatchId());
                this.clearStatus();
                return SUCCESS;
            default:
                return null;
        }
    }

    /** Runs the task's status query (its own SQL, no bind arguments) and returns the resulting record. */
    private VideoSearchMeta queryCurrentMeta() throws Exception {
        Object[] queryArgs = new Object[]{this.videoSearchTask.getSql(), new Object[]{}};
        return (VideoSearchMeta) this.videoSearchTask.run(null, null, null, queryArgs);
    }

    /** Forgets the current task and batch record so that the next run starts from scratch. */
    private void clearStatus() {
        this.videoSearchTask = null;
        this.currentVideoSearchMeta = null;
    }
}