shikeying
2022-09-27 fb66d5ed24e716e536543364f746a9db5aeb20a9
视频相似度分析4
5个文件已添加
1个文件已修改
487 ■■■■■ 已修改文件
recommend-video/doc/table.SQL 189 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/VideoLoadInfo.java 61 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/VideoLoader.java 99 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoLoadScheduler.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/service/VideoLoaderServiceImpl.java 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java 76 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
recommend-video/doc/table.SQL
@@ -1,3 +1,6 @@
-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-- 以下为索引设置
-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
ALTER TABLE rc_video_t1
    ADD INDEX inx_src_img (src_img) USING BTREE ;
@@ -18,3 +21,189 @@
ALTER TABLE rc_video_user
    ADD INDEX inx_vu_uid (user_id) USING BTREE ;
-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-- 以下为表结构
-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
create table milvus_video_batch (
                                    batch_id bigint(20) not null comment '批次id,即:日期时间',
                                    video_id varchar(255) not null,
                                    video_path varchar(255) default null,
                                    vector_size int(11) not null default '0',
                                    create_time bigint(20) not null,
                                    status int(11) not null default '1' comment '状态:0 未完成,1完成'
) engine=innodb default charset=utf8;
create table milvus_video_search (
                                     milvus_id text,
                                     image_path text,
                                     video_id text
) engine=innodb default charset=utf8;
create table milvus_video_status (
                                     id varchar(255) not null comment '视频路径',
                                     create_time bigint(20) not null,
                                     status int(11) not null default '0' comment '完成状态: 0 未完成, 1 完成',
                                     records bigint(20) not null default '0',
                                     primary key (id)
) engine=innodb default charset=utf8;
create table rc_task_status (
                                id bigint(20) not null comment '主键',
                                create_time bigint(20) not null comment '创建时间,如: 20220922180501',
                                name varchar(180) not null comment '任务名称',
                                last_value bigint(20) not null comment '获取数据的最大值',
                                status varchar(32) not null comment '当前状态,见枚举: taskstatus',
                                start_time bigint(20) default null,
                                end_time bigint(20) default null,
                                msg varchar(180) default null comment '执行描述',
                                task_type varchar(32) not null comment '任务类型,见枚举: tasktype',
                                primary key (id)
) engine=innodb default charset=utf8;
create table rc_video_batch (
                                id bigint(20) not null,
                                batch_id varchar(32) not null comment '批次id',
                                user_id bigint(20) not null,
                                src_video_id varchar(32) not null,
                                src_video_path varchar(255) default null,
                                primary key (id),
                                key inx_vb_bid (batch_id) using btree
) engine=innodb default charset=utf8;
create table rc_video_t1 (
                             src_img varchar(36) not null comment '图像id',
                             src_video_id varchar(32) not null,
                             sim_video_id varchar(32) not null,
                             distance double not null default '0',
                             id varchar(32) not null,
                             primary key (id),
                             key inx_src_img (src_img) using btree,
                             key inx_src_vid (src_video_id) using btree
) engine=innodb default charset=utf8;
create table rc_video_t2 (
                             src_video_id varchar(32) not null,
                             sim_video_id varchar(32) not null,
                             score double not null default '0' comment '相似视频综合得分',
                             id varchar(32) not null,
                             batch_id varchar(32) not null default '0',
                             primary key (id),
                             key inx_t2_src_vid (src_video_id) using btree,
                             key inx_t2_src_bid (batch_id) using btree
) engine=innodb default charset=utf8;
create table rc_video_user (
                               user_id bigint(20) not null,
                               video_id varchar(32) not null,
                               score double not null default '0',
                               create_time bigint(20) not null default '0',
                               id bigint(20) not null,
                               primary key (id),
                               key inx_vu_uid (user_id) using btree
) engine=innodb default charset=utf8;
create table s_host (
                        id bigint(20) not null auto_increment,
                        create_time bigint(20) not null,
                        create_user varchar(36) not null,
                        url varchar(200) not null comment '库地址',
                        port int(11) not null default '1306' comment '端口',
                        service_name varchar(100) default null comment '服务名称',
                        authentication varchar(30) default null comment '认证用户',
                        certification varchar(30) default null comment '授权密码',
                        max_active int(11) not null default '5' comment '最大连接数',
                        primary key (id)
) engine=innodb auto_increment=105 default charset=utf8 comment='存储资源可用主机';
create table s_scheduler (
                             id int(11) not null comment '调度器id',
                             create_time bigint(20) not null,
                             name varchar(180) not null,
                             status int(11) not null default '0' comment '状态:0_初始化,1_运行,2_暂停,9_结束,-1_人工终止',
                             start_time bigint(20) not null default '0' comment '调度开始运行时间',
                             end_time bigint(20) not null default '0' comment '调度结束运行时间',
                             interval_time int(11) not null default '5000' comment '内部线程执行间隔时间,默认:5秒',
                             sleep_time int(11) not null default '600000' comment '线程执行中,睡眠时间,默认:10分钟',
                             sleep_option int(11) not null default '1' comment '采集线程在没有获取数据时,是否进入休眠:0_否,1_是',
                             period_type varchar(36) not null comment '周期类型:none,day,week,month,year',
                             time_type varchar(36) not null comment '定时类型:精确时间_exactly,时间段_range',
                             year int(11) not null default '0',
                             month int(11) not null default '0',
                             day int(11) not null default '0',
                             hour int(11) not null default '0',
                             ranges varchar(50) default null comment '时间范围,多个如:5,6;12,15;...',
                             dept int(11) not null default '0',
                             class_name varchar(200) not null,
                             pause_time bigint(20) not null default '0',
                             summary varchar(255) default null,
                             primary key (id)
) engine=innodb default charset=utf8 comment='平台调度器记录表';
create table sdc_gather (
                            id bigint(20) not null,
                            create_time bigint(20) not null,
                            create_user varchar(36) not null,
                            name varchar(120) not null comment '采集名称',
                            description varchar(255) default null,
                            src_type tinyint(4) not null default '0' comment '采集源:0_http,1_db',
                            src_url varchar(120) not null,
                            src_port int(11) default '80',
                            src_service varchar(120) default null,
                            src_user varchar(120) default null,
                            src_pass varchar(120) default null,
                            store_id varchar(36) not null comment '外键:存储id',
                            status tinyint(4) not null default '0' comment '状态:0_初始化,1_运行,2_暂停,9_结束',
                            schedule_id int(11) not null comment '调度器id',
                            primary key (id)
) engine=innodb default charset=utf8 comment='采集任务记录';
create table sdc_meta_db (
                             id bigint(20) not null,
                             create_time bigint(20) not null,
                             store_id varchar(36) not null comment '外键:存储id',
                             database_name varchar(60) default null,
                             used tinyint(4) not null default '0' comment '是否被使用过:0_否,1_是',
                             is_using tinyint(4) not null default '0' comment '正在被使用:0_否,1_是',
                             host_info varchar(100) not null comment '存储使用库的信息:ip:port',
                             table_count int(11) not null default '0' comment '存在的表数量',
                             summary varchar(255) default null,
                             password varchar(255) default null,
                             username varchar(255) default null,
                             primary key (id),
                             key store_id (store_id),
                             constraint sdc_meta_db_ibfk_1 foreign key (store_id) references sdc_store (id) on delete cascade
) engine=innodb default charset=utf8 comment='元数据数据库信息';
create table sdc_meta_table (
                                id bigint(20) not null auto_increment,
                                create_time bigint(20) not null,
                                store_id varchar(36) not null comment '外键:存储id',
                                db_id bigint(20) not null comment '外键:dbid',
                                table_name varchar(60) not null comment '表名',
                                row_count bigint(20) not null default '0' comment '记录数量',
                                summary varchar(255) default null,
                                primary key (id),
                                key db_id (db_id),
                                key store_id (store_id),
                                constraint sdc_meta_table_ibfk_1 foreign key (db_id) references sdc_meta_db (id) on delete cascade,
                                constraint sdc_meta_table_ibfk_2 foreign key (store_id) references sdc_store (id) on delete cascade
) engine=innodb default charset=utf8 comment='元数据,表信息';
create table sdc_store (
                           id varchar(36) not null comment '存储id,主键',
                           create_time bigint(20) default null comment '创建时间毫秒值',
                           create_user varchar(36) not null,
                           description varchar(255) not null comment '描述',
                           inner_use tinyint(4) not null default '0' comment '内部使用:0_表示系统控制的内部数据,其他表示外部存储,如:自己手动加上的只作为记录',
                           type varchar(30) not null comment '存储类型:数据库、分布式文件系统等',
                           database_type tinyint(4) not null default '0' comment '数据库类型:0_derby,1_oracle,2_mysql,3_sqlserver',
                           strategy varchar(90) not null comment '存储策略的类名',
                           deleted tinyint(4) not null default '0' comment '是否废弃:1_是,0_否',
                           update_time bigint(20) default null comment '更新时间',
                           update_user varchar(36) default null comment '更新用户',
                           define_name varchar(60) not null comment '存储库定义名字,如:数据库名',
                           select_hosts varchar(100) not null comment '选择主机信息,多个用英文分号隔开',
                           primary key (id)
) engine=innodb default charset=utf8;
recommend-video/src/main/java/com/iplatform/recvideo/VideoLoadInfo.java
New file
@@ -0,0 +1,61 @@
package com.iplatform.recvideo;
/**
 * 要获取的视频记录。
 * @author 时克英
 * @date 2022-09-26
 */
public class VideoLoadInfo {
    private String srcVideoId;      // 视频原始ID
    private String videoSrcPath;    // 在原服务器路径
    private String videoDestPath;   // 要拷贝到目的服务器路径
    private String batchId;         // 批次ID,一般是时间
    private long userId;            // 用户ID
    public long getUserId() {
        return userId;
    }
    public void setUserId(long userId) {
        this.userId = userId;
    }
    public String getSrcVideoId() {
        return srcVideoId;
    }
    public void setSrcVideoId(String srcVideoId) {
        this.srcVideoId = srcVideoId;
    }
    public String getVideoSrcPath() {
        return videoSrcPath;
    }
    public void setVideoSrcPath(String videoSrcPath) {
        this.videoSrcPath = videoSrcPath;
    }
    public String getVideoDestPath() {
        return videoDestPath;
    }
    public void setVideoDestPath(String videoDestPath) {
        this.videoDestPath = videoDestPath;
    }
    public String getBatchId() {
        return batchId;
    }
    public void setBatchId(String batchId) {
        this.batchId = batchId;
    }
}
recommend-video/src/main/java/com/iplatform/recvideo/VideoLoader.java
New file
@@ -0,0 +1,99 @@
package com.iplatform.recvideo;
import com.walker.infrastructure.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public abstract class VideoLoader {
    protected final transient Logger logger = LoggerFactory.getLogger(this.getClass());
    public void startup(String videoDataFolder, String batchId, boolean testMode){
    }
    public void destroy(){
    }
    public int execute() throws Exception{
        long nextBatchId = this.acquireNextBatchId();
        if(nextBatchId == 0){
            logger.debug("未找到下一个(已有)最大批次,说明还没有操作过,默认设置一个合理值做初始查询");
            nextBatchId = this.acquireInitBatchId();
        }
        List<VideoLoadInfo> readyLoadList = null;
        try{
            readyLoadList = this.acquireLoadVideoFromDatabase(nextBatchId);
        } catch (Exception ex){
            logger.error("acquireLoadVideoFromDatabase(): " + ex.getMessage(), ex);
            return -1;
        }
        if(StringUtils.isEmptyList(readyLoadList)){
            logger.debug("未查找到任何要远程获取的视频,readyLoadList = null");
            return -1;
        }
        long savedBatchId = this.copyRemoteVideoFiles(readyLoadList, nextBatchId);
        logger.info("拷贝一批视频文件,共:" + readyLoadList.size() + "个, 返回最新 savedBatchId = " + savedBatchId);
        if(savedBatchId <= 0){
            throw new IllegalArgumentException("生成的 savedBatchId 无效,无法继续后续采集: " + savedBatchId);
        }
        try{
            this.saveDataAndStatus(readyLoadList, savedBatchId);
            return 1;
        } catch (Exception ex){
            logger.error("saveDataAndStatus(),保存采集视频数据和状态失败:" + ex.getMessage(), ex);
            return -1;
        }
    }
    /**
     * 获取最大批次采集值,通常为时间戳,系统会使用该值去查找业务中超过该值的视频信息。
     * @return
     */
    protected abstract long acquireNextBatchId();
    /**
     * 如果系统不存在已采集记录,则会自动初始化一个批次来采集,该值通常会较早
     * @return
     */
    protected abstract long acquireInitBatchId();
    /**
     * 从业务库中,检索比批次值更大(比该时间更晚)的视频记录。
     * @param nextBatchId
     * @return
     */
    protected abstract List<VideoLoadInfo> acquireLoadVideoFromDatabase(long nextBatchId);
    /**
     * 从远程服务器拷贝视频文件到本机路径中。
     * <pre>
     *     1) 视频要拷贝到指定文件根目录,系统有配置: data-folder
     *     2) 拷贝后视频文件必须用数据库ID重命名为文件名,如: 202209123.mp4
     * </pre>
     * @param readyLoadList
     * @param nextBatchId
     * @return 返回一个最新批次号,也就是当前这批数据的最大时间戳。
     */
    protected abstract long copyRemoteVideoFiles(List<VideoLoadInfo> readyLoadList, long nextBatchId) throws Exception;
    /**
     * 保存视频批次数据,并创建新状态记录,让 <code>VideoSearchScheduler</code> 能继续完成计算相似度写入。<p></p>
     * 参考: {@linkplain com.iplatform.recvideo.scheduler.VideoSearchScheduler}
     * @param readyLoadList
     * @param savedBatchId
     * @return
     */
    protected abstract int saveDataAndStatus(List<VideoLoadInfo> readyLoadList, long savedBatchId);
}
recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoLoadScheduler.java
New file
@@ -0,0 +1,17 @@
package com.iplatform.recvideo.scheduler;
import com.iplatform.recvideo.VideoScheduler;
import com.walker.store.AbstractStore;
import com.walker.store.task.GatherTask;
public class VideoLoadScheduler extends VideoScheduler {
    public VideoLoadScheduler(int id, String name){
        super(id, name);
    }
    @Override
    protected GatherTask providerTask(AbstractStore store) {
        return null;
    }
}
recommend-video/src/main/java/com/iplatform/recvideo/service/VideoLoaderServiceImpl.java
New file
@@ -0,0 +1,45 @@
package com.iplatform.recvideo.service;
import com.iplatform.model.po.Rc_task_status;
import com.iplatform.model.po.Rc_video_batch;
import com.iplatform.reccommon.TaskType;
import com.walker.jdbc.service.BaseServiceImpl;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
@Service
public class VideoLoaderServiceImpl extends BaseServiceImpl {
    private static final String SQL_NEXT_BATCH = "select max(last_value) next_batch_id from rc_task_status where task_type=? and (status='0' or status='1')";
    /**
     * 查找当前记录中已有的最大批次值。
     * @return
     * @date 2022-09-26
     */
    public long queryNextBatchValue(){
        Map<String, Object> nextBatchMap = this.get(SQL_NEXT_BATCH, new Object[]{TaskType.INDEX_VIDEO_LOAD});
        if(nextBatchMap == null){
            return 0;
        }
        if(nextBatchMap.get("next_batch_id") == null){
            return 0;
        }
        return Long.parseLong(nextBatchMap.get("next_batch_id").toString());
    }
    /**
     * 保存批次视频记录,并添加新的(相似度检索)状态。
     * @param videoBatchList
     * @param status
     * @return
     * @date 2022-09-26
     */
    public int execSaveBatchAndStatus(List<Rc_video_batch> videoBatchList, Rc_task_status status){
        this.save(videoBatchList);
        this.save(status);
        return videoBatchList.size();
    }
}
recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java
New file
@@ -0,0 +1,76 @@
package com.iplatform.recvideo.support;
import com.iplatform.model.po.Rc_task_status;
import com.iplatform.model.po.Rc_video_batch;
import com.iplatform.reccommon.TaskType;
import com.iplatform.recvideo.VideoLoadInfo;
import com.iplatform.recvideo.VideoLoader;
import com.iplatform.recvideo.service.VideoLoaderServiceImpl;
import com.walker.infrastructure.utils.DateUtils;
import com.walker.infrastructure.utils.NumberGenerator;
import java.util.ArrayList;
import java.util.List;
public class DefaultVideoLoader extends VideoLoader {
    private VideoLoaderServiceImpl videoLoaderService;
    public void setVideoLoaderService(VideoLoaderServiceImpl videoLoaderService) {
        this.videoLoaderService = videoLoaderService;
    }
    @Override
    protected long acquireNextBatchId() {
        return this.videoLoaderService.queryNextBatchValue();
    }
    @Override
    protected long acquireInitBatchId() {
        return 0;
    }
    @Override
    protected List<VideoLoadInfo> acquireLoadVideoFromDatabase(long nextBatchId) {
        return null;
    }
    @Override
    protected long copyRemoteVideoFiles(List<VideoLoadInfo> readyLoadList, long nextBatchId) throws Exception {
        return 0;
    }
    @Override
    protected int saveDataAndStatus(List<VideoLoadInfo> readyLoadList, long savedBatchId) {
        List<Rc_video_batch> videoBatchList = this.toVideoBatchList(readyLoadList, savedBatchId);
        Rc_task_status taskStatus = this.createNewTaskStatus(savedBatchId);
        return this.videoLoaderService.execSaveBatchAndStatus(videoBatchList, taskStatus);
    }
    private Rc_task_status createNewTaskStatus(long savedBatchId){
        Rc_task_status status = new Rc_task_status();
        status.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow()));
        status.setStatus("0");
        status.setId(NumberGenerator.getLongSequenceNumber());
        status.setTask_type(TaskType.INDEX_VIDEO_LOAD);
        status.setLast_value(savedBatchId);
        status.setName("整理获取短视频任务");
        return status;
    }
    private List<Rc_video_batch> toVideoBatchList(List<VideoLoadInfo> readyLoadList, long savedBatchId){
        List<Rc_video_batch> resultList = new ArrayList<>(readyLoadList.size());
        Rc_video_batch e = null;
        String batchId = String.valueOf(savedBatchId);
        for(VideoLoadInfo v : readyLoadList){
            e = new Rc_video_batch();
            e.setBatch_id(batchId);
            e.setSrc_video_id(v.getSrcVideoId());
            e.setUser_id(v.getUserId());
            e.setSrc_video_path(v.getVideoDestPath());
            e.setId(NumberGenerator.getLongSequenceNumber());
            resultList.add(e);
        }
        return resultList;
    }
}