package com.iplatform.recvideo;
|
|
import com.iplatform.model.po.Rc_video_t1;
|
import com.iplatform.recvideo.util.PythonInvokeUtils;
|
import com.iplatform.recvideo.util.TestUtils;
|
import com.iplatform.recvideo.util.VideoFileUtils;
|
import com.walker.infrastructure.utils.StringUtils;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
|
/**
|
* 视频相似度结果计算以及写入执行器。
|
* <pre>
|
* 1)该对象为'有状态',在每次完成一个采集过程后,需要重新创建。
|
* </pre>
|
* @author 时克英
|
* @date 2022-09-23
|
*/
|
public abstract class SimilarExecutor {
|
|
protected final transient Logger logger = LoggerFactory.getLogger(this.getClass());
|
|
private String videoDataFolder = null;
|
|
private String batchId = null;
|
|
// 当前批次要处理的原始视频集合
|
private List<VideoFolderInfo> videoFolderInfoList = null;
|
private List<String> videoIdList = new ArrayList<>();
|
|
// 记录当前执行到(该批次)哪个视频文件对应的第几个图片
|
private int currentVideoFolderIndex = -1;
|
private int currentImageIndex = -1;
|
// private VideoFolderInfo currentVideoFolderInfo = null;
|
// private ImageInfo currentImageInfo = null;
|
|
// 是否已完成视频加载调用,完成后才能检索相似视频
|
private boolean pythonLoadVideoDone = false;
|
|
private int topN = 40;
|
|
// 如果测试模式,数据是假的!
|
private boolean testMode = false;
|
|
public int getTopN() {
|
return topN;
|
}
|
|
/**
|
* 初始化对象调用一次
|
* @param videoDataFolder
|
* @param batchId
|
*/
|
public void startup(String videoDataFolder, String batchId, int topN, boolean testMode){
|
if(StringUtils.isEmpty(videoDataFolder)){
|
throw new IllegalArgumentException("视频文件夹根目录必须设置!");
|
}
|
if(StringUtils.isEmpty(batchId)){
|
throw new IllegalArgumentException("处理批次(时间)必须设置!");
|
}
|
this.videoDataFolder = videoDataFolder;
|
this.batchId = batchId;
|
this.topN = topN;
|
this.testMode = testMode;
|
// this.videoFolderInfoList = VideoFileUtils.getBatchVideoFolderInfo(this.videoDataFolder, batchId);
|
}
|
|
public void destroy(){
|
if(this.videoFolderInfoList != null){
|
this.videoFolderInfoList.clear();
|
}
|
if(this.videoIdList != null){
|
this.videoIdList.clear();
|
}
|
}
|
|
/**
|
* 在每次调度时钟周期执行一次。例如: 10秒一次。<p></p>
|
* 注意:该方法英确保每次调用不会重复数据。
|
*/
|
public int execute() throws Exception{
|
if(!this.pythonLoadVideoDone){
|
logger.debug("当前 pythonLoadVideoDone = false, 需要查询数据库是否已加载视频");
|
this.pythonLoadVideoDone = this.pythonLoadVideoDone(this.batchId, VideoFileUtils.combineBatchPath(videoDataFolder, batchId));
|
}
|
|
// 1: 如果视频还未加载,则先加载视频
|
if(!this.pythonLoadVideoDone){
|
try{
|
String error = this.requestStartPythonLoadVideo(this.batchId);
|
if(StringUtils.isNotEmpty(error)){
|
// 终止调用,等待下次调度继续尝试执行
|
logger.error("python调用加载视频返回错误:" + error);
|
return -1;
|
}
|
this.pythonLoadVideoDone = true;
|
|
} catch (Exception ex){
|
logger.error("python调用加载视频异常:" + this.batchId, ex);
|
return -1;
|
}
|
}
|
|
// 2: 加载完视频,需要查询每个图片相似度结果,并存储到数据库
|
// 这里注意,程序必须和AI服务器部署在一起,方便检索视频分析文件夹(本地)
|
if(this.videoFolderInfoList == null){
|
if(testMode){
|
this.videoFolderInfoList = TestUtils.getBatchVideoFolderInfo(this.videoDataFolder, batchId);
|
} else {
|
this.videoFolderInfoList = VideoFileUtils.getBatchVideoFolderInfo(this.videoDataFolder, batchId);
|
}
|
}
|
if(StringUtils.isEmptyList(this.videoFolderInfoList)){
|
logger.warn("视频分析文件夹内容为空,无法继续查询相似度结果! videoFolderInfoList = null");
|
return -1;
|
}
|
for(VideoFolderInfo v : this.videoFolderInfoList){
|
this.videoIdList.add(v.getVideoId());
|
}
|
|
if(this.isSearchWriteDone()){
|
logger.info("已经完成批次相似结果写入数据库,处理最后一步:写入用户推荐表数据。batch = " + this.batchId);
|
try {
|
this.writeRcVideoUser(this.batchId, this.videoIdList);
|
// 返回1表示整个流程执行完毕。
|
return 1;
|
} catch (Exception ex){
|
throw new Exception("writeRcVideoUser():" + ex.getMessage(), ex);
|
}
|
}
|
|
// 开始检索相似度
|
if(this.currentVideoFolderIndex == -1){
|
this.currentVideoFolderIndex ++;
|
}
|
|
try {
|
this.processOneSearchAndWrite(this.batchId);
|
return 0;
|
} catch (Exception e) {
|
throw new Exception("processOneSearchAndWrite(): " + e.getMessage(), e);
|
}
|
}
|
|
private void processOneSearchAndWrite(String batchId) throws Exception{
|
if(this.currentVideoFolderIndex >= this.videoFolderInfoList.size()){
|
throw new IllegalArgumentException("currentVideoFolderIndex 越界: " + this.currentVideoFolderIndex);
|
}
|
VideoFolderInfo currentVideoFolderInfo = this.videoFolderInfoList.get(this.currentVideoFolderIndex);
|
|
if(this.currentImageIndex == -1){
|
this.currentImageIndex ++;
|
}
|
ImageInfo imageInfo = currentVideoFolderInfo.getImageInfoList().get(this.currentImageIndex);
|
|
List<Rc_video_t1> videoT1_list = this.acquirePythonSearchSimilarOnce(imageInfo.getVideoId()
|
, imageInfo.getImagePath(), String.valueOf(this.topN));
|
if(!StringUtils.isEmptyList(videoT1_list)){
|
try {
|
this.writeRcVideoT1(videoT1_list, PythonInvokeUtils.getFileNameWithoutSuffix(imageInfo.getImagePath(), Constants.IMAGE_SUFFIX));
|
} catch (Exception ex){
|
throw new Exception("writeRcVideoT1()执行错误:" + ex.getMessage(), ex);
|
}
|
}
|
|
// 每个视频的最后一张图片
|
if((this.currentImageIndex + 1) >= currentVideoFolderInfo.getImageInfoSize()){
|
try {
|
this.writeRcVideoT2(currentVideoFolderInfo, batchId);
|
} catch (Exception ex){
|
throw new Exception("writeRcVideoT2()执行错误:" + ex.getMessage(), ex);
|
}
|
if((this.currentVideoFolderIndex + 1) < this.videoFolderInfoList.size()){
|
logger.debug("一个视频图像集合检索处理完毕,切换到下一个,currentImageIndex = " + this.currentImageIndex);
|
this.currentVideoFolderIndex ++;
|
this.currentImageIndex = -1;
|
} else {
|
// 让最后一个图像集合索引值超过界限,表示最后一个视频已处理完毕。
|
this.currentImageIndex ++;
|
logger.debug("所有视频包含的所有图像处理完毕,currentVideoFolderIndex = " + this.currentVideoFolderIndex);
|
}
|
return;
|
}
|
this.currentImageIndex ++;
|
}
|
|
/**
|
* 判断是否已经全部把图片相似度结果写入到数据库中。(针对该批次)
|
* @return
|
*/
|
private boolean isSearchWriteDone(){
|
if(this.currentVideoFolderIndex == -1 || this.currentImageIndex == -1){
|
return false;
|
}
|
if((this.currentVideoFolderIndex+1) == this.videoFolderInfoList.size()){
|
VideoFolderInfo lastVideo = this.videoFolderInfoList.get(this.currentVideoFolderIndex);
|
// 如果最后一个视频处理图片数量超过已有数量,判断肯定处理完毕
|
if((this.currentImageIndex+1) > lastVideo.getImageInfoSize()){
|
return true;
|
}
|
}
|
return false;
|
}
|
|
/**
|
* 查询数据库,检查是否已经完成本次批次视频加载。读这个表: milvus_video_status
|
* @param batchId
|
* @param batchFolder 批次所在文件夹全路径,如: /opt/ai/video/20220921
|
* @return
|
*/
|
protected abstract boolean pythonLoadVideoDone(String batchId, String batchFolder);
|
|
/**
|
* 请求AI服务,开始一个批次视频数据导入。
|
* @param batchId
|
* @return
|
*/
|
protected abstract String requestStartPythonLoadVideo(String batchId) throws Exception;
|
|
/**
|
* 请求AI服务,检索给定图片的相似度结果集合。
|
* @return
|
*/
|
protected abstract List<Rc_video_t1> acquirePythonSearchSimilarOnce(String videoId
|
, String imagePath, String topN) throws Exception;
|
|
/**
|
* 第一个临时表'rc_video_t1',每张图像包含多个相似视频记录。
|
* @param list
|
* @param srcImageId 原始视频ID
|
*/
|
protected abstract void writeRcVideoT1(List<Rc_video_t1> list, String srcImageId);
|
|
/**
|
* 分析表'rc_video_t1',并把给定视频相似记录写入第二个临时表'rc_video_t2'
|
* @param videoFolderInfo
|
*/
|
protected abstract void writeRcVideoT2(VideoFolderInfo videoFolderInfo, String batchId);
|
|
/**
|
* 分析给定批次所有视频用户推荐的视频信息,并写入表:'rc_video_user'
|
* @param batchId
|
* @param recVideoIdList 本批次处理原始视频id集合
|
*/
|
protected abstract void writeRcVideoUser(String batchId, List<String> recVideoIdList);
|
}
|