package com.iplatform.recvideo.support;
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.type.CollectionType;
|
import com.iplatform.core.BeanContextAware;
|
import com.iplatform.model.po.Rc_task_status;
|
import com.iplatform.model.po.Rc_video_batch;
|
import com.iplatform.reccommon.TaskStatus;
|
import com.iplatform.reccommon.TaskType;
|
import com.iplatform.recvideo.VideoLoadInfo;
|
import com.iplatform.recvideo.VideoLoader;
|
import com.iplatform.recvideo.config.VideoSimilarProperties;
|
import com.iplatform.recvideo.service.VideoLoaderServiceImpl;
|
import com.iplatform.recvideo.util.SFTPUtil;
|
import com.jcraft.jsch.SftpException;
|
import com.walker.connector.support.DatabaseConnector;
|
import com.walker.infrastructure.utils.CollectionUtils;
|
import com.walker.infrastructure.utils.DateUtils;
|
import com.walker.infrastructure.utils.NumberGenerator;
|
|
import java.io.File;
|
import java.util.*;
|
|
public class DefaultVideoLoader extends VideoLoader {
|
|
private VideoLoaderServiceImpl videoLoaderService;
|
|
private DatabaseConnector databaseConnector = null;
|
|
public void setDatabaseConnector(DatabaseConnector databaseConnector) {
|
this.databaseConnector = databaseConnector;
|
}
|
|
public void setVideoLoaderService(VideoLoaderServiceImpl videoLoaderService) {
|
this.videoLoaderService = videoLoaderService;
|
}
|
|
@Override
|
protected long acquireNextBatchId() {
|
return this.videoLoaderService.queryNextBatchValue();
|
}
|
|
@Override
|
protected long acquireInitBatchId() {
|
return 20210101000000L;
|
}
|
|
@Override
|
protected List<VideoLoadInfo> acquireLoadVideoFromDatabase(long nextBatchId) {
|
//只管审核通过的,并且创建时间在 批次 与 昨天时间之间
|
String sql = "SELECT id AS srcVideoId,video AS videoSrcPath,date_format(create_time,'%Y%m%d%H%i%S') AS batchId,create_user_id AS userId FROM train_video_online WHERE type=1 AND state=1 AND create_time BETWEEN STR_TO_DATE(?,'%Y%m%d%H%i%S') AND date_sub(NOW(),INTERVAL 1 DAY)";
|
List<Map<String, Object>> srcVideoList = this.databaseConnector.queryForList(sql, new Object[]{nextBatchId + 1});
|
if (CollectionUtils.isEmpty(srcVideoList)) {
|
return null;
|
}
|
ObjectMapper objectMapper = new ObjectMapper();
|
CollectionType listType = objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, Map.class);
|
List<VideoLoadInfo> videoLoadInfoList = new ArrayList<>(srcVideoList.size());
|
|
VideoSimilarProperties videoSimilarProperties = BeanContextAware.getBeanByType(VideoSimilarProperties.class);
|
String dataFolder = videoSimilarProperties.getDataFolder();
|
String pathSeparator = File.separator;
|
|
for (Map<String, Object> srcVideoMap : srcVideoList) {
|
// videoSrcPath ==》[{"name": "/opt/home/train/tmp/2020/10/28/video/489c9fd871a04537bd20ebfc47457368.mp4", "path": "/train/2020/10/28/100cecfe4d4b4e70a2eb2733d78ec12e.mp4"}]
|
String videoSrcPath = String.valueOf(srcVideoMap.get("videoSrcPath"));
|
String srcVideoId = String.valueOf(srcVideoMap.get("srcVideoId"));
|
String batchId = String.valueOf(srcVideoMap.get("batchId"));
|
Long userId = Long.valueOf(String.valueOf(srcVideoMap.get("userId")));
|
List<Map> userList = null;
|
try {
|
userList = objectMapper.readValue(videoSrcPath, listType);
|
} catch (JsonProcessingException e) {
|
logger.error("acquireLoadVideoFromDatabase(): " + e.getMessage(), e);
|
return null;
|
}
|
for (int i = 0; i < userList.size(); i++) {
|
Map srcVideo = userList.get(i);
|
VideoLoadInfo videoLoadInfo = new VideoLoadInfo();
|
String videoPath = String.valueOf(srcVideo.get("path"));
|
videoLoadInfo.setSrcVideoId(srcVideoId);
|
videoLoadInfo.setVideoSrcPath(videoPath);
|
videoLoadInfo.setUserId(userId);
|
videoLoadInfo.setBatchId(batchId);
|
videoLoadInfo.setVideoDestPath(dataFolder + batchId + pathSeparator + srcVideoId + videoPath.substring(videoPath.lastIndexOf(".")));
|
videoLoadInfoList.add(videoLoadInfo);
|
}
|
}
|
|
return videoLoadInfoList;
|
}
|
|
@Override
|
protected long copyRemoteVideoFiles(List<VideoLoadInfo> readyLoadList, long nextBatchId) throws Exception {
|
Set<String> videoBatchIdSet = new HashSet<>();
|
SFTPUtil sftp = new SFTPUtil();
|
sftp.login();
|
for (VideoLoadInfo videoLoadInfo : readyLoadList) {
|
videoBatchIdSet.add(videoLoadInfo.getBatchId());
|
String videoSrcPath = videoLoadInfo.getVideoSrcPath();
|
String directory = videoSrcPath.substring(0, videoSrcPath.lastIndexOf("/"));
|
String downloadFile = videoSrcPath.substring(videoSrcPath.lastIndexOf("/") + 1);
|
try {
|
sftp.download(directory, downloadFile, videoLoadInfo.getVideoDestPath());
|
} catch (SftpException e) {
|
logger.error("copyRemoteVideoFiles():sftp下载文件{}到{}失败", videoSrcPath, videoLoadInfo.getVideoDestPath());
|
}
|
}
|
sftp.logout();
|
// 插入记录 rc_task_status
|
Long maxVideoBatchId = Long.valueOf(Collections.max(videoBatchIdSet, Comparator.comparing(s -> Long.valueOf(s))));
|
for (String videoBatchId : videoBatchIdSet) {
|
Rc_task_status taskStatus = new Rc_task_status();
|
taskStatus.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow()));
|
taskStatus.setName("整理获取短视频任务");
|
taskStatus.setLast_value(Long.valueOf(videoBatchId));
|
taskStatus.setStatus(TaskStatus.INDEX_VIDEO_LOAD);
|
taskStatus.setTask_type(TaskType.INDEX_VIDEO_LOAD);
|
videoLoaderService.save(taskStatus);
|
}
|
|
return maxVideoBatchId;
|
}
|
|
@Override
|
protected int saveDataAndStatus(List<VideoLoadInfo> readyLoadList, long savedBatchId) {
|
List<Rc_video_batch> videoBatchList = this.toVideoBatchList(readyLoadList, savedBatchId);
|
Rc_task_status taskStatus = this.createNewTaskStatus(savedBatchId);
|
return this.videoLoaderService.execSaveBatchAndStatus(videoBatchList, taskStatus);
|
}
|
|
private Rc_task_status createNewTaskStatus(long savedBatchId) {
|
Rc_task_status status = new Rc_task_status();
|
status.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow()));
|
status.setStatus("0");
|
status.setId(NumberGenerator.getLongSequenceNumber());
|
status.setTask_type(TaskType.INDEX_VIDEO_LOAD);
|
status.setLast_value(savedBatchId);
|
status.setName("整理获取短视频任务");
|
return status;
|
}
|
|
private List<Rc_video_batch> toVideoBatchList(List<VideoLoadInfo> readyLoadList, long savedBatchId) {
|
List<Rc_video_batch> resultList = new ArrayList<>(readyLoadList.size());
|
Rc_video_batch e = null;
|
String batchId = String.valueOf(savedBatchId);
|
for (VideoLoadInfo v : readyLoadList) {
|
e = new Rc_video_batch();
|
e.setBatch_id(batchId);
|
e.setSrc_video_id(v.getSrcVideoId());
|
e.setUser_id(v.getUserId());
|
e.setSrc_video_path(v.getVideoDestPath());
|
// e.setId(NumberGenerator.getLongSequenceNumber());
|
resultList.add(e);
|
}
|
return resultList;
|
}
|
}
|