From 301017226e61aa39c8b40e780ca244eeddd073d4 Mon Sep 17 00:00:00 2001 From: cy <1664593601@qq.com> Date: 星期日, 09 十月 2022 14:14:54 +0800 Subject: [PATCH] feat(schedule): 定时任务视频拉取 --- recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java | 93 ++++++++++++++++++++++++++++++++++++++++++---- 1 files changed, 84 insertions(+), 9 deletions(-) diff --git a/recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java b/recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java index 7a3cd92..606f9c8 100644 --- a/recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java +++ b/recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java @@ -1,17 +1,26 @@ package com.iplatform.recvideo.support; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.CollectionType; +import com.iplatform.core.BeanContextAware; import com.iplatform.model.po.Rc_task_status; import com.iplatform.model.po.Rc_video_batch; +import com.iplatform.reccommon.TaskStatus; import com.iplatform.reccommon.TaskType; import com.iplatform.recvideo.VideoLoadInfo; import com.iplatform.recvideo.VideoLoader; +import com.iplatform.recvideo.config.VideoSimilarProperties; import com.iplatform.recvideo.service.VideoLoaderServiceImpl; +import com.iplatform.recvideo.util.SFTPUtil; +import com.jcraft.jsch.SftpException; import com.walker.connector.support.DatabaseConnector; +import com.walker.infrastructure.utils.CollectionUtils; import com.walker.infrastructure.utils.DateUtils; import com.walker.infrastructure.utils.NumberGenerator; -import java.util.ArrayList; -import java.util.List; +import java.io.File; +import java.util.*; public class DefaultVideoLoader extends VideoLoader { @@ -34,18 +43,84 @@ @Override protected long acquireInitBatchId() { - return 20210101; + return 20210101000000L; } @Override protected List<VideoLoadInfo> acquireLoadVideoFromDatabase(long nextBatchId) { - this.databaseConnector.queryForList("", new Object[]{nextBatchId}); - return null; + //鍙瀹℃牳閫氳繃鐨勶紝骞朵笖鍒涘缓鏃堕棿鍦� 鎵规 涓� 鏄ㄥぉ鏃堕棿涔嬮棿 + String sql = "SELECT id AS srcVideoId,video AS videoSrcPath,date_format(create_time,'%Y%m%d%H%i%S') AS batchId,create_user_id AS userId FROM train_video_online WHERE type=1 AND state=1 AND create_time BETWEEN STR_TO_DATE(?,'%Y%m%d%H%i%S') AND date_sub(NOW(),INTERVAL 1 DAY)"; + List<Map<String, Object>> srcVideoList = this.databaseConnector.queryForList(sql, new Object[]{nextBatchId + 1}); + if (CollectionUtils.isEmpty(srcVideoList)) { + return null; + } + ObjectMapper objectMapper = new ObjectMapper(); + CollectionType listType = objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, Map.class); + List<VideoLoadInfo> videoLoadInfoList = new ArrayList<>(srcVideoList.size()); + + VideoSimilarProperties videoSimilarProperties = BeanContextAware.getBeanByType(VideoSimilarProperties.class); + String dataFolder = videoSimilarProperties.getDataFolder(); + String pathSeparator = File.separator; + + for (Map<String, Object> srcVideoMap : srcVideoList) { + // videoSrcPath ==銆媅{"name": "/opt/home/train/tmp/2020/10/28/video/489c9fd871a04537bd20ebfc47457368.mp4", "path": "/train/2020/10/28/100cecfe4d4b4e70a2eb2733d78ec12e.mp4"}] + String videoSrcPath = String.valueOf(srcVideoMap.get("videoSrcPath")); + String srcVideoId = String.valueOf(srcVideoMap.get("srcVideoId")); + String batchId = String.valueOf(srcVideoMap.get("batchId")); + Long userId = Long.valueOf(String.valueOf(srcVideoMap.get("userId"))); + List<Map> userList = null; + try { + userList = objectMapper.readValue(videoSrcPath, listType); + } catch (JsonProcessingException e) { + logger.error("acquireLoadVideoFromDatabase(): " + e.getMessage(), e); + return null; + } + for (int i = 0; i < userList.size(); i++) { + Map srcVideo = userList.get(i); + VideoLoadInfo videoLoadInfo = new VideoLoadInfo(); + String videoPath = String.valueOf(srcVideo.get("path")); + videoLoadInfo.setSrcVideoId(srcVideoId); + videoLoadInfo.setVideoSrcPath(videoPath); + videoLoadInfo.setUserId(userId); + videoLoadInfo.setBatchId(batchId); + videoLoadInfo.setVideoDestPath(dataFolder + batchId + pathSeparator + srcVideoId + videoPath.substring(videoPath.lastIndexOf("."))); + videoLoadInfoList.add(videoLoadInfo); + } + } + + return videoLoadInfoList; } @Override protected long copyRemoteVideoFiles(List<VideoLoadInfo> readyLoadList, long nextBatchId) throws Exception { - return 0; + Set<String> videoBatchIdSet = new HashSet<>(); + SFTPUtil sftp = new SFTPUtil(); + sftp.login(); + for (VideoLoadInfo videoLoadInfo : readyLoadList) { + videoBatchIdSet.add(videoLoadInfo.getBatchId()); + String videoSrcPath = videoLoadInfo.getVideoSrcPath(); + String directory = videoSrcPath.substring(0, videoSrcPath.lastIndexOf("/")); + String downloadFile = videoSrcPath.substring(videoSrcPath.lastIndexOf("/") + 1); + try { + sftp.download(directory, downloadFile, videoLoadInfo.getVideoDestPath()); + } catch (SftpException e) { + logger.error("copyRemoteVideoFiles():sftp涓嬭浇鏂囦欢{}鍒皗}澶辫触", videoSrcPath, videoLoadInfo.getVideoDestPath()); + } + } + sftp.logout(); + // 鎻掑叆璁板綍 rc_task_status + Long maxVideoBatchId = Long.valueOf(Collections.max(videoBatchIdSet, Comparator.comparing(s -> Long.valueOf(s)))); + for (String videoBatchId : videoBatchIdSet) { + Rc_task_status taskStatus = new Rc_task_status(); + taskStatus.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow())); + taskStatus.setName("鏁寸悊鑾峰彇鐭棰戜换鍔�"); + taskStatus.setLast_value(Long.valueOf(videoBatchId)); + taskStatus.setStatus(TaskStatus.INDEX_VIDEO_LOAD); + taskStatus.setTask_type(TaskType.INDEX_VIDEO_LOAD); + videoLoaderService.save(taskStatus); + } + + return maxVideoBatchId; } @Override @@ -55,7 +130,7 @@ return this.videoLoaderService.execSaveBatchAndStatus(videoBatchList, taskStatus); } - private Rc_task_status createNewTaskStatus(long savedBatchId){ + private Rc_task_status createNewTaskStatus(long savedBatchId) { Rc_task_status status = new Rc_task_status(); status.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow())); status.setStatus("0"); @@ -66,11 +141,11 @@ return status; } - private List<Rc_video_batch> toVideoBatchList(List<VideoLoadInfo> readyLoadList, long savedBatchId){ + private List<Rc_video_batch> toVideoBatchList(List<VideoLoadInfo> readyLoadList, long savedBatchId) { List<Rc_video_batch> resultList = new ArrayList<>(readyLoadList.size()); Rc_video_batch e = null; String batchId = String.valueOf(savedBatchId); - for(VideoLoadInfo v : readyLoadList){ + for (VideoLoadInfo v : readyLoadList) { e = new Rc_video_batch(); e.setBatch_id(batchId); e.setSrc_video_id(v.getSrcVideoId()); -- Gitblit v1.9.1