shikeying
2022-09-26 fd03e31f173ad9c52b15a30a9127e2b6a468538d
视频相似度分析2
6个文件已添加
7个文件已修改
572 ■■■■■ 已修改文件
recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_t2.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_t2_mapper.java 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_user.java 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/doc/table.SQL 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/SimilarExecutor.java 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/SimilarVideoUser.java 68 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/SimilarVideoUserOrder.java 90 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/VideoScheduler.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoSearchMeta.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoSearchScheduler.java 97 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoSearchTask.java 94 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/service/VideoExecutorServiceImpl.java 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultSimilarExecutor.java 71 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_t2.java
@@ -37,6 +37,10 @@
    @JsonIgnore
    protected boolean isset_score = false;
    private String batch_id = null;
    @JsonIgnore
    protected boolean isset_batch_id = false;
    /**
     * 默认构造函数
     */
@@ -114,6 +118,17 @@
        return this.score == null;
    }
    public String getBatch_id(){return this.batch_id;}
    public void setBatch_id(String batch_id){
        this.batch_id = batch_id;
        this.isset_batch_id = true;
    }
    @JsonIgnore
    public boolean isEmptyBatch_id(){return this.batch_id == null || this.batch_id.length() == 0;}
    /**
     * 重写 toString() 方法
     */
@@ -124,6 +139,7 @@
                .append("src_video_id=").append(this.src_video_id)
                .append("sim_video_id=").append(this.sim_video_id)
                .append("score=").append(this.score)
                .append("batch_id=").append(this.batch_id)
                .toString();
    }
@@ -150,6 +166,9 @@
        if (this.isset_score) {
            rc_video_t2.setScore(this.getScore());
        }
        if(this.isset_batch_id){
            rc_video_t2.setBatch_id(this.getBatch_id());
        }
        return rc_video_t2;
    }
}
recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_t2_mapper.java
@@ -7,8 +7,6 @@
import com.walker.jdbc.sqlgen.InsertBuilder;
import com.walker.jdbc.sqlgen.SelectBuilder;
import com.walker.jdbc.sqlgen.UpdateBuilder;
import com.walker.jdbc.util.StringUtils;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
@@ -31,6 +29,7 @@
    public static final String SRC_VIDEO_ID = "src_video_id";
    public static final String SIM_VIDEO_ID = "sim_video_id";
    public static final String SCORE = "score";
    public static final String BATCH_ID = "batch_id";
    /**
     * 默认构造函数
@@ -52,6 +51,9 @@
        }
        if (rc_video_t2.isset_score) {
            this.setScore(rc_video_t2.getScore());
        }
        if(rc_video_t2.isset_batch_id){
            this.setBatch_id(rc_video_t2.getBatch_id());
        }
        // 去掉,2022-09-07
        // this.setDatabaseName_(rc_video_t2.getDatabaseName_());
@@ -99,6 +101,7 @@
        ib.set(SRC_VIDEO_ID, this.getSrc_video_id(), this.isset_src_video_id);
        ib.set(SIM_VIDEO_ID, this.getSim_video_id(), this.isset_sim_video_id);
        ib.set(SCORE, this.getScore(), this.isset_score);
        ib.set(BATCH_ID, this.getBatch_id(), this.isset_batch_id);
        return ib.genMapSql();
    }
@@ -111,6 +114,7 @@
        ub.set(SRC_VIDEO_ID, this.getSrc_video_id(), this.isset_src_video_id);
        ub.set(SIM_VIDEO_ID, this.getSim_video_id(), this.isset_sim_video_id);
        ub.set(SCORE, this.getScore(), this.isset_score);
        ub.set(BATCH_ID, this.getBatch_id(), this.isset_batch_id);
        ub.where(this.getPkName_(), this.getPkValue_());
        return ub.genMapSql();
    }
@@ -124,7 +128,7 @@
        ub.set(SRC_VIDEO_ID, this.getSrc_video_id(), this.isset_src_video_id);
        ub.set(SIM_VIDEO_ID, this.getSim_video_id(), this.isset_sim_video_id);
        ub.set(SCORE, this.getScore(), this.isset_score);
        ub.set(BATCH_ID, this.getBatch_id(), this.isset_batch_id);
        return ub.genMapSql(where, parameters);
    }
@@ -137,7 +141,7 @@
        ub.set(SRC_VIDEO_ID, this.getSrc_video_id(), this.isset_src_video_id);
        ub.set(SIM_VIDEO_ID, this.getSim_video_id(), this.isset_sim_video_id);
        ub.set(SCORE, this.getScore(), this.isset_score);
        ub.set(BATCH_ID, this.getBatch_id(), this.isset_batch_id);
        return ub.genArraySql(where, parameters);
    }
@@ -185,7 +189,7 @@
     */
    @Override
    public SqlAndParameters<Map<String, Object>> getSelectSql_(String where, Map<String, Object> parameters) {
        return new SqlAndParameters<>("select id, src_video_id, sim_video_id, score from " + this.getTableName_() + " " + where, parameters);
        return new SqlAndParameters<>("select id, src_video_id, sim_video_id, score, batch_id from " + this.getTableName_() + " " + where, parameters);
    }
    /**
@@ -193,7 +197,7 @@
     */
    @Override
    public SqlAndParameters<Object[]> getSelectSql_(String where, Object[] parameters) {
        return new SqlAndParameters<>("select id, src_video_id, sim_video_id, score from " + this.getTableName_() + " " + where, parameters);
        return new SqlAndParameters<>("select id, src_video_id, sim_video_id, score, batch_id from " + this.getTableName_() + " " + where, parameters);
    }
    /**
@@ -238,6 +242,10 @@
        if (columnIndex > 0) {
            rc_video_t2.setSim_video_id(rs.getString(columnIndex));
        }
        columnIndex = resultSetUtils.findColumn(rs, Rc_video_t2_mapper.BATCH_ID);
        if (columnIndex > 0) {
            rc_video_t2.setBatch_id(rs.getString(columnIndex));
        }
        columnIndex = resultSetUtils.findColumn(rs, Rc_video_t2_mapper.SCORE);
        if (columnIndex > 0) {
            if (rs.getBigDecimal(columnIndex) == null) {
recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_user.java
@@ -20,11 +20,15 @@
    public static final Rc_video_user ROW_MAPPER = new Rc_video_user();
    // 主键
    private Long id;
    @JsonIgnore
    protected boolean isset_id = false;
    // 属性列表
    private Long user_id = null;
    @JsonIgnore
    protected boolean isset_user_id = false;
    // 属性列表
    private String video_id = null;
    @JsonIgnore
    protected boolean isset_video_id = false;
@@ -46,8 +50,8 @@
    /**
     * 根据主键构造对象
     */
    public Rc_video_user(Long user_id) {
        this.setUser_id(user_id);
    public Rc_video_user(Long id) {
        this.setId(id);
    }
    /**
@@ -55,7 +59,19 @@
     */
    @Override
    public void setPkValue(Object value) {
        this.setUser_id((Long) value);
        this.setId((Long) value);
    }
    public Long getId(){return this.id;}
    public void setId(Long id){
        this.id = id;
        this.isset_id = true;
    }
    @JsonIgnore
    public boolean isEmptyId() {
        return this.id == null;
    }
    public Long getUser_id() {
recommend-video/doc/table.SQL
@@ -6,4 +6,12 @@
    ADD INDEX inx_src_vid (src_video_id) USING BTREE ;
ALTER TABLE rc_video_t2
    ADD INDEX inx_t2_src_vid (src_video_id) USING BTREE ;
    ADD INDEX inx_t2_src_vid (src_video_id) USING BTREE ;
ALTER TABLE rc_video_t2
    ADD INDEX inx_t2_src_bid (batch_id) USING BTREE ;
-- ALTER TABLE rc_video_batch
--     ADD INDEX inx_vb_vid (src_video_id) USING BTREE ;
ALTER TABLE rc_video_batch
    ADD INDEX inx_vb_bid (batch_id) USING BTREE ;
recommend-video/src/main/java/com/iplatform/recvideo/SimilarExecutor.java
@@ -139,14 +139,14 @@
        }
        try {
            this.processOneSearchAndWrite();
            this.processOneSearchAndWrite(this.batchId);
            return 0;
        } catch (Exception e) {
            throw new Exception("processOneSearchAndWrite(): " + e.getMessage(), e);
        }
    }
    private void processOneSearchAndWrite() throws Exception{
    private void processOneSearchAndWrite(String batchId) throws Exception{
        if(this.currentVideoFolderIndex >= this.videoFolderInfoList.size()){
            throw new IllegalArgumentException("currentVideoFolderIndex 越界: " + this.currentVideoFolderIndex);
        }
@@ -170,7 +170,7 @@
        // 每个视频的最后一张图片
        if((this.currentImageIndex + 1) >= currentVideoFolderInfo.getImageInfoSize()){
            try {
                this.writeRcVideoT2(currentVideoFolderInfo);
                this.writeRcVideoT2(currentVideoFolderInfo, batchId);
            } catch (Exception ex){
                throw new Exception("writeRcVideoT2()执行错误:" + ex.getMessage(), ex);
            }
@@ -239,12 +239,12 @@
     * 分析表'rc_video_t1',并把给定视频相似记录写入第二个临时表'rc_video_t2'
     * @param videoFolderInfo
     */
    protected abstract void writeRcVideoT2(VideoFolderInfo videoFolderInfo);
    protected abstract void writeRcVideoT2(VideoFolderInfo videoFolderInfo, String batchId);
    /**
     * 分析给定批次所有视频用户推荐的视频信息,并写入表:'rc_video_user'
     * @param batchId
     * @param recVideoIdList 推荐视频id集合
     * @param recVideoIdList 本批次处理原始视频id集合
     */
    protected abstract void writeRcVideoUser(String batchId, List<String> recVideoIdList);
}
recommend-video/src/main/java/com/iplatform/recvideo/SimilarVideoUser.java
New file
@@ -0,0 +1,68 @@
package com.iplatform.recvideo;
public class SimilarVideoUser implements Comparable<SimilarVideoUser>{
    private long userId;
    private String recommendVideoId;
    private double score = 0;
    // 这个相似视频,对应的源视频图像张数
    private int count = 0;
    public SimilarVideoUser(long userId, String recommendVideoId){
        this.userId = userId;
        this.recommendVideoId = recommendVideoId;
    }
    public void increase(){
        this.count ++;
    }
    public void setScore(double totalScore){
        this.score = totalScore;
    }
    public long getUserId() {
        return userId;
    }
    public String getRecommendVideoId() {
        return recommendVideoId;
    }
    public double getScore() {
        return score;
    }
    @Override
    public String toString(){
        return new StringBuilder("[recVideoId=").append(this.recommendVideoId)
                .append(", score=").append(this.score)
                .append("]").toString();
    }
    @Override
    public int hashCode(){
        return this.recommendVideoId.hashCode();
    }
    @Override
    public boolean equals(Object obj){
        if(obj == null){
            return false;
        }
        if(obj instanceof SimilarVideoUser){
            SimilarVideoUser e = (SimilarVideoUser)obj;
            if(e.recommendVideoId.equals(this.recommendVideoId)){
                return true;
            }
        }
        return false;
    }
    @Override
    public int compareTo(SimilarVideoUser o) {
        return o.count - this.count;
    }
}
recommend-video/src/main/java/com/iplatform/recvideo/SimilarVideoUserOrder.java
New file
@@ -0,0 +1,90 @@
package com.iplatform.recvideo;
import com.iplatform.model.po.Rc_video_t2;
import com.walker.infrastructure.utils.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * 对用户推荐视频进行计算排序。
 * @author 时克英
 * @date 2022-09-26
 */
public class SimilarVideoUserOrder {
//    private Map<String, Long> videoIdUserCache = new HashMap<>();
    // 最终排序用的用户推荐视频集合结果, key = 推荐视频ID
    private Map<String, SimilarVideoUser> orderVideoList = new HashMap<>();
    // 计算多个推荐视频id,总得分, key = 推荐视频ID
    private Map<String, List<Double>> simiVideoDistanceMap = new HashMap<>();
    /**
     *
     * @param videoIdUserCache 推荐视频ID --> 用户ID 关系
     * @param list 计算后的: rc_video_t2 表数据
     */
    public SimilarVideoUserOrder(Map<String, Long> videoIdUserCache, List<Rc_video_t2> list){
//        this.videoIdUserCache = videoUser;
        Long userId = null;
        for(Rc_video_t2 e : list){
            this.addSimiVideoDistance(e.getSim_video_id(), e.getScore());
            userId = videoIdUserCache.get(e.getSrc_video_id());
            if(userId == null){
                throw new IllegalArgumentException("源视频未找到对应用户,src_video_id=" + e.getSrc_video_id());
            }
            this.addOrderVideo(e.getSim_video_id(), userId);
        }
    }
    public List<SimilarVideoUser> calculateOrder(){
        SimilarVideoUser similarVideoInfo = null;
        double totalScore = 0;
        for(Map.Entry<String, List<Double>> entry : this.simiVideoDistanceMap.entrySet()){
            similarVideoInfo = this.orderVideoList.get(entry.getKey());
            totalScore = this.getTotalScore(entry.getValue(), entry.getKey());
            similarVideoInfo.setScore(totalScore);
        }
        List<SimilarVideoUser> resultList = new ArrayList<>(128);
        for(SimilarVideoUser e : this.orderVideoList.values()){
            resultList.add(e);
        }
        Collections.sort(resultList);
        return resultList;
    }
    private double getTotalScore(List<Double> list, String simiVideoId){
        if(StringUtils.isEmptyList(list)){
            return 0;
        }
        double total = 0;
        for(double d : list){
            total += d;
        }
        return total;
    }
    private void addSimiVideoDistance(String videoId, double score){
        List<Double> v = this.simiVideoDistanceMap.get(videoId);
        if(v == null){
            v = new ArrayList<>(16);
            this.simiVideoDistanceMap.put(videoId, v);
        }
        v.add(score);
    }
    private void addOrderVideo(String simiVideoId, long userId){
        SimilarVideoUser v = this.orderVideoList.get(simiVideoId);
        if(v == null){
            v = new SimilarVideoUser(userId, simiVideoId);
            this.orderVideoList.put(simiVideoId, v);
        }
        v.increase();
    }
}
recommend-video/src/main/java/com/iplatform/recvideo/VideoScheduler.java
New file
@@ -0,0 +1,19 @@
package com.iplatform.recvideo;
import com.iplatform.gather.GatherScheduler;
import com.walker.store.support.EmptyDatabaseStore;
/**
 * Common base class of the video schedulers: a {@link GatherScheduler} that is always
 * constructed with a new {@link EmptyDatabaseStore}.
 */
public abstract class VideoScheduler extends GatherScheduler {
    /** Marker value subclasses return from a processing round that did its work. */
    protected static final String SUCCESS = "ok";
    /**
     * @param id scheduler id, passed through to {@link GatherScheduler}
     * @param name scheduler name, passed through to {@link GatherScheduler}
     */
    public VideoScheduler(int id, String name){
        super(id, name, new EmptyDatabaseStore());
    }
    /**
     * Does nothing.
     * NOTE(review): presumably a hook of GatherScheduler that is deliberately disabled
     * here; confirm against the parent class.
     *
     * @param schedulerId ignored
     */
    protected void terminateGatherScheduler(int schedulerId) {
    }
}
recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoSearchMeta.java
New file
@@ -0,0 +1,15 @@
package com.iplatform.recvideo.scheduler;
/**
 * Immutable holder of the batch id a similarity search works on.
 */
public class VideoSearchMeta {

    private final String batchId;

    /**
     * @param batchId id of the batch; stored as given
     */
    public VideoSearchMeta(String batchId){
        this.batchId = batchId;
    }

    /**
     * @return the batch id passed to the constructor
     */
    public String getBatchId() {
        return this.batchId;
    }
}
recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoSearchScheduler.java
New file
@@ -0,0 +1,97 @@
package com.iplatform.recvideo.scheduler;
import com.iplatform.core.BeanContextAware;
import com.iplatform.gather.LocalAddress;
import com.iplatform.recvideo.VideoScheduler;
import com.walker.store.AbstractStore;
import com.walker.store.task.GatherTask;
/**
 * 视频信息存储、搜索相似度处理的调度任务。<p></p>
 * 该任务会持续运行:
 * <pre>
 *     1)
 * </pre>
 * @author 时克英
 * @date 2022-09-26
 */
public class VideoSearchScheduler extends VideoScheduler {
    private VideoSearchTask videoSearchTask = null;
    private VideoSearchMeta currentVideoSearchMeta = null;
    //
    private int failedCount = 0;
    public VideoSearchScheduler(int id, String name){
        super(id, name);
    }
    @Override
    protected GatherTask providerTask(AbstractStore store) {
        LocalAddress localAddress = BeanContextAware.getBeanByType(LocalAddress.class);
        VideoSearchTask task = new VideoSearchTask("视频相似度检索任务", localAddress);
        return task;
    }
//    @Override
//    protected Object[] getRunParameters(Object previousInvokeResult) {}
    @Override
    protected Object onProcess(Object[] inputParams) throws Exception {
        if(this.videoSearchTask == null){
            logger.debug("调度任务初始化,需要先查询任务状态表");
            this.videoSearchTask = (VideoSearchTask)this.providerTask(this.getStore());
            Object[] params = new Object[]{this.videoSearchTask.getSql(), new Object[]{}};
            this.currentVideoSearchMeta = (VideoSearchMeta)this.videoSearchTask.run(null, null, null, params);
            if(this.currentVideoSearchMeta == null){
                return null;
            }
            logger.debug("开始启动批次查询:" + this.currentVideoSearchMeta.getBatchId());
        }
        //
        this.videoSearchTask.checkExecutor(this.currentVideoSearchMeta);
        //
        int result = this.videoSearchTask.executeOneSearch();
        if(result == 0){
            return SUCCESS;
        }
        if(result == -1){
            this.failedCount ++;
            if(this.failedCount > 3){
                // 如果报错超过多次,返回空,让线程休眠一下
                this.failedCount = 0;
                return null;
            } else {
                return SUCCESS;
            }
        }
        if(result == 1){
            // 完成,准备查找下一个批次记录
            logger.info("完成批次,准备查找下一个批次记录。batchId = " + this.currentVideoSearchMeta.getBatchId());
            this.clearStatus();
            return SUCCESS;
        }
        return null;
//        if(this.gatherTask == null){
//            this.gatherTask = this.providerTask(this.store);
//            if(this.gatherTask == null){
//                throw new IllegalArgumentException("未提供任务对象,调度无法执行:" + this.getName());
//            }
//        }
//        // srcName, createTableSQL, parameter, params[]
//        return this.gatherTask.run((String)inputParams[0], (String)inputParams[1], inputParams[2], (Object[])inputParams[3]);
    }
    private void clearStatus(){
        if(this.videoSearchTask != null){
            this.videoSearchTask = null;
        }
        this.currentVideoSearchMeta = null;
    }
}
recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoSearchTask.java
New file
@@ -0,0 +1,94 @@
package com.iplatform.recvideo.scheduler;
import com.iplatform.core.BeanContextAware;
import com.iplatform.recvideo.config.VideoSimilarProperties;
import com.iplatform.recvideo.service.VideoExecutorServiceImpl;
import com.iplatform.recvideo.support.DefaultSimilarExecutor;
import com.walker.connector.Address;
import com.walker.infrastructure.utils.StringUtils;
import com.walker.jdbc.JdbcInspector;
import com.walker.store.task.GenericGatherTask;
import org.springframework.web.client.RestTemplate;
import java.util.List;
import java.util.Map;
/**
 * 视频相似度查询以及写入结果执行任务实现。
 * <pre>
 *     1.采集源头为读取数据库最新批次信息。
 *     2.根据批次结果,扫描本机批次目录中所有视频图片,检索相似度并计算结果。
 * </pre>
 * @author 时克英
 * @date 2022-09-26
 */
public class VideoSearchTask extends GenericGatherTask {
    private DefaultSimilarExecutor similarExecutor = null;
    private VideoSimilarProperties videoSimilarProperties;
    private VideoExecutorServiceImpl videoExecutorService;
    private RestTemplate restTemplate;
    private String sql = "select * from rc_task_status where task_type='video_load' and status='0'";
    public VideoSearchTask(String name, Address address){
        super(name, address);
        this.setDatabaseType(JdbcInspector.getInstance().getPrimaryDatabaseType());
        this.videoSimilarProperties = BeanContextAware.getBeanByType(VideoSimilarProperties.class);
    }
    @Override
    protected List<Object> transferResultData(List<Object> resultList) {
        return resultList;
    }
    /**
     * 执行具体任务处理,主要由相似度执行器执行其<code>execute</code>方法。
     * @param srcName
     * @param createTableSQL
     * @param parameter
     * @param data 该参数是从表: rc_task_status 中查询出来的可用批次记录,若存在多个就抽取一个处理。
     * @return
     */
    @Override
    protected Object execute(String srcName, String createTableSQL, Object parameter, List<Object> data) {
        if(StringUtils.isEmptyList(data)){
           logger.warn("数据库中检索出可处理: rc_task_status 记录,但集合为空");
           return null;
        }
        Map<String, Object> rcTaskStatus = (Map<String, Object>)data.get(0);
        String lastValue = rcTaskStatus.get("last_value").toString();
        return new VideoSearchMeta(lastValue);
    }
    public String getSql(){
        return this.sql;
    }
    public void checkExecutor(VideoSearchMeta videoSearchMeta){
        if(this.similarExecutor != null){
            logger.debug("DefaultSimilarExecutor 已经设置过,不再重复设置");
            return;
        }
        DefaultSimilarExecutor defaultSimilarExecutor = new DefaultSimilarExecutor();
        defaultSimilarExecutor.setRemoteUrl(this.videoSimilarProperties.getAiService());
        defaultSimilarExecutor.setVideoExecutorService(BeanContextAware.getBeanByType(VideoExecutorServiceImpl.class));
        defaultSimilarExecutor.setRestTemplate(BeanContextAware.getBeanByType(RestTemplate.class));
        defaultSimilarExecutor.startup(this.videoSimilarProperties.getDataFolder()
                , videoSearchMeta.getBatchId(), this.videoSimilarProperties.getTopN(), true);
        this.similarExecutor = defaultSimilarExecutor;
    }
    /**
     * 调用一次结果查询及写入。
     * @return -1 失败出现异常, 0 成功执行一次(继续下一次), 1 批次完成
     * @throws Exception
     */
    public int executeOneSearch() throws Exception{
        if(this.similarExecutor == null){
            throw new IllegalStateException("similarExecutor is required!");
        }
        return this.similarExecutor.execute();
    }
}
recommend-video/src/main/java/com/iplatform/recvideo/service/VideoExecutorServiceImpl.java
@@ -2,6 +2,9 @@
import com.iplatform.model.po.Rc_video_t1;
import com.iplatform.model.po.Rc_video_t2;
import com.iplatform.model.po.Rc_video_user;
import com.walker.infrastructure.utils.DateUtils;
import com.walker.infrastructure.utils.StringUtils;
import com.walker.jdbc.service.BaseServiceImpl;
import org.springframework.stereotype.Service;
@@ -14,6 +17,10 @@
    private static final String SQL_CHECK_VIDEO_STATUS = "select * from milvus_video_status where id=?";
    private static final String SQL_CLEAR_VIDEO_T1 = "delete from rc_video_t1 where src_img=?";
    private static final String SQL_CLEAR_VIDEO_T2 = "delete from rc_video_t2 where src_video_id=?";
    private static final String SQL_GET_BATCH_VIDEO = "select user_id, src_video_id from rc_video_batch where batch_id=?";
    private static final String SQL_UPDATE_TASK_STATUS_LOAD = "update rc_task_status set status='1', end_time=? where last_value=? and status='0'";
    /**
     * 写入视频相似度第一级临时数据,每个图像包含多个相似视频记录。
@@ -37,6 +44,34 @@
    }
    /**
     * 写入用户推荐视频记录集合,并更新批次任务状态为(已完成)
     * @param videoUserList
     * @param batchId
     */
    public void execBatchInsertVideoUser(List<Rc_video_user> videoUserList, String batchId){
        this.insert(videoUserList);
        Object[] param = new Object[2];
        param[0] = Long.parseLong(DateUtils.getDateTimeSecondForShow());
        param[1] = Long.parseLong(batchId);
        this.execute(SQL_UPDATE_TASK_STATUS_LOAD, param);
    }
    /**
     * 返回一个批次用户对应视频记录集合,用于最后更新用户推荐视频数据。
     * @param batchId
     * @return
     * @date 2022-09-26
     */
    public List<Map<String, Object>> queryBatchUserVideoList(String batchId){
        return this.select(SQL_GET_BATCH_VIDEO, new Object[]{batchId});
    }
    public List<Rc_video_t2> queryVideoT_2List(String batchId){
        return this.select(new Rc_video_t2(), "where batch_id=?", new Object[]{batchId});
    }
    /**
     * 根据原始视频ID返回相似记录集合。
     * @param srcVideoId
     * @return
recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultSimilarExecutor.java
@@ -2,20 +2,26 @@
import com.iplatform.model.po.Rc_video_t1;
import com.iplatform.model.po.Rc_video_t2;
import com.iplatform.model.po.Rc_video_user;
import com.iplatform.recvideo.Constants;
import com.iplatform.recvideo.SimilarExecutor;
import com.iplatform.recvideo.SimilarVideoInfo;
import com.iplatform.recvideo.SimilarVideoOrder;
import com.iplatform.recvideo.SimilarVideoUser;
import com.iplatform.recvideo.SimilarVideoUserOrder;
import com.iplatform.recvideo.VideoFolderInfo;
import com.iplatform.recvideo.service.VideoExecutorServiceImpl;
import com.iplatform.recvideo.util.PythonInvokeUtils;
import com.walker.infrastructure.utils.DateUtils;
import com.walker.infrastructure.utils.NumberGenerator;
import com.walker.infrastructure.utils.StringUtils;
import org.springframework.web.client.RestTemplate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class DefaultSimilarExecutor extends SimilarExecutor {
@@ -70,7 +76,7 @@
    }
    @Override
    protected void writeRcVideoT2(VideoFolderInfo videoFolderInfo) {
    protected void writeRcVideoT2(VideoFolderInfo videoFolderInfo, String batchId) {
        logger.debug("正在写入'Rc_video_t2',原始视频:" + videoFolderInfo.getVideoId());
        List<Rc_video_t1> list = this.videoExecutorService.queryVideoT_1List(videoFolderInfo.getVideoId());
        if(StringUtils.isEmptyList(list)){
@@ -79,7 +85,7 @@
        }
        SimilarVideoOrder similarVideoOrder = new SimilarVideoOrder(list);
        List<SimilarVideoInfo> similarVideoInfoList = similarVideoOrder.calculateOrder();
        List<Rc_video_t2> video2List = this.toRcVideoT2List(videoFolderInfo.getVideoId(), similarVideoInfoList);
        List<Rc_video_t2> video2List = this.toRcVideoT2List(videoFolderInfo.getVideoId(), similarVideoInfoList, batchId);
        if(video2List == null){
            logger.warn("writeRcVideoT2(): similarVideoInfoList为空,不能执行, srcVideoId = " + videoFolderInfo.getVideoId());
            return;
@@ -90,9 +96,67 @@
    @Override
    protected void writeRcVideoUser(String batchId, List<String> recVideoIdList) {
        logger.info("正在写入一次用户推荐视频数据, batchId = " + batchId);
        List<Map<String, Object>> batchUserVideoList = this.videoExecutorService.queryBatchUserVideoList(batchId);
        if(StringUtils.isEmptyList(batchUserVideoList)){
            logger.error("未找到批次用户视频记录,无法更新用户推荐视频数据,batchId = " + batchId);
            return;
        }
        List<Rc_video_t2> video2List = this.videoExecutorService.queryVideoT_2List(batchId);
        if(StringUtils.isEmptyList(video2List)){
            logger.warn("从表'rc_video_t2'未加载到批次视频结果记录,无法继续写入用户推荐视频'rc_video_user'");
            return;
        }
        // 原始视频ID与用户关系
        Map<String, Long> srcVideoIdUserCache = new HashMap<>();
        for(Map<String, Object> m : batchUserVideoList){
            srcVideoIdUserCache.put(m.get("src_video_id").toString(), Long.parseLong(m.get("user_id").toString()));
        }
        // 推荐视频ID与用户关系
        Map<String, Long> videoIdUserCache = new HashMap<>();
        Long userId = null;
        for(Rc_video_t2 e : video2List){
            userId = srcVideoIdUserCache.get(e.getSrc_video_id());
            if(userId == null){
                throw new IllegalArgumentException("推荐视频的原始视频未找到对应用户,src_video = " + e.getSrc_video_id() + ", rec_video = " + e.getSim_video_id());
            }
            videoIdUserCache.put(e.getSim_video_id(), userId);
        }
        SimilarVideoUserOrder similarVideoUserOrder = new SimilarVideoUserOrder(videoIdUserCache, video2List);
        List<SimilarVideoUser> similarVideoUserList = similarVideoUserOrder.calculateOrder();
        List<Rc_video_user> videoUserList = this.toRcVideoUserList(similarVideoUserList);
        if(videoUserList == null){
            logger.warn("writeRcVideoUser(): similarVideoUserList,不能执行, batchId = " + batchId);
            return;
        }
        // 用户推荐视频记录,需要更新,如果不存在才写入。
        // 注意:这里暂时不更新,全部写入,这样用户推荐表中会存在较多重复数据,后续要添加定时任务清除重复数据即可。
        this.videoExecutorService.execBatchInsertVideoUser(videoUserList, batchId);
        logger.debug("批次任务状态已更新成功! " + batchId);
    }
    private List<Rc_video_t2> toRcVideoT2List(String srcVideoId, List<SimilarVideoInfo> similarVideoInfoList){
    private List<Rc_video_user> toRcVideoUserList(List<SimilarVideoUser> similarVideoUserList){
        if(StringUtils.isEmptyList(similarVideoUserList)){
            return null;
        }
        List<Rc_video_user> resultList = new ArrayList<>(similarVideoUserList.size());
        Rc_video_user rc_video_t2 = null;
        for(SimilarVideoUser e : similarVideoUserList){
            rc_video_t2 = new Rc_video_user(NumberGenerator.getLongSequenceNumber());
            rc_video_t2.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow()));
            rc_video_t2.setUser_id(e.getUserId());
            rc_video_t2.setVideo_id(e.getRecommendVideoId());
            rc_video_t2.setScore(e.getScore());
            resultList.add(rc_video_t2);
        }
        return resultList;
    }
    private List<Rc_video_t2> toRcVideoT2List(String srcVideoId
            , List<SimilarVideoInfo> similarVideoInfoList, String batchId){
        if(StringUtils.isEmptyList(similarVideoInfoList)){
            return null;
        }
@@ -103,6 +167,7 @@
            rc_video_t2.setSrc_video_id(srcVideoId);
            rc_video_t2.setSim_video_id(e.getId());
            rc_video_t2.setScore(e.getScore());
            rc_video_t2.setBatch_id(batchId);
            resultList.add(rc_video_t2);
        }
        return resultList;