package com.iplatform.recvideo; import com.iplatform.model.po.Rc_video_t1; import com.iplatform.recvideo.util.PythonInvokeUtils; import com.iplatform.recvideo.util.TestUtils; import com.iplatform.recvideo.util.VideoFileUtils; import com.walker.infrastructure.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.util.ArrayList; import java.util.List; /** * 视频相似度结果计算以及写入执行器。 *
 *     1)该对象为'有状态',在每次完成一个采集过程后,需要重新创建。
 * 
* @author 时克英 * @date 2022-09-23 */ public abstract class SimilarExecutor { protected final transient Logger logger = LoggerFactory.getLogger(this.getClass()); private String videoDataFolder = null; private String batchId = null; // 当前批次要处理的原始视频集合 private List videoFolderInfoList = null; private List videoIdList = new ArrayList<>(); // 记录当前执行到(该批次)哪个视频文件对应的第几个图片 private int currentVideoFolderIndex = -1; private int currentImageIndex = -1; // private VideoFolderInfo currentVideoFolderInfo = null; // private ImageInfo currentImageInfo = null; // 是否已完成视频加载调用,完成后才能检索相似视频 private boolean pythonLoadVideoDone = false; private int topN = 40; // 如果测试模式,数据是假的! private boolean testMode = false; public int getTopN() { return topN; } /** * 初始化对象调用一次 * @param videoDataFolder * @param batchId */ public void startup(String videoDataFolder, String batchId, int topN, boolean testMode){ if(StringUtils.isEmpty(videoDataFolder)){ throw new IllegalArgumentException("视频文件夹根目录必须设置!"); } if(StringUtils.isEmpty(batchId)){ throw new IllegalArgumentException("处理批次(时间)必须设置!"); } this.videoDataFolder = videoDataFolder; this.batchId = batchId; this.topN = topN; this.testMode = testMode; // this.videoFolderInfoList = VideoFileUtils.getBatchVideoFolderInfo(this.videoDataFolder, batchId); } public void destroy(){ if(this.videoFolderInfoList != null){ this.videoFolderInfoList.clear(); } if(this.videoIdList != null){ this.videoIdList.clear(); } } /** * 在每次调度时钟周期执行一次。例如: 10秒一次。

* 注意:该方法英确保每次调用不会重复数据。 */ public int execute() throws Exception{ if(!this.pythonLoadVideoDone){ logger.debug("当前 pythonLoadVideoDone = false, 需要查询数据库是否已加载视频"); this.pythonLoadVideoDone = this.pythonLoadVideoDone(this.batchId, VideoFileUtils.combineBatchPath(videoDataFolder, batchId)); } // 1: 如果视频还未加载,则先加载视频 if(!this.pythonLoadVideoDone){ try{ String error = this.requestStartPythonLoadVideo(this.batchId); if(StringUtils.isNotEmpty(error)){ // 终止调用,等待下次调度继续尝试执行 logger.error("python调用加载视频返回错误:" + error); return -1; } this.pythonLoadVideoDone = true; } catch (Exception ex){ logger.error("python调用加载视频异常:" + this.batchId, ex); return -1; } } // 2: 加载完视频,需要查询每个图片相似度结果,并存储到数据库 // 这里注意,程序必须和AI服务器部署在一起,方便检索视频分析文件夹(本地) if(this.videoFolderInfoList == null){ if(testMode){ this.videoFolderInfoList = TestUtils.getBatchVideoFolderInfo(this.videoDataFolder, batchId); } else { this.videoFolderInfoList = VideoFileUtils.getBatchVideoFolderInfo(this.videoDataFolder, batchId); } } if(StringUtils.isEmptyList(this.videoFolderInfoList)){ logger.warn("视频分析文件夹内容为空,无法继续查询相似度结果! videoFolderInfoList = null"); return -1; } for(VideoFolderInfo v : this.videoFolderInfoList){ this.videoIdList.add(v.getVideoId()); } if(this.isSearchWriteDone()){ logger.info("已经完成批次相似结果写入数据库,处理最后一步:写入用户推荐表数据。batch = " + this.batchId); try { this.writeRcVideoUser(this.batchId, this.videoIdList); // 返回1表示整个流程执行完毕。 return 1; } catch (Exception ex){ throw new Exception("writeRcVideoUser():" + ex.getMessage(), ex); } } // 开始检索相似度 if(this.currentVideoFolderIndex == -1){ this.currentVideoFolderIndex ++; } try { this.processOneSearchAndWrite(this.batchId); return 0; } catch (Exception e) { throw new Exception("processOneSearchAndWrite(): " + e.getMessage(), e); } } private void processOneSearchAndWrite(String batchId) throws Exception{ if(this.currentVideoFolderIndex >= this.videoFolderInfoList.size()){ throw new IllegalArgumentException("currentVideoFolderIndex 越界: " + this.currentVideoFolderIndex); } VideoFolderInfo currentVideoFolderInfo = this.videoFolderInfoList.get(this.currentVideoFolderIndex); if(this.currentImageIndex == -1){ this.currentImageIndex ++; } ImageInfo imageInfo = currentVideoFolderInfo.getImageInfoList().get(this.currentImageIndex); List videoT1_list = this.acquirePythonSearchSimilarOnce(imageInfo.getVideoId() , imageInfo.getImagePath(), String.valueOf(this.topN)); if(!StringUtils.isEmptyList(videoT1_list)){ try { this.writeRcVideoT1(videoT1_list, PythonInvokeUtils.getFileNameWithoutSuffix(imageInfo.getImagePath(), Constants.IMAGE_SUFFIX)); } catch (Exception ex){ throw new Exception("writeRcVideoT1()执行错误:" + ex.getMessage(), ex); } } // 每个视频的最后一张图片 if((this.currentImageIndex + 1) >= currentVideoFolderInfo.getImageInfoSize()){ try { this.writeRcVideoT2(currentVideoFolderInfo, batchId); } catch (Exception ex){ throw new Exception("writeRcVideoT2()执行错误:" + ex.getMessage(), ex); } if((this.currentVideoFolderIndex + 1) < this.videoFolderInfoList.size()){ logger.debug("一个视频图像集合检索处理完毕,切换到下一个,currentImageIndex = " + this.currentImageIndex); this.currentVideoFolderIndex ++; this.currentImageIndex = -1; } else { // 让最后一个图像集合索引值超过界限,表示最后一个视频已处理完毕。 this.currentImageIndex ++; logger.debug("所有视频包含的所有图像处理完毕,currentVideoFolderIndex = " + this.currentVideoFolderIndex); } return; } this.currentImageIndex ++; } /** * 判断是否已经全部把图片相似度结果写入到数据库中。(针对该批次) * @return */ private boolean isSearchWriteDone(){ if(this.currentVideoFolderIndex == -1 || this.currentImageIndex == -1){ return false; } if((this.currentVideoFolderIndex+1) == this.videoFolderInfoList.size()){ VideoFolderInfo lastVideo = this.videoFolderInfoList.get(this.currentVideoFolderIndex); // 如果最后一个视频处理图片数量超过已有数量,判断肯定处理完毕 if((this.currentImageIndex+1) > lastVideo.getImageInfoSize()){ return true; } } return false; } /** * 查询数据库,检查是否已经完成本次批次视频加载。读这个表: milvus_video_status * @param batchId * @param batchFolder 批次所在文件夹全路径,如: /opt/ai/video/20220921 * @return */ protected abstract boolean pythonLoadVideoDone(String batchId, String batchFolder); /** * 请求AI服务,开始一个批次视频数据导入。 * @param batchId * @return */ protected abstract String requestStartPythonLoadVideo(String batchId) throws Exception; /** * 请求AI服务,检索给定图片的相似度结果集合。 * @return */ protected abstract List acquirePythonSearchSimilarOnce(String videoId , String imagePath, String topN) throws Exception; /** * 第一个临时表'rc_video_t1',每张图像包含多个相似视频记录。 * @param list * @param srcImageId 原始视频ID */ protected abstract void writeRcVideoT1(List list, String srcImageId); /** * 分析表'rc_video_t1',并把给定视频相似记录写入第二个临时表'rc_video_t2' * @param videoFolderInfo */ protected abstract void writeRcVideoT2(VideoFolderInfo videoFolderInfo, String batchId); /** * 分析给定批次所有视频用户推荐的视频信息,并写入表:'rc_video_user' * @param batchId * @param recVideoIdList 本批次处理原始视频id集合 */ protected abstract void writeRcVideoUser(String batchId, List recVideoIdList); }