From 301017226e61aa39c8b40e780ca244eeddd073d4 Mon Sep 17 00:00:00 2001
From: cy <1664593601@qq.com>
Date: 星期日, 09 十月 2022 14:14:54 +0800
Subject: [PATCH] feat(schedule): 定时任务视频拉取

---
 recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java |   93 ++++++++++++++++++++++++++++++++++++++++++----
 1 files changed, 84 insertions(+), 9 deletions(-)

diff --git a/recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java b/recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java
index 7a3cd92..606f9c8 100644
--- a/recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java
+++ b/recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java
@@ -1,17 +1,26 @@
 package com.iplatform.recvideo.support;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.iplatform.core.BeanContextAware;
 import com.iplatform.model.po.Rc_task_status;
 import com.iplatform.model.po.Rc_video_batch;
+import com.iplatform.reccommon.TaskStatus;
 import com.iplatform.reccommon.TaskType;
 import com.iplatform.recvideo.VideoLoadInfo;
 import com.iplatform.recvideo.VideoLoader;
+import com.iplatform.recvideo.config.VideoSimilarProperties;
 import com.iplatform.recvideo.service.VideoLoaderServiceImpl;
+import com.iplatform.recvideo.util.SFTPUtil;
+import com.jcraft.jsch.SftpException;
 import com.walker.connector.support.DatabaseConnector;
+import com.walker.infrastructure.utils.CollectionUtils;
 import com.walker.infrastructure.utils.DateUtils;
 import com.walker.infrastructure.utils.NumberGenerator;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.io.File;
+import java.util.*;
 
 public class DefaultVideoLoader extends VideoLoader {
 
@@ -34,18 +43,84 @@
 
     @Override
     protected long acquireInitBatchId() {
-        return 20210101;
+        return 20210101000000L;
     }
 
     @Override
     protected List<VideoLoadInfo> acquireLoadVideoFromDatabase(long nextBatchId) {
-        this.databaseConnector.queryForList("", new Object[]{nextBatchId});
-        return null;
+        //鍙瀹℃牳閫氳繃鐨勶紝骞朵笖鍒涘缓鏃堕棿鍦� 鎵规 涓� 鏄ㄥぉ鏃堕棿涔嬮棿
+        String sql = "SELECT id AS srcVideoId,video AS videoSrcPath,date_format(create_time,'%Y%m%d%H%i%S') AS batchId,create_user_id AS userId FROM train_video_online WHERE type=1 AND state=1 AND create_time BETWEEN STR_TO_DATE(?,'%Y%m%d%H%i%S') AND date_sub(NOW(),INTERVAL 1 DAY)";
+        List<Map<String, Object>> srcVideoList = this.databaseConnector.queryForList(sql, new Object[]{nextBatchId + 1});
+        if (CollectionUtils.isEmpty(srcVideoList)) {
+            return null;
+        }
+        ObjectMapper objectMapper = new ObjectMapper();
+        CollectionType listType = objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, Map.class);
+        List<VideoLoadInfo> videoLoadInfoList = new ArrayList<>(srcVideoList.size());
+
+        VideoSimilarProperties videoSimilarProperties = BeanContextAware.getBeanByType(VideoSimilarProperties.class);
+        String dataFolder = videoSimilarProperties.getDataFolder();
+        String pathSeparator = File.separator;
+
+        for (Map<String, Object> srcVideoMap : srcVideoList) {
+            //  videoSrcPath ==銆媅{"name": "/opt/home/train/tmp/2020/10/28/video/489c9fd871a04537bd20ebfc47457368.mp4", "path": "/train/2020/10/28/100cecfe4d4b4e70a2eb2733d78ec12e.mp4"}]
+            String videoSrcPath = String.valueOf(srcVideoMap.get("videoSrcPath"));
+            String srcVideoId = String.valueOf(srcVideoMap.get("srcVideoId"));
+            String batchId = String.valueOf(srcVideoMap.get("batchId"));
+            Long userId = Long.valueOf(String.valueOf(srcVideoMap.get("userId")));
+            List<Map> userList = null;
+            try {
+                userList = objectMapper.readValue(videoSrcPath, listType);
+            } catch (JsonProcessingException e) {
+                logger.error("acquireLoadVideoFromDatabase(): " + e.getMessage(), e);
+                return null;
+            }
+            for (int i = 0; i < userList.size(); i++) {
+                Map srcVideo = userList.get(i);
+                VideoLoadInfo videoLoadInfo = new VideoLoadInfo();
+                String videoPath = String.valueOf(srcVideo.get("path"));
+                videoLoadInfo.setSrcVideoId(srcVideoId);
+                videoLoadInfo.setVideoSrcPath(videoPath);
+                videoLoadInfo.setUserId(userId);
+                videoLoadInfo.setBatchId(batchId);
+                videoLoadInfo.setVideoDestPath(dataFolder + batchId + pathSeparator + srcVideoId + videoPath.substring(videoPath.lastIndexOf(".")));
+                videoLoadInfoList.add(videoLoadInfo);
+            }
+        }
+
+        return videoLoadInfoList;
     }
 
     @Override
     protected long copyRemoteVideoFiles(List<VideoLoadInfo> readyLoadList, long nextBatchId) throws Exception {
-        return 0;
+        Set<String> videoBatchIdSet = new HashSet<>();
+        SFTPUtil sftp = new SFTPUtil();
+        sftp.login();
+        for (VideoLoadInfo videoLoadInfo : readyLoadList) {
+            videoBatchIdSet.add(videoLoadInfo.getBatchId());
+            String videoSrcPath = videoLoadInfo.getVideoSrcPath();
+            String directory = videoSrcPath.substring(0, videoSrcPath.lastIndexOf("/"));
+            String downloadFile = videoSrcPath.substring(videoSrcPath.lastIndexOf("/") + 1);
+            try {
+                sftp.download(directory, downloadFile, videoLoadInfo.getVideoDestPath());
+            } catch (SftpException e) {
+                logger.error("copyRemoteVideoFiles():sftp涓嬭浇鏂囦欢{}鍒皗}澶辫触", videoSrcPath, videoLoadInfo.getVideoDestPath());
+            }
+        }
+        sftp.logout();
+        // 鎻掑叆璁板綍 rc_task_status
+        Long maxVideoBatchId = Long.valueOf(Collections.max(videoBatchIdSet, Comparator.comparing(s -> Long.valueOf(s))));
+        for (String videoBatchId : videoBatchIdSet) {
+            Rc_task_status taskStatus = new Rc_task_status();
+            taskStatus.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow()));
+            taskStatus.setName("鏁寸悊鑾峰彇鐭棰戜换鍔�");
+            taskStatus.setLast_value(Long.valueOf(videoBatchId));
+            taskStatus.setStatus(TaskStatus.INDEX_VIDEO_LOAD);
+            taskStatus.setTask_type(TaskType.INDEX_VIDEO_LOAD);
+            videoLoaderService.save(taskStatus);
+        }
+
+        return maxVideoBatchId;
     }
 
     @Override
@@ -55,7 +130,7 @@
         return this.videoLoaderService.execSaveBatchAndStatus(videoBatchList, taskStatus);
     }
 
-    private Rc_task_status createNewTaskStatus(long savedBatchId){
+    private Rc_task_status createNewTaskStatus(long savedBatchId) {
         Rc_task_status status = new Rc_task_status();
         status.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow()));
         status.setStatus("0");
@@ -66,11 +141,11 @@
         return status;
     }
 
-    private List<Rc_video_batch> toVideoBatchList(List<VideoLoadInfo> readyLoadList, long savedBatchId){
+    private List<Rc_video_batch> toVideoBatchList(List<VideoLoadInfo> readyLoadList, long savedBatchId) {
         List<Rc_video_batch> resultList = new ArrayList<>(readyLoadList.size());
         Rc_video_batch e = null;
         String batchId = String.valueOf(savedBatchId);
-        for(VideoLoadInfo v : readyLoadList){
+        for (VideoLoadInfo v : readyLoadList) {
             e = new Rc_video_batch();
             e.setBatch_id(batchId);
             e.setSrc_video_id(v.getSrcVideoId());

--
Gitblit v1.9.1