cy
2022-10-09 301017226e61aa39c8b40e780ca244eeddd073d4
feat(schedule): 定时任务视频拉取

查询 最大批次时间之后--24小时前之前 审核通过的视频并通过ftp下载到本地
3个文件已添加
6个文件已修改
511 ■■■■■ 已修改文件
deploy-jar-template/src/main/resources/application-dev.yml 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_task_status.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/pom.xml 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/config/VideoSimilarProperties.java 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoLoadScheduler.java 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java 85 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/util/CommonUtil.java 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/util/SFTPUtil.java 306 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/resources/sftp.properties 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
deploy-jar-template/src/main/resources/application-dev.yml
@@ -3,7 +3,7 @@
    name: train_recommend
  datasource:
    # 是否显示dao中打印的SQL语句
    show-sql: false
    show-sql: true
    username: root
#    password: Bjjmy_63661766
    password: Bjjmy_2020
@@ -99,3 +99,10 @@
    # 测试模式,如果是则仅生成固定测试数据,正式部署需要改为 false
    test-mode: true
    #业务视频所在数据库信息
    business-datasource-url: 124.70.39.177
    business-datasource-port: 3306
    business-datasource-service: train_test
    business-datasource-authentication: root
    business-datasource-certification: Bjjmy_2020
recommend-model-pojo/src/main/java/com/iplatform/model/po/Rc_task_status.java
@@ -75,7 +75,7 @@
     */
    @Override
    public void setPkValue(Object value) {
        this.setId((Long) value);
        this.setId(Long.valueOf(String.valueOf(value)));
    }
    public Long getId() {
recommend-video/pom.xml
@@ -47,6 +47,18 @@
            <scope>provided</scope>
        </dependency>
        <!-- ftp工具 -->
        <dependency>
            <groupId>com.jcraft</groupId>
            <artifactId>jsch</artifactId>
            <version>0.1.55</version>
        </dependency>
        <!--commons-io-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>
    </dependencies>
</project>
recommend-video/src/main/java/com/iplatform/recvideo/config/VideoSimilarProperties.java
@@ -13,6 +13,16 @@
    private boolean testMode = false;
    private String businessDatasourceUrl;
    private int businessDatasourcePort;
    private String businessDatasourceService;
    private String businessDatasourceAuthentication;
    private String businessDatasourceCertification;
    public boolean isTestMode() {
        return testMode;
    }
@@ -48,4 +58,43 @@
        this.dataFolder = dataFolder;
    }
    public String getBusinessDatasourceUrl() {
        return businessDatasourceUrl;
    }
    public void setBusinessDatasourceUrl(String businessDatasourceUrl) {
        this.businessDatasourceUrl = businessDatasourceUrl;
    }
    public int getBusinessDatasourcePort() {
        return businessDatasourcePort;
    }
    public void setBusinessDatasourcePort(int businessDatasourcePort) {
        this.businessDatasourcePort = businessDatasourcePort;
    }
    public String getBusinessDatasourceAuthentication() {
        return businessDatasourceAuthentication;
    }
    public void setBusinessDatasourceAuthentication(String businessDatasourceAuthentication) {
        this.businessDatasourceAuthentication = businessDatasourceAuthentication;
    }
    public String getBusinessDatasourceCertification() {
        return businessDatasourceCertification;
    }
    public void setBusinessDatasourceCertification(String businessDatasourceCertification) {
        this.businessDatasourceCertification = businessDatasourceCertification;
    }
    public String getBusinessDatasourceService() {
        return businessDatasourceService;
    }
    public void setBusinessDatasourceService(String businessDatasourceService) {
        this.businessDatasourceService = businessDatasourceService;
    }
}
recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoLoadScheduler.java
@@ -3,6 +3,7 @@
import com.iplatform.core.BeanContextAware;
import com.iplatform.recvideo.VideoLoader;
import com.iplatform.recvideo.VideoScheduler;
import com.iplatform.recvideo.config.VideoSimilarProperties;
import com.iplatform.recvideo.service.VideoLoaderServiceImpl;
import com.iplatform.recvideo.support.DefaultVideoLoader;
import com.walker.connector.Address;
@@ -25,11 +26,13 @@
    public VideoLoadScheduler(int id, String name){
        super(id, name);
        VideoSimilarProperties videoSimilarProperties = BeanContextAware.getBeanByType(VideoSimilarProperties.class);
        srcAddress = new Address();
        srcAddress.setUrl("127.0.0.1");
        srcAddress.setPort(3306);
        srcAddress.setAuthentication("root");
        srcAddress.setCertification("123456");
        srcAddress.setUrl(videoSimilarProperties.getBusinessDatasourceUrl());
        srcAddress.setPort(videoSimilarProperties.getBusinessDatasourcePort());
        srcAddress.setService(videoSimilarProperties.getBusinessDatasourceService());
        srcAddress.setAuthentication(videoSimilarProperties.getBusinessDatasourceAuthentication());
        srcAddress.setCertification(videoSimilarProperties.getBusinessDatasourceCertification());
    }
    @Override
recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java
@@ -1,17 +1,26 @@
package com.iplatform.recvideo.support;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.CollectionType;
import com.iplatform.core.BeanContextAware;
import com.iplatform.model.po.Rc_task_status;
import com.iplatform.model.po.Rc_video_batch;
import com.iplatform.reccommon.TaskStatus;
import com.iplatform.reccommon.TaskType;
import com.iplatform.recvideo.VideoLoadInfo;
import com.iplatform.recvideo.VideoLoader;
import com.iplatform.recvideo.config.VideoSimilarProperties;
import com.iplatform.recvideo.service.VideoLoaderServiceImpl;
import com.iplatform.recvideo.util.SFTPUtil;
import com.jcraft.jsch.SftpException;
import com.walker.connector.support.DatabaseConnector;
import com.walker.infrastructure.utils.CollectionUtils;
import com.walker.infrastructure.utils.DateUtils;
import com.walker.infrastructure.utils.NumberGenerator;
import java.util.ArrayList;
import java.util.List;
import java.io.File;
import java.util.*;
public class DefaultVideoLoader extends VideoLoader {
@@ -34,18 +43,84 @@
    @Override
    protected long acquireInitBatchId() {
        return 20210101;
        return 20210101000000L;
    }
    @Override
    protected List<VideoLoadInfo> acquireLoadVideoFromDatabase(long nextBatchId) {
        this.databaseConnector.queryForList("", new Object[]{nextBatchId});
        //只管审核通过的,并且创建时间在 批次 与 昨天时间之间
        String sql = "SELECT id AS srcVideoId,video AS videoSrcPath,date_format(create_time,'%Y%m%d%H%i%S') AS batchId,create_user_id AS userId FROM train_video_online WHERE type=1 AND state=1 AND create_time BETWEEN STR_TO_DATE(?,'%Y%m%d%H%i%S') AND date_sub(NOW(),INTERVAL 1 DAY)";
        List<Map<String, Object>> srcVideoList = this.databaseConnector.queryForList(sql, new Object[]{nextBatchId + 1});
        if (CollectionUtils.isEmpty(srcVideoList)) {
        return null;
        }
        ObjectMapper objectMapper = new ObjectMapper();
        CollectionType listType = objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, Map.class);
        List<VideoLoadInfo> videoLoadInfoList = new ArrayList<>(srcVideoList.size());
        VideoSimilarProperties videoSimilarProperties = BeanContextAware.getBeanByType(VideoSimilarProperties.class);
        String dataFolder = videoSimilarProperties.getDataFolder();
        String pathSeparator = File.separator;
        for (Map<String, Object> srcVideoMap : srcVideoList) {
            //  videoSrcPath ==》[{"name": "/opt/home/train/tmp/2020/10/28/video/489c9fd871a04537bd20ebfc47457368.mp4", "path": "/train/2020/10/28/100cecfe4d4b4e70a2eb2733d78ec12e.mp4"}]
            String videoSrcPath = String.valueOf(srcVideoMap.get("videoSrcPath"));
            String srcVideoId = String.valueOf(srcVideoMap.get("srcVideoId"));
            String batchId = String.valueOf(srcVideoMap.get("batchId"));
            Long userId = Long.valueOf(String.valueOf(srcVideoMap.get("userId")));
            List<Map> userList = null;
            try {
                userList = objectMapper.readValue(videoSrcPath, listType);
            } catch (JsonProcessingException e) {
                logger.error("acquireLoadVideoFromDatabase(): " + e.getMessage(), e);
                return null;
            }
            for (int i = 0; i < userList.size(); i++) {
                Map srcVideo = userList.get(i);
                VideoLoadInfo videoLoadInfo = new VideoLoadInfo();
                String videoPath = String.valueOf(srcVideo.get("path"));
                videoLoadInfo.setSrcVideoId(srcVideoId);
                videoLoadInfo.setVideoSrcPath(videoPath);
                videoLoadInfo.setUserId(userId);
                videoLoadInfo.setBatchId(batchId);
                videoLoadInfo.setVideoDestPath(dataFolder + batchId + pathSeparator + srcVideoId + videoPath.substring(videoPath.lastIndexOf(".")));
                videoLoadInfoList.add(videoLoadInfo);
            }
        }
        return videoLoadInfoList;
    }
    @Override
    protected long copyRemoteVideoFiles(List<VideoLoadInfo> readyLoadList, long nextBatchId) throws Exception {
        return 0;
        Set<String> videoBatchIdSet = new HashSet<>();
        SFTPUtil sftp = new SFTPUtil();
        sftp.login();
        for (VideoLoadInfo videoLoadInfo : readyLoadList) {
            videoBatchIdSet.add(videoLoadInfo.getBatchId());
            String videoSrcPath = videoLoadInfo.getVideoSrcPath();
            String directory = videoSrcPath.substring(0, videoSrcPath.lastIndexOf("/"));
            String downloadFile = videoSrcPath.substring(videoSrcPath.lastIndexOf("/") + 1);
            try {
                sftp.download(directory, downloadFile, videoLoadInfo.getVideoDestPath());
            } catch (SftpException e) {
                logger.error("copyRemoteVideoFiles():sftp下载文件{}到{}失败", videoSrcPath, videoLoadInfo.getVideoDestPath());
            }
        }
        sftp.logout();
        // 插入记录 rc_task_status
        Long maxVideoBatchId = Long.valueOf(Collections.max(videoBatchIdSet, Comparator.comparing(s -> Long.valueOf(s))));
        for (String videoBatchId : videoBatchIdSet) {
            Rc_task_status taskStatus = new Rc_task_status();
            taskStatus.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow()));
            taskStatus.setName("整理获取短视频任务");
            taskStatus.setLast_value(Long.valueOf(videoBatchId));
            taskStatus.setStatus(TaskStatus.INDEX_VIDEO_LOAD);
            taskStatus.setTask_type(TaskType.INDEX_VIDEO_LOAD);
            videoLoaderService.save(taskStatus);
        }
        return maxVideoBatchId;
    }
    @Override
recommend-video/src/main/java/com/iplatform/recvideo/util/CommonUtil.java
New file
@@ -0,0 +1,32 @@
package com.iplatform.recvideo.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.io.ResourceLoader;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
public class CommonUtil {
    private static transient Logger log = LoggerFactory.getLogger(CommonUtil.class);
    /**
     * @Author : liu.q [916000612@qq.com]
     * @少时狂发编程想,无谓赴身IT行. @纵使荣华未可进,我自coding又何妨!
     * @Time 2020/2/25 3:55 下午
     * @Description : 获取资源文件
     */
    public static Properties getProp(String fileName) {
        Properties props = new Properties();
        ResourceLoader resourceLoader = new DefaultResourceLoader();
        try {
            InputStream in = resourceLoader.getResource(fileName).getInputStream();
            props.load(in);
        } catch (IOException e) {
            log.error(e.getMessage());
        }
        return props;
    }
}
recommend-video/src/main/java/com/iplatform/recvideo/util/SFTPUtil.java
New file
@@ -0,0 +1,306 @@
package com.iplatform.recvideo.util;
import com.jcraft.jsch.*;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.Properties;
import java.util.Vector;
/**
 * @ClassName SFTPUtil
 * @Author cy
 * @Date 2022/10/8
 * @Description
 * @Version 1.0
 **/
public class SFTPUtil {
    private transient Logger log = LoggerFactory.getLogger(this.getClass());
    private ChannelSftp sftp;
    private Session session;
    /**
     * FTP 登录用户名
     */
    private String username;
    /**
     * FTP 登录密码
     */
    private String password;
    /**
     * 私钥
     */
    private String privateKey;
    /**
     * FTP 服务器地址IP地址
     */
    private String host;
    /**
     * FTP 端口
     */
    private int port;
    /**
     * 构造基于密码认证的sftp对象
     *
     * @param username
     * @param password
     * @param host
     * @param port
     */
    public SFTPUtil(String username, String password, String host, int port) {
        this.username = username;
        this.password = password;
        this.host = host;
        this.port = port;
    }
    /**
     * 构造基于秘钥认证的sftp对象
     *
     * @param username
     * @param host
     * @param port
     * @param privateKey
     */
    public SFTPUtil(String username, String host, int port, String privateKey) {
        this.username = username;
        this.host = host;
        this.port = port;
        this.privateKey = privateKey;
    }
    public SFTPUtil() {
        Properties props = CommonUtil.getProp("sftp.properties");
        this.username = props.getProperty("sftp.user");
        this.password = props.getProperty("sftp.pass");
        this.host = props.getProperty("sftp.host");
        this.port = Integer.valueOf(props.getProperty("sftp.port"));
        this.privateKey = props.getProperty("sftp.privateKey");
    }
    /**
     * 连接sftp服务器
     *
     * @throws Exception
     */
    public void login() {
        try {
            JSch jsch = new JSch();
            if (privateKey != null) {
                jsch.addIdentity(privateKey);// 设置私钥
                log.info("sftp connect,path of private key file:{}", privateKey);
            }
            log.info("sftp connect by host:{} username:{}", host, username);
            session = jsch.getSession(username, host, port);
            log.info("Session is build");
            if (password != null) {
                session.setPassword(password);
            }
            Properties config = new Properties();
            config.put("StrictHostKeyChecking", "no");
            session.setConfig(config);
            session.connect();
            log.info("Session is connected");
            Channel channel = session.openChannel("sftp");
            channel.connect();
            log.info("channel is connected");
            sftp = (ChannelSftp) channel;
            log.info(String.format("sftp server host:[%s] port:[%s] is connect successfull", host, port));
        } catch (JSchException e) {
            log.error("Cannot connect to specified sftp server : {}:{} \n Exception message is: {}", new Object[]{host, port, e.getMessage()});
        }
    }
    /**
     * 关闭连接 server
     */
    public void logout() {
        if (sftp != null) {
            if (sftp.isConnected()) {
                sftp.disconnect();
                log.info("sftp is closed already");
            }
        }
        if (session != null) {
            if (session.isConnected()) {
                session.disconnect();
                log.info("sshSession is closed already");
            }
        }
    }
    /**
     * 将输入流的数据上传到sftp作为文件
     *
     * @param directory    上传到该目录
     * @param sftpFileName sftp端文件名
     * @param input        输入流
     * @throws SftpException
     * @throws Exception
     */
    public void upload(String directory, String sftpFileName, InputStream input) throws SftpException {
        try {
            sftp.cd(directory);
        } catch (SftpException e) {
            log.warn("directory is not exist");
            sftp.mkdir(directory);
            sftp.cd(directory);
        }
        sftp.put(input, sftpFileName);
        log.info("file:{} is upload successful", sftpFileName);
    }
    /**
     * 上传单个文件
     *
     * @param directory  上传到sftp目录
     * @param uploadFile 要上传的文件,包括路径
     * @throws FileNotFoundException
     * @throws SftpException
     * @throws Exception
     */
    public void upload(String directory, String uploadFile) throws FileNotFoundException, SftpException {
        File file = new File(uploadFile);
        upload(directory, file.getName(), new FileInputStream(file));
    }
    /**
     * 将byte[]上传到sftp,作为文件。注意:从String生成byte[]是,要指定字符集。
     *
     * @param directory    上传到sftp目录
     * @param sftpFileName 文件在sftp端的命名
     * @param byteArr      要上传的字节数组
     * @throws SftpException
     * @throws Exception
     */
    public void upload(String directory, String sftpFileName, byte[] byteArr) throws SftpException {
        upload(directory, sftpFileName, new ByteArrayInputStream(byteArr));
    }
    /**
     * 将字符串按照指定的字符编码上传到sftp
     *
     * @param directory    上传到sftp目录
     * @param sftpFileName 文件在sftp端的命名
     * @param dataStr      待上传的数据
     * @param charsetName  sftp上的文件,按该字符编码保存
     * @throws UnsupportedEncodingException
     * @throws SftpException
     * @throws Exception
     */
    public void upload(String directory, String sftpFileName, String dataStr, String charsetName) throws UnsupportedEncodingException, SftpException {
        upload(directory, sftpFileName, new ByteArrayInputStream(dataStr.getBytes(charsetName)));
    }
    /**
     * 下载文件
     *
     * @param directory    下载目录
     * @param downloadFile 下载的文件
     * @param saveFile     存在本地的路径
     * @throws SftpException
     * @throws FileNotFoundException
     * @throws Exception
     */
    public void download(String directory, String downloadFile, String saveFile) throws SftpException, FileNotFoundException {
        log.info("下载文件:{}/{}到{}", directory, downloadFile, saveFile);
        if (directory != null && !"".equals(directory)) {
            sftp.cd(directory);
        }
        File file = new File(saveFile);
        File parentFile = file.getParentFile();
        if (!parentFile.exists()) {
            parentFile.mkdirs();
        }
        sftp.get(downloadFile, new FileOutputStream(file));
        log.info("file:{} is download successful", downloadFile);
    }
    /**
     * 下载文件
     *
     * @param directory    下载目录
     * @param downloadFile 下载的文件名
     * @return 字节数组
     * @throws SftpException
     * @throws IOException
     * @throws Exception
     */
    public byte[] download(String directory, String downloadFile) throws SftpException, IOException {
        if (directory != null && !"".equals(directory)) {
            sftp.cd(directory);
        }
        InputStream is = sftp.get(downloadFile);
        byte[] fileData = IOUtils.toByteArray(is);
        log.info("file:{} is download successful", downloadFile);
        return fileData;
    }
    /**
     * 删除文件
     *
     * @param directory  要删除文件所在目录
     * @param deleteFile 要删除的文件
     * @throws SftpException
     * @throws Exception
     */
    public void delete(String directory, String deleteFile) throws SftpException {
        sftp.cd(directory);
        sftp.rm(deleteFile);
    }
    public void deleteDir(String directory) throws SftpException {
        sftp.rmdir(directory);
    }
    /**
     * 列出目录下的文件
     *
     * @param directory 要列出的目录
     * @return
     * @throws SftpException
     */
    public Vector<?> listFiles(String directory) throws SftpException {
        return sftp.ls(directory);
    }
    /**
     * 创建目录,如果存就不会创建
     *
     * @param path 开头必须带/,结尾不能带/,否则后果自负
     * @throws SftpException
     * @throws IOException
     */
    public void mkdir(String path) throws SftpException {
        String pathArr[] = path.split("/");
        String p = "";
        for (int i = 1; i < pathArr.length; i++) {//0是空字符串,直接跳过
            p += "/" + pathArr[i];
            System.out.println(p);
            try {
                sftp.cd(p);
            } catch (Exception e) {
                sftp.mkdir(p);
            }
        }
    }
    public static void main(String[] args) throws Exception {
        SFTPUtil sftp = new SFTPUtil();
        sftp.login();
        sftp.download("/train/2022/10/8", "041da9fff35f474abefe101e90ab7d5a.docx", "C:\\Users\\cy\\Desktop\\3D相册\\a2.docx");
        sftp.logout();
    }
}
recommend-video/src/main/resources/sftp.properties
New file
@@ -0,0 +1,5 @@
sftp.host=124.70.39.177
sftp.port=22
sftp.user=mysftp
sftp.pass=Bjjmy_2020
sftp.base=/train