shikeying
2022-10-11 71d355f4db31807a13d78e6e257e9d26c4702c23
recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java
@@ -1,17 +1,27 @@
package com.iplatform.recvideo.support;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.CollectionType;
import com.iplatform.core.BeanContextAware;
import com.iplatform.model.po.Rc_task_status;
import com.iplatform.model.po.Rc_video_batch;
import com.iplatform.reccommon.TaskStatus;
import com.iplatform.reccommon.TaskType;
import com.iplatform.recvideo.VideoLoadInfo;
import com.iplatform.recvideo.VideoLoader;
import com.iplatform.recvideo.config.VideoSimilarProperties;
import com.iplatform.recvideo.service.VideoLoaderServiceImpl;
import com.iplatform.recvideo.util.SFTPUtil;
import com.jcraft.jsch.SftpException;
import com.walker.connector.support.DatabaseConnector;
import com.walker.infrastructure.utils.CollectionUtils;
import com.walker.infrastructure.utils.DateUtils;
import com.walker.infrastructure.utils.NumberGenerator;
import com.walker.infrastructure.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.io.File;
import java.util.*;
public class DefaultVideoLoader extends VideoLoader {
@@ -34,18 +44,85 @@
    @Override
    protected long acquireInitBatchId() {
        return 20210101;
        return 20210101000000L;
    }
    @Override
    protected List<VideoLoadInfo> acquireLoadVideoFromDatabase(long nextBatchId) {
        this.databaseConnector.queryForList("", new Object[]{nextBatchId});
        return null;
        //只管审核通过的,并且创建时间在 批次 与 昨天时间之间
        String sql = "SELECT id AS srcVideoId,video AS videoSrcPath,date_format(create_time,'%Y%m%d%H%i%S') AS batchId,create_user_id AS userId FROM train_video_online WHERE type=1 AND state=1 AND create_time BETWEEN STR_TO_DATE(?,'%Y%m%d%H%i%S') AND date_sub(NOW(),INTERVAL 1 DAY)";
        List<Map<String, Object>> srcVideoList = this.databaseConnector.queryForList(sql, new Object[]{nextBatchId + 1});
        if (CollectionUtils.isEmpty(srcVideoList)) {
            return null;
        }
        ObjectMapper objectMapper = new ObjectMapper();
        CollectionType listType = objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, Map.class);
        List<VideoLoadInfo> videoLoadInfoList = new ArrayList<>(srcVideoList.size());
        VideoSimilarProperties videoSimilarProperties = BeanContextAware.getBeanByType(VideoSimilarProperties.class);
        String dataFolder = videoSimilarProperties.getDataFolder();
//        String pathSeparator = File.separator;
        String pathSeparator = StringUtils.FOLDER_SEPARATOR;
        for (Map<String, Object> srcVideoMap : srcVideoList) {
            //  videoSrcPath ==》[{"name": "/opt/home/train/tmp/2020/10/28/video/489c9fd871a04537bd20ebfc47457368.mp4", "path": "/train/2020/10/28/100cecfe4d4b4e70a2eb2733d78ec12e.mp4"}]
            String videoSrcPath = String.valueOf(srcVideoMap.get("videoSrcPath"));
            String srcVideoId = String.valueOf(srcVideoMap.get("srcVideoId"));
            String batchId = String.valueOf(srcVideoMap.get("batchId"));
            Long userId = Long.valueOf(String.valueOf(srcVideoMap.get("userId")));
            List<Map> userList = null;
            try {
                userList = objectMapper.readValue(videoSrcPath, listType);
            } catch (JsonProcessingException e) {
                logger.error("acquireLoadVideoFromDatabase(): " + e.getMessage(), e);
                return null;
            }
            for (int i = 0; i < userList.size(); i++) {
                Map srcVideo = userList.get(i);
                VideoLoadInfo videoLoadInfo = new VideoLoadInfo();
                String videoPath = String.valueOf(srcVideo.get("path"));
                videoLoadInfo.setSrcVideoId(srcVideoId);
                videoLoadInfo.setVideoSrcPath(videoPath);
                videoLoadInfo.setUserId(userId);
                videoLoadInfo.setBatchId(batchId);
                videoLoadInfo.setVideoDestPath(dataFolder + batchId + pathSeparator + srcVideoId + videoPath.substring(videoPath.lastIndexOf(".")));
                videoLoadInfoList.add(videoLoadInfo);
            }
        }
        return videoLoadInfoList;
    }
    @Override
    protected long copyRemoteVideoFiles(List<VideoLoadInfo> readyLoadList, long nextBatchId) throws Exception {
        return 0;
        Set<String> videoBatchIdSet = new HashSet<>();
        SFTPUtil sftp = new SFTPUtil();
        sftp.login();
        for (VideoLoadInfo videoLoadInfo : readyLoadList) {
            videoBatchIdSet.add(videoLoadInfo.getBatchId());
            String videoSrcPath = videoLoadInfo.getVideoSrcPath();
            String directory = videoSrcPath.substring(0, videoSrcPath.lastIndexOf("/"));
            String downloadFile = videoSrcPath.substring(videoSrcPath.lastIndexOf("/") + 1);
            try {
                sftp.download(directory, downloadFile, videoLoadInfo.getVideoDestPath());
            } catch (SftpException e) {
                logger.error("copyRemoteVideoFiles():sftp下载文件{}到{}失败", videoSrcPath, videoLoadInfo.getVideoDestPath());
            }
        }
        sftp.logout();
        // 插入记录 rc_task_status
        Long maxVideoBatchId = Long.valueOf(Collections.max(videoBatchIdSet, Comparator.comparing(s -> Long.valueOf(s))));
        for (String videoBatchId : videoBatchIdSet) {
            Rc_task_status taskStatus = new Rc_task_status();
            taskStatus.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow()));
            taskStatus.setName("整理获取短视频任务");
            taskStatus.setLast_value(Long.valueOf(videoBatchId));
            taskStatus.setStatus(TaskStatus.INDEX_VIDEO_LOAD);
            taskStatus.setTask_type(TaskType.INDEX_VIDEO_LOAD);
            videoLoaderService.save(taskStatus);
        }
        return maxVideoBatchId;
    }
    @Override
@@ -55,7 +132,7 @@
        return this.videoLoaderService.execSaveBatchAndStatus(videoBatchList, taskStatus);
    }
    private Rc_task_status createNewTaskStatus(long savedBatchId){
    private Rc_task_status createNewTaskStatus(long savedBatchId) {
        Rc_task_status status = new Rc_task_status();
        status.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow()));
        status.setStatus("0");
@@ -66,17 +143,17 @@
        return status;
    }
    private List<Rc_video_batch> toVideoBatchList(List<VideoLoadInfo> readyLoadList, long savedBatchId){
    private List<Rc_video_batch> toVideoBatchList(List<VideoLoadInfo> readyLoadList, long savedBatchId) {
        List<Rc_video_batch> resultList = new ArrayList<>(readyLoadList.size());
        Rc_video_batch e = null;
        String batchId = String.valueOf(savedBatchId);
        for(VideoLoadInfo v : readyLoadList){
        for (VideoLoadInfo v : readyLoadList) {
            e = new Rc_video_batch();
            e.setBatch_id(batchId);
            e.setSrc_video_id(v.getSrcVideoId());
            e.setUser_id(v.getUserId());
            e.setSrc_video_path(v.getVideoDestPath());
            e.setId(NumberGenerator.getLongSequenceNumber());
//            e.setId(NumberGenerator.getLongSequenceNumber());
            resultList.add(e);
        }
        return resultList;