From fd03e31f173ad9c52b15a30a9127e2b6a468538d Mon Sep 17 00:00:00 2001
From: shikeying <shikeying@163.com>
Date: 星期一, 26 九月 2022 18:50:33 +0800
Subject: [PATCH] 视频相似度分析2

---
 recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_t2.java                 |   19 +
 recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoSearchScheduler.java   |   97 +++++++++
 recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultSimilarExecutor.java   |   71 ++++++
 recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_t2_mapper.java          |   20 +
 recommend-video/src/main/java/com/iplatform/recvideo/SimilarExecutor.java                  |   10 
 recommend-video/doc/table.SQL                                                              |   10 
 recommend-video/src/main/java/com/iplatform/recvideo/SimilarVideoUser.java                 |   68 ++++++
 recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoSearchTask.java        |   94 +++++++++
 recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_user.java               |   24 ++
 recommend-video/src/main/java/com/iplatform/recvideo/service/VideoExecutorServiceImpl.java |   35 +++
 recommend-video/src/main/java/com/iplatform/recvideo/VideoScheduler.java                   |   19 +
 recommend-video/src/main/java/com/iplatform/recvideo/SimilarVideoUserOrder.java            |   90 +++++++++
 recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoSearchMeta.java        |   15 +
 13 files changed, 553 insertions(+), 19 deletions(-)

diff --git a/recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_t2.java b/recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_t2.java
index f091da4..71a3eb3 100644
--- a/recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_t2.java
+++ b/recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_t2.java
@@ -37,6 +37,10 @@
     @JsonIgnore
     protected boolean isset_score = false;
 
+    private String batch_id = null;
+    @JsonIgnore
+    protected boolean isset_batch_id = false;
+
     /**
      * 榛樿鏋勯�犲嚱鏁�
      */
@@ -114,6 +118,17 @@
         return this.score == null;
     }
 
+    public String getBatch_id(){return this.batch_id;}
+
+    public void setBatch_id(String batch_id){
+        this.batch_id = batch_id;
+        this.isset_batch_id = true;
+    }
+
+    @JsonIgnore
+    public boolean isEmptyBatch_id(){return this.batch_id == null || this.batch_id.length() == 0;}
+
+
     /**
      * 閲嶅啓 toString() 鏂规硶
      */
@@ -124,6 +139,7 @@
                 .append("src_video_id=").append(this.src_video_id)
                 .append("sim_video_id=").append(this.sim_video_id)
                 .append("score=").append(this.score)
+                .append("batch_id=").append(this.batch_id)
                 .toString();
     }
 
@@ -150,6 +166,9 @@
         if (this.isset_score) {
             rc_video_t2.setScore(this.getScore());
         }
+        if(this.isset_batch_id){
+            rc_video_t2.setBatch_id(this.getBatch_id());
+        }
         return rc_video_t2;
     }
 }
diff --git a/recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_t2_mapper.java b/recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_t2_mapper.java
index 1411a5c..2489d77 100644
--- a/recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_t2_mapper.java
+++ b/recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_t2_mapper.java
@@ -7,8 +7,6 @@
 import com.walker.jdbc.sqlgen.InsertBuilder;
 import com.walker.jdbc.sqlgen.SelectBuilder;
 import com.walker.jdbc.sqlgen.UpdateBuilder;
-import com.walker.jdbc.util.StringUtils;
-
 import org.springframework.jdbc.core.RowMapper;
 
 import java.sql.ResultSet;
@@ -31,6 +29,7 @@
     public static final String SRC_VIDEO_ID = "src_video_id";
     public static final String SIM_VIDEO_ID = "sim_video_id";
     public static final String SCORE = "score";
+    public static final String BATCH_ID = "batch_id";
 
     /**
      * 榛樿鏋勯�犲嚱鏁�
@@ -52,6 +51,9 @@
         }
         if (rc_video_t2.isset_score) {
             this.setScore(rc_video_t2.getScore());
+        }
+        if(rc_video_t2.isset_batch_id){
+            this.setBatch_id(rc_video_t2.getBatch_id());
         }
         // 鍘绘帀锛�2022-09-07
         // this.setDatabaseName_(rc_video_t2.getDatabaseName_());
@@ -99,6 +101,7 @@
         ib.set(SRC_VIDEO_ID, this.getSrc_video_id(), this.isset_src_video_id);
         ib.set(SIM_VIDEO_ID, this.getSim_video_id(), this.isset_sim_video_id);
         ib.set(SCORE, this.getScore(), this.isset_score);
+        ib.set(BATCH_ID, this.getBatch_id(), this.isset_batch_id);
         return ib.genMapSql();
     }
 
@@ -111,6 +114,7 @@
         ub.set(SRC_VIDEO_ID, this.getSrc_video_id(), this.isset_src_video_id);
         ub.set(SIM_VIDEO_ID, this.getSim_video_id(), this.isset_sim_video_id);
         ub.set(SCORE, this.getScore(), this.isset_score);
+        ub.set(BATCH_ID, this.getBatch_id(), this.isset_batch_id);
         ub.where(this.getPkName_(), this.getPkValue_());
         return ub.genMapSql();
     }
@@ -124,7 +128,7 @@
         ub.set(SRC_VIDEO_ID, this.getSrc_video_id(), this.isset_src_video_id);
         ub.set(SIM_VIDEO_ID, this.getSim_video_id(), this.isset_sim_video_id);
         ub.set(SCORE, this.getScore(), this.isset_score);
-
+        ub.set(BATCH_ID, this.getBatch_id(), this.isset_batch_id);
         return ub.genMapSql(where, parameters);
     }
 
@@ -137,7 +141,7 @@
         ub.set(SRC_VIDEO_ID, this.getSrc_video_id(), this.isset_src_video_id);
         ub.set(SIM_VIDEO_ID, this.getSim_video_id(), this.isset_sim_video_id);
         ub.set(SCORE, this.getScore(), this.isset_score);
-
+        ub.set(BATCH_ID, this.getBatch_id(), this.isset_batch_id);
         return ub.genArraySql(where, parameters);
     }
 
@@ -185,7 +189,7 @@
      */
     @Override
     public SqlAndParameters<Map<String, Object>> getSelectSql_(String where, Map<String, Object> parameters) {
-        return new SqlAndParameters<>("select id, src_video_id, sim_video_id, score from " + this.getTableName_() + " " + where, parameters);
+        return new SqlAndParameters<>("select id, src_video_id, sim_video_id, score, batch_id from " + this.getTableName_() + " " + where, parameters);
     }
 
     /**
@@ -193,7 +197,7 @@
      */
     @Override
     public SqlAndParameters<Object[]> getSelectSql_(String where, Object[] parameters) {
-        return new SqlAndParameters<>("select id, src_video_id, sim_video_id, score from " + this.getTableName_() + " " + where, parameters);
+        return new SqlAndParameters<>("select id, src_video_id, sim_video_id, score, batch_id from " + this.getTableName_() + " " + where, parameters);
     }
 
     /**
@@ -238,6 +242,10 @@
         if (columnIndex > 0) {
             rc_video_t2.setSim_video_id(rs.getString(columnIndex));
         }
+        columnIndex = resultSetUtils.findColumn(rs, Rc_video_t2_mapper.BATCH_ID);
+        if (columnIndex > 0) {
+            rc_video_t2.setBatch_id(rs.getString(columnIndex));
+        }
         columnIndex = resultSetUtils.findColumn(rs, Rc_video_t2_mapper.SCORE);
         if (columnIndex > 0) {
             if (rs.getBigDecimal(columnIndex) == null) {
diff --git a/recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_user.java b/recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_user.java
index f49c32f..1f37b0e 100644
--- a/recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_user.java
+++ b/recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_video_user.java
@@ -20,11 +20,15 @@
     public static final Rc_video_user ROW_MAPPER = new Rc_video_user();
 
     // 涓婚敭
+    private Long id;
+    @JsonIgnore
+    protected boolean isset_id = false;
+
+    // 灞炴�у垪琛�
     private Long user_id = null;
     @JsonIgnore
     protected boolean isset_user_id = false;
 
-    // 灞炴�у垪琛�
     private String video_id = null;
     @JsonIgnore
     protected boolean isset_video_id = false;
@@ -46,8 +50,8 @@
     /**
      * 鏍规嵁涓婚敭鏋勯�犲璞�
      */
-    public Rc_video_user(Long user_id) {
-        this.setUser_id(user_id);
+    public Rc_video_user(Long id) {
+        this.setId(id);
     }
 
     /**
@@ -55,7 +59,19 @@
      */
     @Override
     public void setPkValue(Object value) {
-        this.setUser_id((Long) value);
+        this.setId((Long) value);
+    }
+
+    public Long getId(){return this.id;}
+
+    public void setId(Long id){
+        this.id = id;
+        this.isset_id = true;
+    }
+
+    @JsonIgnore
+    public boolean isEmptyId() {
+        return this.id == null;
     }
 
     public Long getUser_id() {
diff --git a/recommend-video/doc/table.SQL b/recommend-video/doc/table.SQL
index 2283614..5edc014 100644
--- a/recommend-video/doc/table.SQL
+++ b/recommend-video/doc/table.SQL
@@ -6,4 +6,12 @@
     ADD INDEX inx_src_vid (src_video_id) USING BTREE ;
 
 ALTER TABLE rc_video_t2
-    ADD INDEX inx_t2_src_vid (src_video_id) USING BTREE ;
\ No newline at end of file
+    ADD INDEX inx_t2_src_vid (src_video_id) USING BTREE ;
+ALTER TABLE rc_video_t2
+    ADD INDEX inx_t2_src_bid (batch_id) USING BTREE ;
+
+-- ALTER TABLE rc_video_batch
+--     ADD INDEX inx_vb_vid (src_video_id) USING BTREE ;
+
+ALTER TABLE rc_video_batch
+    ADD INDEX inx_vb_bid (batch_id) USING BTREE ;
\ No newline at end of file
diff --git a/recommend-video/src/main/java/com/iplatform/recvideo/SimilarExecutor.java b/recommend-video/src/main/java/com/iplatform/recvideo/SimilarExecutor.java
index 1318e53..51b6220 100644
--- a/recommend-video/src/main/java/com/iplatform/recvideo/SimilarExecutor.java
+++ b/recommend-video/src/main/java/com/iplatform/recvideo/SimilarExecutor.java
@@ -139,14 +139,14 @@
         }
 
         try {
-            this.processOneSearchAndWrite();
+            this.processOneSearchAndWrite(this.batchId);
             return 0;
         } catch (Exception e) {
             throw new Exception("processOneSearchAndWrite(): " + e.getMessage(), e);
         }
     }
 
-    private void processOneSearchAndWrite() throws Exception{
+    private void processOneSearchAndWrite(String batchId) throws Exception{
         if(this.currentVideoFolderIndex >= this.videoFolderInfoList.size()){
             throw new IllegalArgumentException("currentVideoFolderIndex 瓒婄晫: " + this.currentVideoFolderIndex);
         }
@@ -170,7 +170,7 @@
         // 姣忎釜瑙嗛鐨勬渶鍚庝竴寮犲浘鐗�
         if((this.currentImageIndex + 1) >= currentVideoFolderInfo.getImageInfoSize()){
             try {
-                this.writeRcVideoT2(currentVideoFolderInfo);
+                this.writeRcVideoT2(currentVideoFolderInfo, batchId);
             } catch (Exception ex){
                 throw new Exception("writeRcVideoT2()鎵ц閿欒:" + ex.getMessage(), ex);
             }
@@ -239,12 +239,12 @@
      * 鍒嗘瀽琛�'rc_video_t1'锛屽苟鎶婄粰瀹氳棰戠浉浼艰褰曞啓鍏ョ浜屼釜涓存椂琛�'rc_video_t2'
      * @param videoFolderInfo
      */
-    protected abstract void writeRcVideoT2(VideoFolderInfo videoFolderInfo);
+    protected abstract void writeRcVideoT2(VideoFolderInfo videoFolderInfo, String batchId);
 
     /**
      * 鍒嗘瀽缁欏畾鎵规鎵�鏈夎棰戠敤鎴锋帹鑽愮殑瑙嗛淇℃伅锛屽苟鍐欏叆琛�:'rc_video_user'
      * @param batchId
-     * @param recVideoIdList 鎺ㄨ崘瑙嗛id闆嗗悎
+     * @param recVideoIdList 鏈壒娆″鐞嗗師濮嬭棰慽d闆嗗悎
      */
     protected abstract void writeRcVideoUser(String batchId, List<String> recVideoIdList);
 }
diff --git a/recommend-video/src/main/java/com/iplatform/recvideo/SimilarVideoUser.java b/recommend-video/src/main/java/com/iplatform/recvideo/SimilarVideoUser.java
new file mode 100644
index 0000000..0d77dbd
--- /dev/null
+++ b/recommend-video/src/main/java/com/iplatform/recvideo/SimilarVideoUser.java
@@ -0,0 +1,68 @@
+package com.iplatform.recvideo;
+
+public class SimilarVideoUser implements Comparable<SimilarVideoUser>{
+
+    private long userId;
+    private String recommendVideoId;
+
+    private double score = 0;
+
+    // 杩欎釜鐩镐技瑙嗛锛屽搴旂殑婧愯棰戝浘鍍忓紶鏁�
+    private int count = 0;
+
+    public SimilarVideoUser(long userId, String recommendVideoId){
+        this.userId = userId;
+        this.recommendVideoId = recommendVideoId;
+    }
+
+    public void increase(){
+        this.count ++;
+    }
+
+    public void setScore(double totalScore){
+        this.score = totalScore;
+    }
+
+    public long getUserId() {
+        return userId;
+    }
+
+    public String getRecommendVideoId() {
+        return recommendVideoId;
+    }
+
+    public double getScore() {
+        return score;
+    }
+
+    @Override
+    public String toString(){
+        return new StringBuilder("[recVideoId=").append(this.recommendVideoId)
+                .append(", score=").append(this.score)
+                .append("]").toString();
+    }
+
+    @Override
+    public int hashCode(){
+        return this.recommendVideoId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj){
+        if(obj == null){
+            return false;
+        }
+        if(obj instanceof SimilarVideoUser){
+            SimilarVideoUser e = (SimilarVideoUser)obj;
+            if(e.recommendVideoId.equals(this.recommendVideoId)){
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public int compareTo(SimilarVideoUser o) {
+        return o.count - this.count;
+    }
+}
diff --git a/recommend-video/src/main/java/com/iplatform/recvideo/SimilarVideoUserOrder.java b/recommend-video/src/main/java/com/iplatform/recvideo/SimilarVideoUserOrder.java
new file mode 100644
index 0000000..6375224
--- /dev/null
+++ b/recommend-video/src/main/java/com/iplatform/recvideo/SimilarVideoUserOrder.java
@@ -0,0 +1,90 @@
+package com.iplatform.recvideo;
+
+import com.iplatform.model.po.Rc_video_t2;
+import com.walker.infrastructure.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 瀵圭敤鎴锋帹鑽愯棰戣繘琛岃绠楁帓搴忋��
+ * @author 鏃跺厠鑻�
+ * @date 2022-09-26
+ */
+public class SimilarVideoUserOrder {
+
+//    private Map<String, Long> videoIdUserCache = new HashMap<>();
+
+    // 鏈�缁堟帓搴忕敤鐨勭敤鎴锋帹鑽愯棰戦泦鍚堢粨鏋�, key = 鎺ㄨ崘瑙嗛ID
+    private Map<String, SimilarVideoUser> orderVideoList = new HashMap<>();
+
+    // 璁$畻澶氫釜鎺ㄨ崘瑙嗛id锛屾�诲緱鍒�, key = 鎺ㄨ崘瑙嗛ID
+    private Map<String, List<Double>> simiVideoDistanceMap = new HashMap<>();
+
+    /**
+     *
+     * @param videoIdUserCache 鎺ㄨ崘瑙嗛ID --> 鐢ㄦ埛ID 鍏崇郴
+     * @param list 璁$畻鍚庣殑: rc_video_t2 琛ㄦ暟鎹�
+     */
+    public SimilarVideoUserOrder(Map<String, Long> videoIdUserCache, List<Rc_video_t2> list){
+//        this.videoIdUserCache = videoUser;
+        Long userId = null;
+        for(Rc_video_t2 e : list){
+            this.addSimiVideoDistance(e.getSim_video_id(), e.getScore());
+            userId = videoIdUserCache.get(e.getSrc_video_id());
+            if(userId == null){
+                throw new IllegalArgumentException("婧愯棰戞湭鎵惧埌瀵瑰簲鐢ㄦ埛锛宻rc_video_id=" + e.getSrc_video_id());
+            }
+            this.addOrderVideo(e.getSim_video_id(), userId);
+        }
+    }
+
+    public List<SimilarVideoUser> calculateOrder(){
+        SimilarVideoUser similarVideoInfo = null;
+        double totalScore = 0;
+        for(Map.Entry<String, List<Double>> entry : this.simiVideoDistanceMap.entrySet()){
+            similarVideoInfo = this.orderVideoList.get(entry.getKey());
+            totalScore = this.getTotalScore(entry.getValue(), entry.getKey());
+            similarVideoInfo.setScore(totalScore);
+        }
+
+        List<SimilarVideoUser> resultList = new ArrayList<>(128);
+        for(SimilarVideoUser e : this.orderVideoList.values()){
+            resultList.add(e);
+        }
+        Collections.sort(resultList);
+        return resultList;
+    }
+
+    private double getTotalScore(List<Double> list, String simiVideoId){
+        if(StringUtils.isEmptyList(list)){
+            return 0;
+        }
+        double total = 0;
+        for(double d : list){
+            total += d;
+        }
+        return total;
+    }
+
+    private void addSimiVideoDistance(String videoId, double score){
+        List<Double> v = this.simiVideoDistanceMap.get(videoId);
+        if(v == null){
+            v = new ArrayList<>(16);
+            this.simiVideoDistanceMap.put(videoId, v);
+        }
+        v.add(score);
+    }
+
+    private void addOrderVideo(String simiVideoId, long userId){
+        SimilarVideoUser v = this.orderVideoList.get(simiVideoId);
+        if(v == null){
+            v = new SimilarVideoUser(userId, simiVideoId);
+            this.orderVideoList.put(simiVideoId, v);
+        }
+        v.increase();
+    }
+}
diff --git a/recommend-video/src/main/java/com/iplatform/recvideo/VideoScheduler.java b/recommend-video/src/main/java/com/iplatform/recvideo/VideoScheduler.java
new file mode 100644
index 0000000..8eeb60a
--- /dev/null
+++ b/recommend-video/src/main/java/com/iplatform/recvideo/VideoScheduler.java
@@ -0,0 +1,19 @@
+package com.iplatform.recvideo;
+
+import com.iplatform.gather.GatherScheduler;
+import com.walker.store.support.EmptyDatabaseStore;
+
+public abstract class VideoScheduler extends GatherScheduler {
+
+    protected static final String SUCCESS = "ok";
+
+    public VideoScheduler(int id, String name){
+        super(id, name, new EmptyDatabaseStore());
+    }
+
+    protected void terminateGatherScheduler(int schedulerId) {
+
+    }
+
+
+}
diff --git a/recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoSearchMeta.java b/recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoSearchMeta.java
new file mode 100644
index 0000000..8bd14e8
--- /dev/null
+++ b/recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoSearchMeta.java
@@ -0,0 +1,15 @@
+package com.iplatform.recvideo.scheduler;
+
+public class VideoSearchMeta {
+
+    private String batchId;
+
+    public String getBatchId() {
+        return batchId;
+    }
+
+    public VideoSearchMeta(String batchId){
+        this.batchId = batchId;
+    }
+
+}
diff --git a/recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoSearchScheduler.java b/recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoSearchScheduler.java
new file mode 100644
index 0000000..27a5a78
--- /dev/null
+++ b/recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoSearchScheduler.java
@@ -0,0 +1,97 @@
+package com.iplatform.recvideo.scheduler;
+
+import com.iplatform.core.BeanContextAware;
+import com.iplatform.gather.LocalAddress;
+import com.iplatform.recvideo.VideoScheduler;
+import com.walker.store.AbstractStore;
+import com.walker.store.task.GatherTask;
+
+/**
+ * 瑙嗛淇℃伅瀛樺偍銆佹悳绱㈢浉浼煎害澶勭悊鐨勮皟搴︿换鍔°��<p></p>
+ * 璇ヤ换鍔′細鎸佺画杩愯:
+ * <pre>
+ *     1)
+ * </pre>
+ * @author 鏃跺厠鑻�
+ * @date 2022-09-26
+ */
+public class VideoSearchScheduler extends VideoScheduler {
+
+    private VideoSearchTask videoSearchTask = null;
+    private VideoSearchMeta currentVideoSearchMeta = null;
+
+    //
+    private int failedCount = 0;
+
+    public VideoSearchScheduler(int id, String name){
+        super(id, name);
+    }
+
+    @Override
+    protected GatherTask providerTask(AbstractStore store) {
+        LocalAddress localAddress = BeanContextAware.getBeanByType(LocalAddress.class);
+        VideoSearchTask task = new VideoSearchTask("瑙嗛鐩镐技搴︽绱换鍔�", localAddress);
+        return task;
+    }
+
+//    @Override
+//    protected Object[] getRunParameters(Object previousInvokeResult) {}
+
+    @Override
+    protected Object onProcess(Object[] inputParams) throws Exception {
+        if(this.videoSearchTask == null){
+            logger.debug("璋冨害浠诲姟鍒濆鍖栵紝闇�瑕佸厛鏌ヨ浠诲姟鐘舵�佽〃");
+            this.videoSearchTask = (VideoSearchTask)this.providerTask(this.getStore());
+            Object[] params = new Object[]{this.videoSearchTask.getSql(), new Object[]{}};
+            this.currentVideoSearchMeta = (VideoSearchMeta)this.videoSearchTask.run(null, null, null, params);
+            if(this.currentVideoSearchMeta == null){
+                return null;
+            }
+            logger.debug("寮�濮嬪惎鍔ㄦ壒娆℃煡璇�:" + this.currentVideoSearchMeta.getBatchId());
+        }
+
+        //
+        this.videoSearchTask.checkExecutor(this.currentVideoSearchMeta);
+        //
+        int result = this.videoSearchTask.executeOneSearch();
+
+        if(result == 0){
+            return SUCCESS;
+        }
+
+        if(result == -1){
+            this.failedCount ++;
+            if(this.failedCount > 3){
+                // 濡傛灉鎶ラ敊瓒呰繃澶氭锛岃繑鍥炵┖锛岃绾跨▼浼戠湢涓�涓�
+                this.failedCount = 0;
+                return null;
+            } else {
+                return SUCCESS;
+            }
+        }
+
+        if(result == 1){
+            // 瀹屾垚锛屽噯澶囨煡鎵句笅涓�涓壒娆¤褰�
+            logger.info("瀹屾垚鎵规锛屽噯澶囨煡鎵句笅涓�涓壒娆¤褰曘�俠atchId = " + this.currentVideoSearchMeta.getBatchId());
+            this.clearStatus();
+            return SUCCESS;
+        }
+
+        return null;
+//        if(this.gatherTask == null){
+//            this.gatherTask = this.providerTask(this.store);
+//            if(this.gatherTask == null){
+//                throw new IllegalArgumentException("鏈彁渚涗换鍔″璞★紝璋冨害鏃犳硶鎵ц锛�" + this.getName());
+//            }
+//        }
+//        // srcName, createTableSQL, parameter, params[]
+//        return this.gatherTask.run((String)inputParams[0], (String)inputParams[1], inputParams[2], (Object[])inputParams[3]);
+    }
+
+    private void clearStatus(){
+        if(this.videoSearchTask != null){
+            this.videoSearchTask = null;
+        }
+        this.currentVideoSearchMeta = null;
+    }
+}
diff --git a/recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoSearchTask.java b/recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoSearchTask.java
new file mode 100644
index 0000000..2e89235
--- /dev/null
+++ b/recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoSearchTask.java
@@ -0,0 +1,94 @@
+package com.iplatform.recvideo.scheduler;
+
+import com.iplatform.core.BeanContextAware;
+import com.iplatform.recvideo.config.VideoSimilarProperties;
+import com.iplatform.recvideo.service.VideoExecutorServiceImpl;
+import com.iplatform.recvideo.support.DefaultSimilarExecutor;
+import com.walker.connector.Address;
+import com.walker.infrastructure.utils.StringUtils;
+import com.walker.jdbc.JdbcInspector;
+import com.walker.store.task.GenericGatherTask;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 瑙嗛鐩镐技搴︽煡璇互鍙婂啓鍏ョ粨鏋滄墽琛屼换鍔″疄鐜般��
+ * <pre>
+ *     1.閲囬泦婧愬ご涓鸿鍙栨暟鎹簱鏈�鏂版壒娆′俊鎭��
+ *     2.鏍规嵁鎵规缁撴灉锛屾壂鎻忔湰鏈烘壒娆$洰褰曚腑鎵�鏈夎棰戝浘鐗囷紝妫�绱㈢浉浼煎害骞惰绠楃粨鏋溿��
+ * </pre>
+ * @author 鏃跺厠鑻�
+ * @date 2022-09-26
+ */
+public class VideoSearchTask extends GenericGatherTask {
+
+    private DefaultSimilarExecutor similarExecutor = null;
+
+    private VideoSimilarProperties videoSimilarProperties;
+    private VideoExecutorServiceImpl videoExecutorService;
+    private RestTemplate restTemplate;
+
+    private String sql = "select * from rc_task_status where task_type='video_load' and status='0'";
+
+    public VideoSearchTask(String name, Address address){
+        super(name, address);
+        this.setDatabaseType(JdbcInspector.getInstance().getPrimaryDatabaseType());
+        this.videoSimilarProperties = BeanContextAware.getBeanByType(VideoSimilarProperties.class);
+    }
+
+    @Override
+    protected List<Object> transferResultData(List<Object> resultList) {
+        return resultList;
+    }
+
+    /**
+     * 鎵ц鍏蜂綋浠诲姟澶勭悊锛屼富瑕佺敱鐩镐技搴︽墽琛屽櫒鎵ц鍏�<code>execute</code>鏂规硶銆�
+     * @param srcName
+     * @param createTableSQL
+     * @param parameter
+     * @param data 璇ュ弬鏁版槸浠庤〃: rc_task_status 涓煡璇㈠嚭鏉ョ殑鍙敤鎵规璁板綍锛岃嫢瀛樺湪澶氫釜灏辨娊鍙栦竴涓鐞嗐��
+     * @return
+     */
+    @Override
+    protected Object execute(String srcName, String createTableSQL, Object parameter, List<Object> data) {
+        if(StringUtils.isEmptyList(data)){
+           logger.warn("鏁版嵁搴撲腑妫�绱㈠嚭鍙鐞�: rc_task_status 璁板綍锛屼絾闆嗗悎涓虹┖");
+           return null;
+        }
+        Map<String, Object> rcTaskStatus = (Map<String, Object>)data.get(0);
+        String lastValue = rcTaskStatus.get("last_value").toString();
+        return new VideoSearchMeta(lastValue);
+    }
+
+    public String getSql(){
+        return this.sql;
+    }
+
+    public void checkExecutor(VideoSearchMeta videoSearchMeta){
+        if(this.similarExecutor != null){
+            logger.debug("DefaultSimilarExecutor 宸茬粡璁剧疆杩囷紝涓嶅啀閲嶅璁剧疆");
+            return;
+        }
+        DefaultSimilarExecutor defaultSimilarExecutor = new DefaultSimilarExecutor();
+        defaultSimilarExecutor.setRemoteUrl(this.videoSimilarProperties.getAiService());
+        defaultSimilarExecutor.setVideoExecutorService(BeanContextAware.getBeanByType(VideoExecutorServiceImpl.class));
+        defaultSimilarExecutor.setRestTemplate(BeanContextAware.getBeanByType(RestTemplate.class));
+        defaultSimilarExecutor.startup(this.videoSimilarProperties.getDataFolder()
+                , videoSearchMeta.getBatchId(), this.videoSimilarProperties.getTopN(), true);
+        this.similarExecutor = defaultSimilarExecutor;
+    }
+
+    /**
+     * 璋冪敤涓�娆$粨鏋滄煡璇㈠強鍐欏叆銆�
+     * @return -1 澶辫触鍑虹幇寮傚父, 0 鎴愬姛鎵ц涓�娆�(缁х画涓嬩竴娆�), 1 鎵规瀹屾垚
+     * @throws Exception
+     */
+    public int executeOneSearch() throws Exception{
+        if(this.similarExecutor == null){
+            throw new IllegalStateException("similarExecutor is required!");
+        }
+        return this.similarExecutor.execute();
+    }
+}
diff --git a/recommend-video/src/main/java/com/iplatform/recvideo/service/VideoExecutorServiceImpl.java b/recommend-video/src/main/java/com/iplatform/recvideo/service/VideoExecutorServiceImpl.java
index 9802597..6fe6174 100644
--- a/recommend-video/src/main/java/com/iplatform/recvideo/service/VideoExecutorServiceImpl.java
+++ b/recommend-video/src/main/java/com/iplatform/recvideo/service/VideoExecutorServiceImpl.java
@@ -2,6 +2,9 @@
 
 import com.iplatform.model.po.Rc_video_t1;
 import com.iplatform.model.po.Rc_video_t2;
+import com.iplatform.model.po.Rc_video_user;
+import com.walker.infrastructure.utils.DateUtils;
+import com.walker.infrastructure.utils.StringUtils;
 import com.walker.jdbc.service.BaseServiceImpl;
 import org.springframework.stereotype.Service;
 
@@ -14,6 +17,10 @@
     private static final String SQL_CHECK_VIDEO_STATUS = "select * from milvus_video_status where id=?";
     private static final String SQL_CLEAR_VIDEO_T1 = "delete from rc_video_t1 where src_img=?";
     private static final String SQL_CLEAR_VIDEO_T2 = "delete from rc_video_t2 where src_video_id=?";
+
+    private static final String SQL_GET_BATCH_VIDEO = "select user_id, src_video_id from rc_video_batch where batch_id=?";
+
+    private static final String SQL_UPDATE_TASK_STATUS_LOAD = "update rc_task_status set status='1', end_time=? where last_value=? and status='0'";
 
     /**
      * 鍐欏叆瑙嗛鐩镐技搴︾涓�绾т复鏃舵暟鎹紝姣忎釜鍥惧儚鍖呭惈澶氫釜鐩镐技瑙嗛璁板綍銆�
@@ -37,6 +44,34 @@
     }
 
     /**
+     * 鍐欏叆鐢ㄦ埛鎺ㄨ崘瑙嗛璁板綍闆嗗悎锛屽苟鏇存柊鎵规浠诲姟鐘舵�佷负(宸插畬鎴�)
+     * @param videoUserList
+     * @param batchId
+     */
+    public void execBatchInsertVideoUser(List<Rc_video_user> videoUserList, String batchId){
+        this.insert(videoUserList);
+        Object[] param = new Object[2];
+        param[0] = Long.parseLong(DateUtils.getDateTimeSecondForShow());
+        param[1] = Long.parseLong(batchId);
+        this.execute(SQL_UPDATE_TASK_STATUS_LOAD, param);
+    }
+
+
+    /**
+     * 杩斿洖涓�涓壒娆$敤鎴峰搴旇棰戣褰曢泦鍚堬紝鐢ㄤ簬鏈�鍚庢洿鏂扮敤鎴锋帹鑽愯棰戞暟鎹��
+     * @param batchId
+     * @return
+     * @date 2022-09-26
+     */
+    public List<Map<String, Object>> queryBatchUserVideoList(String batchId){
+        return this.select(SQL_GET_BATCH_VIDEO, new Object[]{batchId});
+    }
+
+    public List<Rc_video_t2> queryVideoT_2List(String batchId){
+        return this.select(new Rc_video_t2(), "where batch_id=?", new Object[]{batchId});
+    }
+
+    /**
      * 鏍规嵁鍘熷瑙嗛ID杩斿洖鐩镐技璁板綍闆嗗悎銆�
      * @param srcVideoId
      * @return
diff --git a/recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultSimilarExecutor.java b/recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultSimilarExecutor.java
index ea9327b..ceec831 100644
--- a/recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultSimilarExecutor.java
+++ b/recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultSimilarExecutor.java
@@ -2,20 +2,26 @@
 
 import com.iplatform.model.po.Rc_video_t1;
 import com.iplatform.model.po.Rc_video_t2;
+import com.iplatform.model.po.Rc_video_user;
 import com.iplatform.recvideo.Constants;
 import com.iplatform.recvideo.SimilarExecutor;
 import com.iplatform.recvideo.SimilarVideoInfo;
 import com.iplatform.recvideo.SimilarVideoOrder;
+import com.iplatform.recvideo.SimilarVideoUser;
+import com.iplatform.recvideo.SimilarVideoUserOrder;
 import com.iplatform.recvideo.VideoFolderInfo;
 import com.iplatform.recvideo.service.VideoExecutorServiceImpl;
 import com.iplatform.recvideo.util.PythonInvokeUtils;
+import com.walker.infrastructure.utils.DateUtils;
 import com.walker.infrastructure.utils.NumberGenerator;
 import com.walker.infrastructure.utils.StringUtils;
 import org.springframework.web.client.RestTemplate;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 public class DefaultSimilarExecutor extends SimilarExecutor {
 
@@ -70,7 +76,7 @@
     }
 
     @Override
-    protected void writeRcVideoT2(VideoFolderInfo videoFolderInfo) {
+    protected void writeRcVideoT2(VideoFolderInfo videoFolderInfo, String batchId) {
         logger.debug("姝e湪鍐欏叆'Rc_video_t2',鍘熷瑙嗛:" + videoFolderInfo.getVideoId());
         List<Rc_video_t1> list = this.videoExecutorService.queryVideoT_1List(videoFolderInfo.getVideoId());
         if(StringUtils.isEmptyList(list)){
@@ -79,7 +85,7 @@
         }
         SimilarVideoOrder similarVideoOrder = new SimilarVideoOrder(list);
         List<SimilarVideoInfo> similarVideoInfoList = similarVideoOrder.calculateOrder();
-        List<Rc_video_t2> video2List = this.toRcVideoT2List(videoFolderInfo.getVideoId(), similarVideoInfoList);
+        List<Rc_video_t2> video2List = this.toRcVideoT2List(videoFolderInfo.getVideoId(), similarVideoInfoList, batchId);
         if(video2List == null){
             logger.warn("writeRcVideoT2(): similarVideoInfoList涓虹┖锛屼笉鑳芥墽琛�, srcVideoId = " + videoFolderInfo.getVideoId());
             return;
@@ -90,9 +96,67 @@
     @Override
     protected void writeRcVideoUser(String batchId, List<String> recVideoIdList) {
         logger.info("姝e湪鍐欏叆涓�娆$敤鎴锋帹鑽愯棰戞暟鎹�, batchId = " + batchId);
+        List<Map<String, Object>> batchUserVideoList = this.videoExecutorService.queryBatchUserVideoList(batchId);
+        if(StringUtils.isEmptyList(batchUserVideoList)){
+            logger.error("鏈壘鍒版壒娆$敤鎴疯棰戣褰曪紝鏃犳硶鏇存柊鐢ㄦ埛鎺ㄨ崘瑙嗛鏁版嵁锛宐atchId = " + batchId);
+            return;
+        }
+        List<Rc_video_t2> video2List = this.videoExecutorService.queryVideoT_2List(batchId);
+        if(StringUtils.isEmptyList(video2List)){
+            logger.warn("浠庤〃'rc_video_t2'鏈姞杞藉埌鎵规瑙嗛缁撴灉璁板綍锛屾棤娉曠户缁啓鍏ョ敤鎴锋帹鑽愯棰�'rc_video_user'");
+            return;
+        }
+
+        // 鍘熷瑙嗛ID涓庣敤鎴峰叧绯�
+        Map<String, Long> srcVideoIdUserCache = new HashMap<>();
+        for(Map<String, Object> m : batchUserVideoList){
+            srcVideoIdUserCache.put(m.get("src_video_id").toString(), Long.parseLong(m.get("user_id").toString()));
+        }
+
+        // 鎺ㄨ崘瑙嗛ID涓庣敤鎴峰叧绯�
+        Map<String, Long> videoIdUserCache = new HashMap<>();
+        Long userId = null;
+        for(Rc_video_t2 e : video2List){
+            userId = srcVideoIdUserCache.get(e.getSrc_video_id());
+            if(userId == null){
+                throw new IllegalArgumentException("鎺ㄨ崘瑙嗛鐨勫師濮嬭棰戞湭鎵惧埌瀵瑰簲鐢ㄦ埛锛宻rc_video = " + e.getSrc_video_id() + ", rec_video = " + e.getSim_video_id());
+            }
+            videoIdUserCache.put(e.getSim_video_id(), userId);
+        }
+
+        SimilarVideoUserOrder similarVideoUserOrder = new SimilarVideoUserOrder(videoIdUserCache, video2List);
+        List<SimilarVideoUser> similarVideoUserList = similarVideoUserOrder.calculateOrder();
+        List<Rc_video_user> videoUserList = this.toRcVideoUserList(similarVideoUserList);
+        if(videoUserList == null){
+            logger.warn("writeRcVideoUser(): similarVideoUserList锛屼笉鑳芥墽琛�, batchId = " + batchId);
+            return;
+        }
+
+        // 鐢ㄦ埛鎺ㄨ崘瑙嗛璁板綍锛岄渶瑕佹洿鏂帮紝濡傛灉涓嶅瓨鍦ㄦ墠鍐欏叆銆�
+        // 娉ㄦ剰锛氳繖閲屾殏鏃朵笉鏇存柊锛屽叏閮ㄥ啓鍏ワ紝杩欐牱鐢ㄦ埛鎺ㄨ崘琛ㄤ腑浼氬瓨鍦ㄨ緝澶氶噸澶嶆暟鎹紝鍚庣画瑕佹坊鍔犲畾鏃朵换鍔℃竻闄ら噸澶嶆暟鎹嵆鍙��
+        this.videoExecutorService.execBatchInsertVideoUser(videoUserList, batchId);
+        logger.debug("鎵规浠诲姟鐘舵�佸凡鏇存柊鎴愬姛! " + batchId);
     }
 
-    private List<Rc_video_t2> toRcVideoT2List(String srcVideoId, List<SimilarVideoInfo> similarVideoInfoList){
+    private List<Rc_video_user> toRcVideoUserList(List<SimilarVideoUser> similarVideoUserList){
+        if(StringUtils.isEmptyList(similarVideoUserList)){
+            return null;
+        }
+        List<Rc_video_user> resultList = new ArrayList<>(similarVideoUserList.size());
+        Rc_video_user rc_video_t2 = null;
+        for(SimilarVideoUser e : similarVideoUserList){
+            rc_video_t2 = new Rc_video_user(NumberGenerator.getLongSequenceNumber());
+            rc_video_t2.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow()));
+            rc_video_t2.setUser_id(e.getUserId());
+            rc_video_t2.setVideo_id(e.getRecommendVideoId());
+            rc_video_t2.setScore(e.getScore());
+            resultList.add(rc_video_t2);
+        }
+        return resultList;
+    }
+
+    private List<Rc_video_t2> toRcVideoT2List(String srcVideoId
+            , List<SimilarVideoInfo> similarVideoInfoList, String batchId){
         if(StringUtils.isEmptyList(similarVideoInfoList)){
             return null;
         }
@@ -103,6 +167,7 @@
             rc_video_t2.setSrc_video_id(srcVideoId);
             rc_video_t2.setSim_video_id(e.getId());
             rc_video_t2.setScore(e.getScore());
+            rc_video_t2.setBatch_id(batchId);
             resultList.add(rc_video_t2);
         }
         return resultList;

--
Gitblit v1.9.1