shikeying
2022-09-24 7b3b249a7f2320f97e21e94e26a65f4b4ead0b6e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
package com.iplatform.recvideo;
 
import com.iplatform.model.po.Rc_video_t1;
import com.iplatform.recvideo.util.PythonInvokeUtils;
import com.iplatform.recvideo.util.TestUtils;
import com.iplatform.recvideo.util.VideoFileUtils;
import com.walker.infrastructure.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.io.File;
import java.util.ArrayList;
import java.util.List;
 
/**
 * 视频相似度结果计算以及写入执行器。
 * <pre>
 *     1)该对象为'有状态',在每次完成一个采集过程后,需要重新创建。
 * </pre>
 * @author 时克英
 * @date 2022-09-23
 */
public abstract class SimilarExecutor {
 
    protected final transient Logger logger = LoggerFactory.getLogger(this.getClass());
 
    private String videoDataFolder = null;
 
    private String batchId = null;
 
    // 当前批次要处理的原始视频集合
    private List<VideoFolderInfo> videoFolderInfoList = null;
    private List<String> videoIdList = new ArrayList<>();
 
    // 记录当前执行到(该批次)哪个视频文件对应的第几个图片
    private int currentVideoFolderIndex = -1;
    private int currentImageIndex = -1;
//    private VideoFolderInfo currentVideoFolderInfo = null;
//    private ImageInfo currentImageInfo = null;
 
    // 是否已完成视频加载调用,完成后才能检索相似视频
    private boolean pythonLoadVideoDone = false;
 
    private int topN = 40;
 
    // 如果测试模式,数据是假的!
    private boolean testMode = false;
 
    public int getTopN() {
        return topN;
    }
 
    /**
     * 初始化对象调用一次
     * @param videoDataFolder
     * @param batchId
     */
    public void startup(String videoDataFolder, String batchId, int topN, boolean testMode){
        if(StringUtils.isEmpty(videoDataFolder)){
            throw new IllegalArgumentException("视频文件夹根目录必须设置!");
        }
        if(StringUtils.isEmpty(batchId)){
            throw new IllegalArgumentException("处理批次(时间)必须设置!");
        }
        this.videoDataFolder = videoDataFolder;
        this.batchId = batchId;
        this.topN = topN;
        this.testMode = testMode;
//        this.videoFolderInfoList = VideoFileUtils.getBatchVideoFolderInfo(this.videoDataFolder, batchId);
    }
 
    public void destroy(){
        if(this.videoFolderInfoList != null){
            this.videoFolderInfoList.clear();
        }
        if(this.videoIdList != null){
            this.videoIdList.clear();
        }
    }
 
    /**
     * 在每次调度时钟周期执行一次。例如: 10秒一次。<p></p>
     * 注意:该方法英确保每次调用不会重复数据。
     */
    public int execute() throws Exception{
        if(!this.pythonLoadVideoDone){
            logger.debug("当前 pythonLoadVideoDone = false, 需要查询数据库是否已加载视频");
            this.pythonLoadVideoDone = this.pythonLoadVideoDone(this.batchId, VideoFileUtils.combineBatchPath(videoDataFolder, batchId));
        }
 
        // 1: 如果视频还未加载,则先加载视频
        if(!this.pythonLoadVideoDone){
            try{
                String error = this.requestStartPythonLoadVideo(this.batchId);
                if(StringUtils.isNotEmpty(error)){
                    // 终止调用,等待下次调度继续尝试执行
                    logger.error("python调用加载视频返回错误:" + error);
                    return -1;
                }
                this.pythonLoadVideoDone = true;
 
            } catch (Exception ex){
                logger.error("python调用加载视频异常:" + this.batchId, ex);
                return -1;
            }
        }
 
        // 2: 加载完视频,需要查询每个图片相似度结果,并存储到数据库
        //    这里注意,程序必须和AI服务器部署在一起,方便检索视频分析文件夹(本地)
        if(this.videoFolderInfoList == null){
            if(testMode){
                this.videoFolderInfoList = TestUtils.getBatchVideoFolderInfo(this.videoDataFolder, batchId);
            } else {
                this.videoFolderInfoList = VideoFileUtils.getBatchVideoFolderInfo(this.videoDataFolder, batchId);
            }
        }
        if(StringUtils.isEmptyList(this.videoFolderInfoList)){
            logger.warn("视频分析文件夹内容为空,无法继续查询相似度结果! videoFolderInfoList = null");
            return -1;
        }
        for(VideoFolderInfo v : this.videoFolderInfoList){
            this.videoIdList.add(v.getVideoId());
        }
 
        if(this.isSearchWriteDone()){
            logger.info("已经完成批次相似结果写入数据库,处理最后一步:写入用户推荐表数据。batch = " + this.batchId);
            try {
                this.writeRcVideoUser(this.batchId, this.videoIdList);
                // 返回1表示整个流程执行完毕。
                return 1;
            } catch (Exception ex){
                throw new Exception("writeRcVideoUser():" + ex.getMessage(), ex);
            }
        }
 
        // 开始检索相似度
        if(this.currentVideoFolderIndex == -1){
            this.currentVideoFolderIndex ++;
        }
 
        try {
            this.processOneSearchAndWrite();
            return 0;
        } catch (Exception e) {
            throw new Exception("processOneSearchAndWrite(): " + e.getMessage(), e);
        }
    }
 
    private void processOneSearchAndWrite() throws Exception{
        if(this.currentVideoFolderIndex >= this.videoFolderInfoList.size()){
            throw new IllegalArgumentException("currentVideoFolderIndex 越界: " + this.currentVideoFolderIndex);
        }
        VideoFolderInfo currentVideoFolderInfo = this.videoFolderInfoList.get(this.currentVideoFolderIndex);
 
        if(this.currentImageIndex == -1){
            this.currentImageIndex ++;
        }
        ImageInfo imageInfo = currentVideoFolderInfo.getImageInfoList().get(this.currentImageIndex);
 
        List<Rc_video_t1> videoT1_list = this.acquirePythonSearchSimilarOnce(imageInfo.getVideoId()
                , imageInfo.getImagePath(), String.valueOf(this.topN));
        if(!StringUtils.isEmptyList(videoT1_list)){
            try {
                this.writeRcVideoT1(videoT1_list, PythonInvokeUtils.getFileNameWithoutSuffix(imageInfo.getImagePath(), Constants.IMAGE_SUFFIX));
            } catch (Exception ex){
                throw new Exception("writeRcVideoT1()执行错误:" + ex.getMessage(), ex);
            }
        }
 
        // 每个视频的最后一张图片
        if((this.currentImageIndex + 1) >= currentVideoFolderInfo.getImageInfoSize()){
            try {
                this.writeRcVideoT2(currentVideoFolderInfo);
            } catch (Exception ex){
                throw new Exception("writeRcVideoT2()执行错误:" + ex.getMessage(), ex);
            }
            if((this.currentVideoFolderIndex + 1) < this.videoFolderInfoList.size()){
                logger.debug("一个视频图像集合检索处理完毕,切换到下一个,currentImageIndex = " + this.currentImageIndex);
                this.currentVideoFolderIndex ++;
                this.currentImageIndex = -1;
            } else {
                // 让最后一个图像集合索引值超过界限,表示最后一个视频已处理完毕。
                this.currentImageIndex ++;
                logger.debug("所有视频包含的所有图像处理完毕,currentVideoFolderIndex = " + this.currentVideoFolderIndex);
            }
            return;
        }
        this.currentImageIndex ++;
    }
 
    /**
     * 判断是否已经全部把图片相似度结果写入到数据库中。(针对该批次)
     * @return
     */
    private boolean isSearchWriteDone(){
        if(this.currentVideoFolderIndex == -1 || this.currentImageIndex == -1){
            return false;
        }
        if((this.currentVideoFolderIndex+1) == this.videoFolderInfoList.size()){
            VideoFolderInfo lastVideo = this.videoFolderInfoList.get(this.currentVideoFolderIndex);
            // 如果最后一个视频处理图片数量超过已有数量,判断肯定处理完毕
            if((this.currentImageIndex+1) > lastVideo.getImageInfoSize()){
                return true;
            }
        }
        return false;
    }
 
    /**
     * 查询数据库,检查是否已经完成本次批次视频加载。读这个表: milvus_video_status
     * @param batchId
     * @param batchFolder 批次所在文件夹全路径,如: /opt/ai/video/20220921
     * @return
     */
    protected abstract boolean pythonLoadVideoDone(String batchId, String batchFolder);
 
    /**
     * 请求AI服务,开始一个批次视频数据导入。
     * @param batchId
     * @return
     */
    protected abstract String requestStartPythonLoadVideo(String batchId) throws Exception;
 
    /**
     * 请求AI服务,检索给定图片的相似度结果集合。
     * @return
     */
    protected abstract List<Rc_video_t1> acquirePythonSearchSimilarOnce(String videoId
            , String imagePath, String topN) throws Exception;
 
    /**
     * 第一个临时表'rc_video_t1',每张图像包含多个相似视频记录。
     * @param list
     * @param srcImageId 原始视频ID
     */
    protected abstract void writeRcVideoT1(List<Rc_video_t1> list, String srcImageId);
 
    /**
     * 分析表'rc_video_t1',并把给定视频相似记录写入第二个临时表'rc_video_t2'
     * @param videoFolderInfo
     */
    protected abstract void writeRcVideoT2(VideoFolderInfo videoFolderInfo);
 
    /**
     * 分析给定批次所有视频用户推荐的视频信息,并写入表:'rc_video_user'
     * @param batchId
     * @param recVideoIdList 推荐视频id集合
     */
    protected abstract void writeRcVideoUser(String batchId, List<String> recVideoIdList);
}