package com.iplatform.recvideo.support; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.type.CollectionType; import com.iplatform.core.BeanContextAware; import com.iplatform.model.po.Rc_task_status; import com.iplatform.model.po.Rc_video_batch; import com.iplatform.reccommon.TaskStatus; import com.iplatform.reccommon.TaskType; import com.iplatform.recvideo.VideoLoadInfo; import com.iplatform.recvideo.VideoLoader; import com.iplatform.recvideo.config.VideoSimilarProperties; import com.iplatform.recvideo.service.VideoLoaderServiceImpl; import com.iplatform.recvideo.util.SFTPUtil; import com.jcraft.jsch.SftpException; import com.walker.connector.support.DatabaseConnector; import com.walker.infrastructure.utils.CollectionUtils; import com.walker.infrastructure.utils.DateUtils; import com.walker.infrastructure.utils.NumberGenerator; import java.io.File; import java.util.*; public class DefaultVideoLoader extends VideoLoader { private VideoLoaderServiceImpl videoLoaderService; private DatabaseConnector databaseConnector = null; public void setDatabaseConnector(DatabaseConnector databaseConnector) { this.databaseConnector = databaseConnector; } public void setVideoLoaderService(VideoLoaderServiceImpl videoLoaderService) { this.videoLoaderService = videoLoaderService; } @Override protected long acquireNextBatchId() { return this.videoLoaderService.queryNextBatchValue(); } @Override protected long acquireInitBatchId() { return 20210101000000L; } @Override protected List acquireLoadVideoFromDatabase(long nextBatchId) { //只管审核通过的,并且创建时间在 批次 与 昨天时间之间 String sql = "SELECT id AS srcVideoId,video AS videoSrcPath,date_format(create_time,'%Y%m%d%H%i%S') AS batchId,create_user_id AS userId FROM train_video_online WHERE type=1 AND state=1 AND create_time BETWEEN STR_TO_DATE(?,'%Y%m%d%H%i%S') AND date_sub(NOW(),INTERVAL 1 DAY)"; List> srcVideoList = this.databaseConnector.queryForList(sql, new Object[]{nextBatchId + 1}); if (CollectionUtils.isEmpty(srcVideoList)) { return null; } ObjectMapper objectMapper = new ObjectMapper(); CollectionType listType = objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, Map.class); List videoLoadInfoList = new ArrayList<>(srcVideoList.size()); VideoSimilarProperties videoSimilarProperties = BeanContextAware.getBeanByType(VideoSimilarProperties.class); String dataFolder = videoSimilarProperties.getDataFolder(); String pathSeparator = File.separator; for (Map srcVideoMap : srcVideoList) { // videoSrcPath ==》[{"name": "/opt/home/train/tmp/2020/10/28/video/489c9fd871a04537bd20ebfc47457368.mp4", "path": "/train/2020/10/28/100cecfe4d4b4e70a2eb2733d78ec12e.mp4"}] String videoSrcPath = String.valueOf(srcVideoMap.get("videoSrcPath")); String srcVideoId = String.valueOf(srcVideoMap.get("srcVideoId")); String batchId = String.valueOf(srcVideoMap.get("batchId")); Long userId = Long.valueOf(String.valueOf(srcVideoMap.get("userId"))); List userList = null; try { userList = objectMapper.readValue(videoSrcPath, listType); } catch (JsonProcessingException e) { logger.error("acquireLoadVideoFromDatabase(): " + e.getMessage(), e); return null; } for (int i = 0; i < userList.size(); i++) { Map srcVideo = userList.get(i); VideoLoadInfo videoLoadInfo = new VideoLoadInfo(); String videoPath = String.valueOf(srcVideo.get("path")); videoLoadInfo.setSrcVideoId(srcVideoId); videoLoadInfo.setVideoSrcPath(videoPath); videoLoadInfo.setUserId(userId); videoLoadInfo.setBatchId(batchId); videoLoadInfo.setVideoDestPath(dataFolder + batchId + pathSeparator + srcVideoId + videoPath.substring(videoPath.lastIndexOf("."))); videoLoadInfoList.add(videoLoadInfo); } } return videoLoadInfoList; } @Override protected long copyRemoteVideoFiles(List readyLoadList, long nextBatchId) throws Exception { Set videoBatchIdSet = new HashSet<>(); SFTPUtil sftp = new SFTPUtil(); sftp.login(); for (VideoLoadInfo videoLoadInfo : readyLoadList) { videoBatchIdSet.add(videoLoadInfo.getBatchId()); String videoSrcPath = videoLoadInfo.getVideoSrcPath(); String directory = videoSrcPath.substring(0, videoSrcPath.lastIndexOf("/")); String downloadFile = videoSrcPath.substring(videoSrcPath.lastIndexOf("/") + 1); try { sftp.download(directory, downloadFile, videoLoadInfo.getVideoDestPath()); } catch (SftpException e) { logger.error("copyRemoteVideoFiles():sftp下载文件{}到{}失败", videoSrcPath, videoLoadInfo.getVideoDestPath()); } } sftp.logout(); // 插入记录 rc_task_status Long maxVideoBatchId = Long.valueOf(Collections.max(videoBatchIdSet, Comparator.comparing(s -> Long.valueOf(s)))); for (String videoBatchId : videoBatchIdSet) { Rc_task_status taskStatus = new Rc_task_status(); taskStatus.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow())); taskStatus.setName("整理获取短视频任务"); taskStatus.setLast_value(Long.valueOf(videoBatchId)); taskStatus.setStatus(TaskStatus.INDEX_VIDEO_LOAD); taskStatus.setTask_type(TaskType.INDEX_VIDEO_LOAD); videoLoaderService.save(taskStatus); } return maxVideoBatchId; } @Override protected int saveDataAndStatus(List readyLoadList, long savedBatchId) { List videoBatchList = this.toVideoBatchList(readyLoadList, savedBatchId); Rc_task_status taskStatus = this.createNewTaskStatus(savedBatchId); return this.videoLoaderService.execSaveBatchAndStatus(videoBatchList, taskStatus); } private Rc_task_status createNewTaskStatus(long savedBatchId) { Rc_task_status status = new Rc_task_status(); status.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow())); status.setStatus("0"); status.setId(NumberGenerator.getLongSequenceNumber()); status.setTask_type(TaskType.INDEX_VIDEO_LOAD); status.setLast_value(savedBatchId); status.setName("整理获取短视频任务"); return status; } private List toVideoBatchList(List readyLoadList, long savedBatchId) { List resultList = new ArrayList<>(readyLoadList.size()); Rc_video_batch e = null; String batchId = String.valueOf(savedBatchId); for (VideoLoadInfo v : readyLoadList) { e = new Rc_video_batch(); e.setBatch_id(batchId); e.setSrc_video_id(v.getSrcVideoId()); e.setUser_id(v.getUserId()); e.setSrc_video_path(v.getVideoDestPath()); // e.setId(NumberGenerator.getLongSequenceNumber()); resultList.add(e); } return resultList; } }