cy
2022-10-09 301017226e61aa39c8b40e780ca244eeddd073d4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package com.iplatform.recvideo.support;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.CollectionType;
import com.iplatform.core.BeanContextAware;
import com.iplatform.model.po.Rc_task_status;
import com.iplatform.model.po.Rc_video_batch;
import com.iplatform.reccommon.TaskStatus;
import com.iplatform.reccommon.TaskType;
import com.iplatform.recvideo.VideoLoadInfo;
import com.iplatform.recvideo.VideoLoader;
import com.iplatform.recvideo.config.VideoSimilarProperties;
import com.iplatform.recvideo.service.VideoLoaderServiceImpl;
import com.iplatform.recvideo.util.SFTPUtil;
import com.jcraft.jsch.SftpException;
import com.walker.connector.support.DatabaseConnector;
import com.walker.infrastructure.utils.CollectionUtils;
import com.walker.infrastructure.utils.DateUtils;
import com.walker.infrastructure.utils.NumberGenerator;
 
import java.io.File;
import java.util.*;
 
public class DefaultVideoLoader extends VideoLoader {
 
    private VideoLoaderServiceImpl videoLoaderService;
 
    private DatabaseConnector databaseConnector = null;
 
    public void setDatabaseConnector(DatabaseConnector databaseConnector) {
        this.databaseConnector = databaseConnector;
    }
 
    public void setVideoLoaderService(VideoLoaderServiceImpl videoLoaderService) {
        this.videoLoaderService = videoLoaderService;
    }
 
    @Override
    protected long acquireNextBatchId() {
        return this.videoLoaderService.queryNextBatchValue();
    }
 
    @Override
    protected long acquireInitBatchId() {
        return 20210101000000L;
    }
 
    @Override
    protected List<VideoLoadInfo> acquireLoadVideoFromDatabase(long nextBatchId) {
        //只管审核通过的,并且创建时间在 批次 与 昨天时间之间
        String sql = "SELECT id AS srcVideoId,video AS videoSrcPath,date_format(create_time,'%Y%m%d%H%i%S') AS batchId,create_user_id AS userId FROM train_video_online WHERE type=1 AND state=1 AND create_time BETWEEN STR_TO_DATE(?,'%Y%m%d%H%i%S') AND date_sub(NOW(),INTERVAL 1 DAY)";
        List<Map<String, Object>> srcVideoList = this.databaseConnector.queryForList(sql, new Object[]{nextBatchId + 1});
        if (CollectionUtils.isEmpty(srcVideoList)) {
            return null;
        }
        ObjectMapper objectMapper = new ObjectMapper();
        CollectionType listType = objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, Map.class);
        List<VideoLoadInfo> videoLoadInfoList = new ArrayList<>(srcVideoList.size());
 
        VideoSimilarProperties videoSimilarProperties = BeanContextAware.getBeanByType(VideoSimilarProperties.class);
        String dataFolder = videoSimilarProperties.getDataFolder();
        String pathSeparator = File.separator;
 
        for (Map<String, Object> srcVideoMap : srcVideoList) {
            //  videoSrcPath ==》[{"name": "/opt/home/train/tmp/2020/10/28/video/489c9fd871a04537bd20ebfc47457368.mp4", "path": "/train/2020/10/28/100cecfe4d4b4e70a2eb2733d78ec12e.mp4"}]
            String videoSrcPath = String.valueOf(srcVideoMap.get("videoSrcPath"));
            String srcVideoId = String.valueOf(srcVideoMap.get("srcVideoId"));
            String batchId = String.valueOf(srcVideoMap.get("batchId"));
            Long userId = Long.valueOf(String.valueOf(srcVideoMap.get("userId")));
            List<Map> userList = null;
            try {
                userList = objectMapper.readValue(videoSrcPath, listType);
            } catch (JsonProcessingException e) {
                logger.error("acquireLoadVideoFromDatabase(): " + e.getMessage(), e);
                return null;
            }
            for (int i = 0; i < userList.size(); i++) {
                Map srcVideo = userList.get(i);
                VideoLoadInfo videoLoadInfo = new VideoLoadInfo();
                String videoPath = String.valueOf(srcVideo.get("path"));
                videoLoadInfo.setSrcVideoId(srcVideoId);
                videoLoadInfo.setVideoSrcPath(videoPath);
                videoLoadInfo.setUserId(userId);
                videoLoadInfo.setBatchId(batchId);
                videoLoadInfo.setVideoDestPath(dataFolder + batchId + pathSeparator + srcVideoId + videoPath.substring(videoPath.lastIndexOf(".")));
                videoLoadInfoList.add(videoLoadInfo);
            }
        }
 
        return videoLoadInfoList;
    }
 
    @Override
    protected long copyRemoteVideoFiles(List<VideoLoadInfo> readyLoadList, long nextBatchId) throws Exception {
        Set<String> videoBatchIdSet = new HashSet<>();
        SFTPUtil sftp = new SFTPUtil();
        sftp.login();
        for (VideoLoadInfo videoLoadInfo : readyLoadList) {
            videoBatchIdSet.add(videoLoadInfo.getBatchId());
            String videoSrcPath = videoLoadInfo.getVideoSrcPath();
            String directory = videoSrcPath.substring(0, videoSrcPath.lastIndexOf("/"));
            String downloadFile = videoSrcPath.substring(videoSrcPath.lastIndexOf("/") + 1);
            try {
                sftp.download(directory, downloadFile, videoLoadInfo.getVideoDestPath());
            } catch (SftpException e) {
                logger.error("copyRemoteVideoFiles():sftp下载文件{}到{}失败", videoSrcPath, videoLoadInfo.getVideoDestPath());
            }
        }
        sftp.logout();
        // 插入记录 rc_task_status
        Long maxVideoBatchId = Long.valueOf(Collections.max(videoBatchIdSet, Comparator.comparing(s -> Long.valueOf(s))));
        for (String videoBatchId : videoBatchIdSet) {
            Rc_task_status taskStatus = new Rc_task_status();
            taskStatus.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow()));
            taskStatus.setName("整理获取短视频任务");
            taskStatus.setLast_value(Long.valueOf(videoBatchId));
            taskStatus.setStatus(TaskStatus.INDEX_VIDEO_LOAD);
            taskStatus.setTask_type(TaskType.INDEX_VIDEO_LOAD);
            videoLoaderService.save(taskStatus);
        }
 
        return maxVideoBatchId;
    }
 
    @Override
    protected int saveDataAndStatus(List<VideoLoadInfo> readyLoadList, long savedBatchId) {
        List<Rc_video_batch> videoBatchList = this.toVideoBatchList(readyLoadList, savedBatchId);
        Rc_task_status taskStatus = this.createNewTaskStatus(savedBatchId);
        return this.videoLoaderService.execSaveBatchAndStatus(videoBatchList, taskStatus);
    }
 
    private Rc_task_status createNewTaskStatus(long savedBatchId) {
        Rc_task_status status = new Rc_task_status();
        status.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow()));
        status.setStatus("0");
        status.setId(NumberGenerator.getLongSequenceNumber());
        status.setTask_type(TaskType.INDEX_VIDEO_LOAD);
        status.setLast_value(savedBatchId);
        status.setName("整理获取短视频任务");
        return status;
    }
 
    private List<Rc_video_batch> toVideoBatchList(List<VideoLoadInfo> readyLoadList, long savedBatchId) {
        List<Rc_video_batch> resultList = new ArrayList<>(readyLoadList.size());
        Rc_video_batch e = null;
        String batchId = String.valueOf(savedBatchId);
        for (VideoLoadInfo v : readyLoadList) {
            e = new Rc_video_batch();
            e.setBatch_id(batchId);
            e.setSrc_video_id(v.getSrcVideoId());
            e.setUser_id(v.getUserId());
            e.setSrc_video_path(v.getVideoDestPath());
            e.setId(NumberGenerator.getLongSequenceNumber());
            resultList.add(e);
        }
        return resultList;
    }
}