deploy-jar-template/src/main/resources/application-dev.yml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_task_status.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
recommend-video/pom.xml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
recommend-video/src/main/java/com/iplatform/recvideo/config/VideoSimilarProperties.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoLoadScheduler.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
recommend-video/src/main/java/com/iplatform/recvideo/util/CommonUtil.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
recommend-video/src/main/java/com/iplatform/recvideo/util/SFTPUtil.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
recommend-video/src/main/resources/sftp.properties | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
deploy-jar-template/src/main/resources/application-dev.yml
@@ -3,7 +3,7 @@ name: train_recommend datasource: # 是否显示dao中打印的SQL语句 show-sql: false show-sql: true username: root # password: Bjjmy_63661766 password: Bjjmy_2020 @@ -99,3 +99,10 @@ # 测试模式,如果是则仅生成固定测试数据,正式部署需要改为 false test-mode: true #业务视频所在数据库信息 business-datasource-url: 124.70.39.177 business-datasource-port: 3306 business-datasource-service: train_test business-datasource-authentication: root business-datasource-certification: Bjjmy_2020 recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_task_status.java
@@ -75,7 +75,7 @@ */ @Override public void setPkValue(Object value) { this.setId((Long) value); this.setId(Long.valueOf(String.valueOf(value))); } public Long getId() { recommend-video/pom.xml
@@ -47,6 +47,18 @@ <scope>provided</scope> </dependency> <!-- ftp工具 --> <dependency> <groupId>com.jcraft</groupId> <artifactId>jsch</artifactId> <version>0.1.55</version> </dependency> <!--commons-io--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.5</version> </dependency> </dependencies> </project> recommend-video/src/main/java/com/iplatform/recvideo/config/VideoSimilarProperties.java
@@ -13,6 +13,16 @@ private boolean testMode = false; private String businessDatasourceUrl; private int businessDatasourcePort; private String businessDatasourceService; private String businessDatasourceAuthentication; private String businessDatasourceCertification; public boolean isTestMode() { return testMode; } @@ -48,4 +58,43 @@ this.dataFolder = dataFolder; } public String getBusinessDatasourceUrl() { return businessDatasourceUrl; } public void setBusinessDatasourceUrl(String businessDatasourceUrl) { this.businessDatasourceUrl = businessDatasourceUrl; } public int getBusinessDatasourcePort() { return businessDatasourcePort; } public void setBusinessDatasourcePort(int businessDatasourcePort) { this.businessDatasourcePort = businessDatasourcePort; } public String getBusinessDatasourceAuthentication() { return businessDatasourceAuthentication; } public void setBusinessDatasourceAuthentication(String businessDatasourceAuthentication) { this.businessDatasourceAuthentication = businessDatasourceAuthentication; } public String getBusinessDatasourceCertification() { return businessDatasourceCertification; } public void setBusinessDatasourceCertification(String businessDatasourceCertification) { this.businessDatasourceCertification = businessDatasourceCertification; } public String getBusinessDatasourceService() { return businessDatasourceService; } public void setBusinessDatasourceService(String businessDatasourceService) { this.businessDatasourceService = businessDatasourceService; } } recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoLoadScheduler.java
@@ -3,6 +3,7 @@ import com.iplatform.core.BeanContextAware; import com.iplatform.recvideo.VideoLoader; import com.iplatform.recvideo.VideoScheduler; import com.iplatform.recvideo.config.VideoSimilarProperties; import com.iplatform.recvideo.service.VideoLoaderServiceImpl; import com.iplatform.recvideo.support.DefaultVideoLoader; import com.walker.connector.Address; @@ -25,11 +26,13 @@ public VideoLoadScheduler(int id, String name){ super(id, name); VideoSimilarProperties videoSimilarProperties = BeanContextAware.getBeanByType(VideoSimilarProperties.class); srcAddress = new Address(); srcAddress.setUrl("127.0.0.1"); srcAddress.setPort(3306); srcAddress.setAuthentication("root"); srcAddress.setCertification("123456"); srcAddress.setUrl(videoSimilarProperties.getBusinessDatasourceUrl()); srcAddress.setPort(videoSimilarProperties.getBusinessDatasourcePort()); srcAddress.setService(videoSimilarProperties.getBusinessDatasourceService()); srcAddress.setAuthentication(videoSimilarProperties.getBusinessDatasourceAuthentication()); srcAddress.setCertification(videoSimilarProperties.getBusinessDatasourceCertification()); } @Override recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java
@@ -1,17 +1,26 @@ package com.iplatform.recvideo.support; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.type.CollectionType; import com.iplatform.core.BeanContextAware; import com.iplatform.model.po.Rc_task_status; import com.iplatform.model.po.Rc_video_batch; import com.iplatform.reccommon.TaskStatus; import com.iplatform.reccommon.TaskType; import com.iplatform.recvideo.VideoLoadInfo; import com.iplatform.recvideo.VideoLoader; import com.iplatform.recvideo.config.VideoSimilarProperties; import com.iplatform.recvideo.service.VideoLoaderServiceImpl; import com.iplatform.recvideo.util.SFTPUtil; import com.jcraft.jsch.SftpException; import com.walker.connector.support.DatabaseConnector; import com.walker.infrastructure.utils.CollectionUtils; import com.walker.infrastructure.utils.DateUtils; import com.walker.infrastructure.utils.NumberGenerator; import java.util.ArrayList; import java.util.List; import java.io.File; import java.util.*; public class DefaultVideoLoader extends VideoLoader { @@ -34,18 +43,84 @@ @Override protected long acquireInitBatchId() { return 20210101; return 20210101000000L; } @Override protected List<VideoLoadInfo> acquireLoadVideoFromDatabase(long nextBatchId) { this.databaseConnector.queryForList("", new Object[]{nextBatchId}); return null; //只管审核通过的,并且创建时间在 批次 与 昨天时间之间 String sql = "SELECT id AS srcVideoId,video AS videoSrcPath,date_format(create_time,'%Y%m%d%H%i%S') AS batchId,create_user_id AS userId FROM train_video_online WHERE type=1 AND state=1 AND create_time BETWEEN STR_TO_DATE(?,'%Y%m%d%H%i%S') AND date_sub(NOW(),INTERVAL 1 DAY)"; List<Map<String, Object>> srcVideoList = this.databaseConnector.queryForList(sql, new Object[]{nextBatchId + 1}); if (CollectionUtils.isEmpty(srcVideoList)) { return null; } ObjectMapper objectMapper = new ObjectMapper(); CollectionType listType = objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, Map.class); List<VideoLoadInfo> videoLoadInfoList = new ArrayList<>(srcVideoList.size()); VideoSimilarProperties videoSimilarProperties = BeanContextAware.getBeanByType(VideoSimilarProperties.class); String dataFolder = videoSimilarProperties.getDataFolder(); String pathSeparator = File.separator; for (Map<String, Object> srcVideoMap : srcVideoList) { // videoSrcPath ==》[{"name": "/opt/home/train/tmp/2020/10/28/video/489c9fd871a04537bd20ebfc47457368.mp4", "path": "/train/2020/10/28/100cecfe4d4b4e70a2eb2733d78ec12e.mp4"}] String videoSrcPath = String.valueOf(srcVideoMap.get("videoSrcPath")); String srcVideoId = String.valueOf(srcVideoMap.get("srcVideoId")); String batchId = String.valueOf(srcVideoMap.get("batchId")); Long userId = Long.valueOf(String.valueOf(srcVideoMap.get("userId"))); List<Map> userList = null; try { userList = objectMapper.readValue(videoSrcPath, listType); } catch (JsonProcessingException e) { logger.error("acquireLoadVideoFromDatabase(): " + e.getMessage(), e); return null; } for (int i = 0; i < userList.size(); i++) { Map srcVideo = userList.get(i); VideoLoadInfo videoLoadInfo = new VideoLoadInfo(); String videoPath = String.valueOf(srcVideo.get("path")); videoLoadInfo.setSrcVideoId(srcVideoId); videoLoadInfo.setVideoSrcPath(videoPath); videoLoadInfo.setUserId(userId); videoLoadInfo.setBatchId(batchId); videoLoadInfo.setVideoDestPath(dataFolder + batchId + pathSeparator + srcVideoId + videoPath.substring(videoPath.lastIndexOf("."))); videoLoadInfoList.add(videoLoadInfo); } } return videoLoadInfoList; } @Override protected long copyRemoteVideoFiles(List<VideoLoadInfo> readyLoadList, long nextBatchId) throws Exception { return 0; Set<String> videoBatchIdSet = new HashSet<>(); SFTPUtil sftp = new SFTPUtil(); sftp.login(); for (VideoLoadInfo videoLoadInfo : readyLoadList) { videoBatchIdSet.add(videoLoadInfo.getBatchId()); String videoSrcPath = videoLoadInfo.getVideoSrcPath(); String directory = videoSrcPath.substring(0, videoSrcPath.lastIndexOf("/")); String downloadFile = videoSrcPath.substring(videoSrcPath.lastIndexOf("/") + 1); try { sftp.download(directory, downloadFile, videoLoadInfo.getVideoDestPath()); } catch (SftpException e) { logger.error("copyRemoteVideoFiles():sftp下载文件{}到{}失败", videoSrcPath, videoLoadInfo.getVideoDestPath()); } } sftp.logout(); // 插入记录 rc_task_status Long maxVideoBatchId = Long.valueOf(Collections.max(videoBatchIdSet, Comparator.comparing(s -> Long.valueOf(s)))); for (String videoBatchId : videoBatchIdSet) { Rc_task_status taskStatus = new Rc_task_status(); taskStatus.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow())); taskStatus.setName("整理获取短视频任务"); taskStatus.setLast_value(Long.valueOf(videoBatchId)); taskStatus.setStatus(TaskStatus.INDEX_VIDEO_LOAD); taskStatus.setTask_type(TaskType.INDEX_VIDEO_LOAD); videoLoaderService.save(taskStatus); } return maxVideoBatchId; } @Override @@ -55,7 +130,7 @@ return this.videoLoaderService.execSaveBatchAndStatus(videoBatchList, taskStatus); } private Rc_task_status createNewTaskStatus(long savedBatchId){ private Rc_task_status createNewTaskStatus(long savedBatchId) { Rc_task_status status = new Rc_task_status(); status.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow())); status.setStatus("0"); @@ -66,11 +141,11 @@ return status; } private List<Rc_video_batch> toVideoBatchList(List<VideoLoadInfo> readyLoadList, long savedBatchId){ private List<Rc_video_batch> toVideoBatchList(List<VideoLoadInfo> readyLoadList, long savedBatchId) { List<Rc_video_batch> resultList = new ArrayList<>(readyLoadList.size()); Rc_video_batch e = null; String batchId = String.valueOf(savedBatchId); for(VideoLoadInfo v : readyLoadList){ for (VideoLoadInfo v : readyLoadList) { e = new Rc_video_batch(); e.setBatch_id(batchId); e.setSrc_video_id(v.getSrcVideoId()); recommend-video/src/main/java/com/iplatform/recvideo/util/CommonUtil.java
New file @@ -0,0 +1,32 @@ package com.iplatform.recvideo.util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.io.DefaultResourceLoader; import org.springframework.core.io.ResourceLoader; import java.io.IOException; import java.io.InputStream; import java.util.Properties; public class CommonUtil { private static transient Logger log = LoggerFactory.getLogger(CommonUtil.class); /** * @Author : liu.q [916000612@qq.com] * @少时狂发编程想,无谓赴身IT行. @纵使荣华未可进,我自coding又何妨! * @Time 2020/2/25 3:55 下午 * @Description : 获取资源文件 */ public static Properties getProp(String fileName) { Properties props = new Properties(); ResourceLoader resourceLoader = new DefaultResourceLoader(); try { InputStream in = resourceLoader.getResource(fileName).getInputStream(); props.load(in); } catch (IOException e) { log.error(e.getMessage()); } return props; } } recommend-video/src/main/java/com/iplatform/recvideo/util/SFTPUtil.java
New file @@ -0,0 +1,306 @@ package com.iplatform.recvideo.util; import com.jcraft.jsch.*; import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; import java.util.Properties; import java.util.Vector; /** * @ClassName SFTPUtil * @Author cy * @Date 2022/10/8 * @Description * @Version 1.0 **/ public class SFTPUtil { private transient Logger log = LoggerFactory.getLogger(this.getClass()); private ChannelSftp sftp; private Session session; /** * FTP 登录用户名 */ private String username; /** * FTP 登录密码 */ private String password; /** * 私钥 */ private String privateKey; /** * FTP 服务器地址IP地址 */ private String host; /** * FTP 端口 */ private int port; /** * 构造基于密码认证的sftp对象 * * @param username * @param password * @param host * @param port */ public SFTPUtil(String username, String password, String host, int port) { this.username = username; this.password = password; this.host = host; this.port = port; } /** * 构造基于秘钥认证的sftp对象 * * @param username * @param host * @param port * @param privateKey */ public SFTPUtil(String username, String host, int port, String privateKey) { this.username = username; this.host = host; this.port = port; this.privateKey = privateKey; } public SFTPUtil() { Properties props = CommonUtil.getProp("sftp.properties"); this.username = props.getProperty("sftp.user"); this.password = props.getProperty("sftp.pass"); this.host = props.getProperty("sftp.host"); this.port = Integer.valueOf(props.getProperty("sftp.port")); this.privateKey = props.getProperty("sftp.privateKey"); } /** * 连接sftp服务器 * * @throws Exception */ public void login() { try { JSch jsch = new JSch(); if (privateKey != null) { jsch.addIdentity(privateKey);// 设置私钥 log.info("sftp connect,path of private key file:{}", privateKey); } log.info("sftp connect by host:{} username:{}", host, username); session = jsch.getSession(username, host, port); log.info("Session is build"); if (password != null) { session.setPassword(password); } Properties config = new Properties(); config.put("StrictHostKeyChecking", "no"); session.setConfig(config); session.connect(); log.info("Session is connected"); Channel channel = session.openChannel("sftp"); channel.connect(); log.info("channel is connected"); sftp = (ChannelSftp) channel; log.info(String.format("sftp server host:[%s] port:[%s] is connect successfull", host, port)); } catch (JSchException e) { log.error("Cannot connect to specified sftp server : {}:{} \n Exception message is: {}", new Object[]{host, port, e.getMessage()}); } } /** * 关闭连接 server */ public void logout() { if (sftp != null) { if (sftp.isConnected()) { sftp.disconnect(); log.info("sftp is closed already"); } } if (session != null) { if (session.isConnected()) { session.disconnect(); log.info("sshSession is closed already"); } } } /** * 将输入流的数据上传到sftp作为文件 * * @param directory 上传到该目录 * @param sftpFileName sftp端文件名 * @param input 输入流 * @throws SftpException * @throws Exception */ public void upload(String directory, String sftpFileName, InputStream input) throws SftpException { try { sftp.cd(directory); } catch (SftpException e) { log.warn("directory is not exist"); sftp.mkdir(directory); sftp.cd(directory); } sftp.put(input, sftpFileName); log.info("file:{} is upload successful", sftpFileName); } /** * 上传单个文件 * * @param directory 上传到sftp目录 * @param uploadFile 要上传的文件,包括路径 * @throws FileNotFoundException * @throws SftpException * @throws Exception */ public void upload(String directory, String uploadFile) throws FileNotFoundException, SftpException { File file = new File(uploadFile); upload(directory, file.getName(), new FileInputStream(file)); } /** * 将byte[]上传到sftp,作为文件。注意:从String生成byte[]是,要指定字符集。 * * @param directory 上传到sftp目录 * @param sftpFileName 文件在sftp端的命名 * @param byteArr 要上传的字节数组 * @throws SftpException * @throws Exception */ public void upload(String directory, String sftpFileName, byte[] byteArr) throws SftpException { upload(directory, sftpFileName, new ByteArrayInputStream(byteArr)); } /** * 将字符串按照指定的字符编码上传到sftp * * @param directory 上传到sftp目录 * @param sftpFileName 文件在sftp端的命名 * @param dataStr 待上传的数据 * @param charsetName sftp上的文件,按该字符编码保存 * @throws UnsupportedEncodingException * @throws SftpException * @throws Exception */ public void upload(String directory, String sftpFileName, String dataStr, String charsetName) throws UnsupportedEncodingException, SftpException { upload(directory, sftpFileName, new ByteArrayInputStream(dataStr.getBytes(charsetName))); } /** * 下载文件 * * @param directory 下载目录 * @param downloadFile 下载的文件 * @param saveFile 存在本地的路径 * @throws SftpException * @throws FileNotFoundException * @throws Exception */ public void download(String directory, String downloadFile, String saveFile) throws SftpException, FileNotFoundException { log.info("下载文件:{}/{}到{}", directory, downloadFile, saveFile); if (directory != null && !"".equals(directory)) { sftp.cd(directory); } File file = new File(saveFile); File parentFile = file.getParentFile(); if (!parentFile.exists()) { parentFile.mkdirs(); } sftp.get(downloadFile, new FileOutputStream(file)); log.info("file:{} is download successful", downloadFile); } /** * 下载文件 * * @param directory 下载目录 * @param downloadFile 下载的文件名 * @return 字节数组 * @throws SftpException * @throws IOException * @throws Exception */ public byte[] download(String directory, String downloadFile) throws SftpException, IOException { if (directory != null && !"".equals(directory)) { sftp.cd(directory); } InputStream is = sftp.get(downloadFile); byte[] fileData = IOUtils.toByteArray(is); log.info("file:{} is download successful", downloadFile); return fileData; } /** * 删除文件 * * @param directory 要删除文件所在目录 * @param deleteFile 要删除的文件 * @throws SftpException * @throws Exception */ public void delete(String directory, String deleteFile) throws SftpException { sftp.cd(directory); sftp.rm(deleteFile); } public void deleteDir(String directory) throws SftpException { sftp.rmdir(directory); } /** * 列出目录下的文件 * * @param directory 要列出的目录 * @return * @throws SftpException */ public Vector<?> listFiles(String directory) throws SftpException { return sftp.ls(directory); } /** * 创建目录,如果存就不会创建 * * @param path 开头必须带/,结尾不能带/,否则后果自负 * @throws SftpException * @throws IOException */ public void mkdir(String path) throws SftpException { String pathArr[] = path.split("/"); String p = ""; for (int i = 1; i < pathArr.length; i++) {//0是空字符串,直接跳过 p += "/" + pathArr[i]; System.out.println(p); try { sftp.cd(p); } catch (Exception e) { sftp.mkdir(p); } } } public static void main(String[] args) throws Exception { SFTPUtil sftp = new SFTPUtil(); sftp.login(); sftp.download("/train/2022/10/8", "041da9fff35f474abefe101e90ab7d5a.docx", "C:\\Users\\cy\\Desktop\\3D相册\\a2.docx"); sftp.logout(); } } recommend-video/src/main/resources/sftp.properties
New file @@ -0,0 +1,5 @@ sftp.host=124.70.39.177 sftp.port=22 sftp.user=mysftp sftp.pass=Bjjmy_2020 sftp.base=/train