package com.iplatform.recvideo;
import com.iplatform.model.po.Rc_video_t1;
import com.iplatform.recvideo.util.PythonInvokeUtils;
import com.iplatform.recvideo.util.TestUtils;
import com.iplatform.recvideo.util.VideoFileUtils;
import com.walker.infrastructure.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
/**
* 视频相似度结果计算以及写入执行器。
*
* 1)该对象为'有状态',在每次完成一个采集过程后,需要重新创建。
*
* @author 时克英
* @date 2022-09-23
*/
public abstract class SimilarExecutor {
protected final transient Logger logger = LoggerFactory.getLogger(this.getClass());
private String videoDataFolder = null;
private String batchId = null;
// 当前批次要处理的原始视频集合
private List videoFolderInfoList = null;
private List videoIdList = new ArrayList<>();
// 记录当前执行到(该批次)哪个视频文件对应的第几个图片
private int currentVideoFolderIndex = -1;
private int currentImageIndex = -1;
// private VideoFolderInfo currentVideoFolderInfo = null;
// private ImageInfo currentImageInfo = null;
// 是否已完成视频加载调用,完成后才能检索相似视频
private boolean pythonLoadVideoDone = false;
private int topN = 40;
// 如果测试模式,数据是假的!
private boolean testMode = false;
public int getTopN() {
return topN;
}
/**
* 初始化对象调用一次
* @param videoDataFolder
* @param batchId
*/
public void startup(String videoDataFolder, String batchId, int topN, boolean testMode){
if(StringUtils.isEmpty(videoDataFolder)){
throw new IllegalArgumentException("视频文件夹根目录必须设置!");
}
if(StringUtils.isEmpty(batchId)){
throw new IllegalArgumentException("处理批次(时间)必须设置!");
}
this.videoDataFolder = videoDataFolder;
this.batchId = batchId;
this.topN = topN;
this.testMode = testMode;
// this.videoFolderInfoList = VideoFileUtils.getBatchVideoFolderInfo(this.videoDataFolder, batchId);
}
public void destroy(){
if(this.videoFolderInfoList != null){
this.videoFolderInfoList.clear();
}
if(this.videoIdList != null){
this.videoIdList.clear();
}
}
/**
* 在每次调度时钟周期执行一次。例如: 10秒一次。
* 注意:该方法英确保每次调用不会重复数据。
*/
public int execute() throws Exception{
if(!this.pythonLoadVideoDone){
logger.debug("当前 pythonLoadVideoDone = false, 需要查询数据库是否已加载视频");
this.pythonLoadVideoDone = this.pythonLoadVideoDone(this.batchId, VideoFileUtils.combineBatchPath(videoDataFolder, batchId));
}
// 1: 如果视频还未加载,则先加载视频
if(!this.pythonLoadVideoDone){
try{
String error = this.requestStartPythonLoadVideo(this.batchId);
if(StringUtils.isNotEmpty(error)){
// 终止调用,等待下次调度继续尝试执行
logger.error("python调用加载视频返回错误:" + error);
return -1;
}
this.pythonLoadVideoDone = true;
} catch (Exception ex){
logger.error("python调用加载视频异常:" + this.batchId, ex);
return -1;
}
}
// 2: 加载完视频,需要查询每个图片相似度结果,并存储到数据库
// 这里注意,程序必须和AI服务器部署在一起,方便检索视频分析文件夹(本地)
if(this.videoFolderInfoList == null){
if(testMode){
this.videoFolderInfoList = TestUtils.getBatchVideoFolderInfo(this.videoDataFolder, batchId);
} else {
this.videoFolderInfoList = VideoFileUtils.getBatchVideoFolderInfo(this.videoDataFolder, batchId);
}
}
if(StringUtils.isEmptyList(this.videoFolderInfoList)){
logger.warn("视频分析文件夹内容为空,无法继续查询相似度结果! videoFolderInfoList = null");
return -1;
}
for(VideoFolderInfo v : this.videoFolderInfoList){
this.videoIdList.add(v.getVideoId());
}
if(this.isSearchWriteDone()){
logger.info("已经完成批次相似结果写入数据库,处理最后一步:写入用户推荐表数据。batch = " + this.batchId);
try {
this.writeRcVideoUser(this.batchId, this.videoIdList);
// 返回1表示整个流程执行完毕。
return 1;
} catch (Exception ex){
throw new Exception("writeRcVideoUser():" + ex.getMessage(), ex);
}
}
// 开始检索相似度
if(this.currentVideoFolderIndex == -1){
this.currentVideoFolderIndex ++;
}
try {
this.processOneSearchAndWrite(this.batchId);
return 0;
} catch (Exception e) {
throw new Exception("processOneSearchAndWrite(): " + e.getMessage(), e);
}
}
private void processOneSearchAndWrite(String batchId) throws Exception{
if(this.currentVideoFolderIndex >= this.videoFolderInfoList.size()){
throw new IllegalArgumentException("currentVideoFolderIndex 越界: " + this.currentVideoFolderIndex);
}
VideoFolderInfo currentVideoFolderInfo = this.videoFolderInfoList.get(this.currentVideoFolderIndex);
if(this.currentImageIndex == -1){
this.currentImageIndex ++;
}
ImageInfo imageInfo = currentVideoFolderInfo.getImageInfoList().get(this.currentImageIndex);
List videoT1_list = this.acquirePythonSearchSimilarOnce(imageInfo.getVideoId()
, imageInfo.getImagePath(), String.valueOf(this.topN));
if(!StringUtils.isEmptyList(videoT1_list)){
try {
this.writeRcVideoT1(videoT1_list, PythonInvokeUtils.getFileNameWithoutSuffix(imageInfo.getImagePath(), Constants.IMAGE_SUFFIX));
} catch (Exception ex){
throw new Exception("writeRcVideoT1()执行错误:" + ex.getMessage(), ex);
}
}
// 每个视频的最后一张图片
if((this.currentImageIndex + 1) >= currentVideoFolderInfo.getImageInfoSize()){
try {
this.writeRcVideoT2(currentVideoFolderInfo, batchId);
} catch (Exception ex){
throw new Exception("writeRcVideoT2()执行错误:" + ex.getMessage(), ex);
}
if((this.currentVideoFolderIndex + 1) < this.videoFolderInfoList.size()){
logger.debug("一个视频图像集合检索处理完毕,切换到下一个,currentImageIndex = " + this.currentImageIndex);
this.currentVideoFolderIndex ++;
this.currentImageIndex = -1;
} else {
// 让最后一个图像集合索引值超过界限,表示最后一个视频已处理完毕。
this.currentImageIndex ++;
logger.debug("所有视频包含的所有图像处理完毕,currentVideoFolderIndex = " + this.currentVideoFolderIndex);
}
return;
}
this.currentImageIndex ++;
}
/**
* 判断是否已经全部把图片相似度结果写入到数据库中。(针对该批次)
* @return
*/
private boolean isSearchWriteDone(){
if(this.currentVideoFolderIndex == -1 || this.currentImageIndex == -1){
return false;
}
if((this.currentVideoFolderIndex+1) == this.videoFolderInfoList.size()){
VideoFolderInfo lastVideo = this.videoFolderInfoList.get(this.currentVideoFolderIndex);
// 如果最后一个视频处理图片数量超过已有数量,判断肯定处理完毕
if((this.currentImageIndex+1) > lastVideo.getImageInfoSize()){
return true;
}
}
return false;
}
/**
* 查询数据库,检查是否已经完成本次批次视频加载。读这个表: milvus_video_status
* @param batchId
* @param batchFolder 批次所在文件夹全路径,如: /opt/ai/video/20220921
* @return
*/
protected abstract boolean pythonLoadVideoDone(String batchId, String batchFolder);
/**
* 请求AI服务,开始一个批次视频数据导入。
* @param batchId
* @return
*/
protected abstract String requestStartPythonLoadVideo(String batchId) throws Exception;
/**
* 请求AI服务,检索给定图片的相似度结果集合。
* @return
*/
protected abstract List acquirePythonSearchSimilarOnce(String videoId
, String imagePath, String topN) throws Exception;
/**
* 第一个临时表'rc_video_t1',每张图像包含多个相似视频记录。
* @param list
* @param srcImageId 原始视频ID
*/
protected abstract void writeRcVideoT1(List list, String srcImageId);
/**
* 分析表'rc_video_t1',并把给定视频相似记录写入第二个临时表'rc_video_t2'
* @param videoFolderInfo
*/
protected abstract void writeRcVideoT2(VideoFolderInfo videoFolderInfo, String batchId);
/**
* 分析给定批次所有视频用户推荐的视频信息,并写入表:'rc_video_user'
* @param batchId
* @param recVideoIdList 本批次处理原始视频id集合
*/
protected abstract void writeRcVideoUser(String batchId, List recVideoIdList);
}