recommend-video/doc/table.SQL | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
recommend-video/src/main/java/com/iplatform/recvideo/VideoLoadInfo.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
recommend-video/src/main/java/com/iplatform/recvideo/VideoLoader.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoLoadScheduler.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
recommend-video/src/main/java/com/iplatform/recvideo/service/VideoLoaderServiceImpl.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
recommend-video/doc/table.SQL
-- @@ -1,3 +1,6 @@
-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-- Index definitions
-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-- NOTE(review): the create table statements further down already declare
-- inx_src_img, inx_vb_bid and inx_vu_uid inline. Running both sections against
-- the same schema would presumably fail with a duplicate key name; confirm
-- which section is authoritative.
ALTER TABLE rc_video_t1
ADD INDEX inx_src_img (src_img) USING BTREE ;

-- @@ -17,4 +20,190 @@
-- Diff context only: the ALTER TABLE line that owns the next clause lies
-- outside this hunk, so the clause is kept as a comment rather than guessed at.
-- ADD INDEX inx_vb_bid (batch_id) USING BTREE ;

ALTER TABLE rc_video_user
ADD INDEX inx_vu_uid (user_id) USING BTREE ;
-- The diff view shows the clause above a second time (presumably the
-- removed/added pair of the same line; TODO confirm):
-- ADD INDEX inx_vu_uid (user_id) USING BTREE ;

-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-- Table structures
-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

-- One row per video of a batch (batch_id, video id/path, vector count, status).
create table milvus_video_batch (
    batch_id bigint(20) not null comment '批次id,即:日期时间',
    video_id varchar(255) not null,
    video_path varchar(255) default null,
    vector_size int(11) not null default '0',
    create_time bigint(20) not null,
    status int(11) not null default '1' comment '状态:0 未完成,1完成'
) engine=innodb default charset=utf8;

-- Maps a milvus id to an image path and a video id.
create table milvus_video_search (
    milvus_id text,
    image_path text,
    video_id text
) engine=innodb default charset=utf8;

-- Completion status per video, keyed by the video path.
create table milvus_video_status (
    id varchar(255) not null comment '视频路径',
    create_time bigint(20) not null,
    status int(11) not null default '0' comment '完成状态: 0 未完成, 1 完成',
    records bigint(20) not null default '0',
    primary key (id)
) engine=innodb default charset=utf8;

-- Task status records; last_value holds the largest value fetched so far.
create table rc_task_status (
    id bigint(20) not null comment '主键',
    create_time bigint(20) not null comment '创建时间,如: 20220922180501',
    name varchar(180) not null comment '任务名称',
    last_value bigint(20) not null comment '获取数据的最大值',
    status varchar(32) not null comment '当前状态,见枚举: taskstatus',
    start_time bigint(20) default null,
    end_time bigint(20) default null,
    msg varchar(180) default null comment '执行描述',
    task_type varchar(32) not null comment '任务类型,见枚举: tasktype',
    primary key (id)
) engine=innodb default charset=utf8;

-- Videos belonging to one load batch.
create table rc_video_batch (
    id bigint(20) not null,
    batch_id varchar(32) not null comment '批次id',
    user_id bigint(20) not null,
    src_video_id varchar(32) not null,
    src_video_path varchar(255) default null,
    primary key (id),
    key inx_vb_bid (batch_id) using btree
) engine=innodb default charset=utf8;

-- Per-image similarity rows: source image/video, similar video, distance.
create table rc_video_t1 (
    src_img varchar(36) not null comment '图像id',
    src_video_id varchar(32) not null,
    sim_video_id varchar(32) not null,
    distance double not null default '0',
    id varchar(32) not null,
    primary key (id),
    key inx_src_img (src_img) using btree,
    key inx_src_vid (src_video_id) using btree
) engine=innodb default charset=utf8;

-- Per-video similarity rows with an overall score, grouped by batch.
create table rc_video_t2 (
    src_video_id varchar(32) not null,
    sim_video_id varchar(32) not null,
    score double not null default '0' comment '相似视频综合得分',
    id varchar(32) not null,
    batch_id varchar(32) not null default '0',
    primary key (id),
    key inx_t2_src_vid (src_video_id) using btree,
    key inx_t2_src_bid (batch_id) using btree
) engine=innodb default charset=utf8;

-- Score of a video for a user.
create table rc_video_user (
    user_id bigint(20) not null,
    video_id varchar(32) not null,
    score double not null default '0',
    create_time bigint(20) not null default '0',
    id bigint(20) not null,
    primary key (id),
    key inx_vu_uid (user_id) using btree
) engine=innodb default charset=utf8;

-- Hosts available as storage resources.
-- NOTE(review): port defaults to 1306, while MySQL's standard port is 3306;
-- confirm the default is intentional.
create table s_host (
    id bigint(20) not null auto_increment,
    create_time bigint(20) not null,
    create_user varchar(36) not null,
    url varchar(200) not null comment '库地址',
    port int(11) not null default '1306' comment '端口',
    service_name varchar(100) default null comment '服务名称',
    authentication varchar(30) default null comment '认证用户',
    certification varchar(30) default null comment '授权密码',
    max_active int(11) not null default '5' comment '最大连接数',
    primary key (id)
) engine=innodb auto_increment=105 default charset=utf8 comment='存储资源可用主机';

-- Platform scheduler records.
create table s_scheduler (
    id int(11) not null comment '调度器id',
    create_time bigint(20) not null,
    name varchar(180) not null,
    status int(11) not null default '0' comment '状态:0_初始化,1_运行,2_暂停,9_结束,-1_人工终止',
    start_time bigint(20) not null default '0' comment '调度开始运行时间',
    end_time bigint(20) not null default '0' comment '调度结束运行时间',
    interval_time int(11) not null default '5000' comment '内部线程执行间隔时间,默认:5秒',
    sleep_time int(11) not null default '600000' comment '线程执行中,睡眠时间,默认:10分钟',
    sleep_option int(11) not null default '1' comment '采集线程在没有获取数据时,是否进入休眠:0_否,1_是',
    period_type varchar(36) not null comment '周期类型:none,day,week,month,year',
    time_type varchar(36) not null comment '定时类型:精确时间_exactly,时间段_range',
    year int(11) not null default '0',
    month int(11) not null default '0',
    day int(11) not null default '0',
    hour int(11) not null default '0',
    ranges varchar(50) default null comment '时间范围,多个如:5,6;12,15;...',
    dept int(11) not null default '0',
    class_name varchar(200) not null,
    pause_time bigint(20) not null default '0',
    summary varchar(255) default null,
    primary key (id)
) engine=innodb default charset=utf8 comment='平台调度器记录表';

-- Gather (collection) task records.
create table sdc_gather (
    id bigint(20) not null,
    create_time bigint(20) not null,
    create_user varchar(36) not null,
    name varchar(120) not null comment '采集名称',
    description varchar(255) default null,
    src_type tinyint(4) not null default '0' comment '采集源:0_http,1_db',
    src_url varchar(120) not null,
    src_port int(11) default '80',
    src_service varchar(120) default null,
    src_user varchar(120) default null,
    src_pass varchar(120) default null,
    store_id varchar(36) not null comment '外键:存储id',
    status tinyint(4) not null default '0' comment '状态:0_初始化,1_运行,2_暂停,9_结束',
    schedule_id int(11) not null comment '调度器id',
    primary key (id)
) engine=innodb default charset=utf8 comment='采集任务记录';

-- Store definitions.
-- Created BEFORE sdc_meta_db and sdc_meta_table: both declare a foreign key
-- that references sdc_store (id), and with foreign key checks enabled the
-- referenced (parent) table must already exist when the child is created.
create table sdc_store (
    id varchar(36) not null comment '存储id,主键',
    create_time bigint(20) default null comment '创建时间毫秒值',
    create_user varchar(36) not null,
    description varchar(255) not null comment '描述',
    inner_use tinyint(4) not null default '0' comment '内部使用:0_表示系统控制的内部数据,其他表示外部存储,如:自己手动加上的只作为记录',
    type varchar(30) not null comment '存储类型:数据库、分布式文件系统等',
    database_type tinyint(4) not null default '0' comment '数据库类型:0_derby,1_oracle,2_mysql,3_sqlserver',
    strategy varchar(90) not null comment '存储策略的类名',
    deleted tinyint(4) not null default '0' comment '是否废弃:1_是,0_否',
    update_time bigint(20) default null comment '更新时间',
    update_user varchar(36) default null comment '更新用户',
    define_name varchar(60) not null comment '存储库定义名字,如:数据库名',
    select_hosts varchar(100) not null comment '选择主机信息,多个用英文分号隔开',
    primary key (id)
) engine=innodb default charset=utf8;

-- Metadata: database information. Rows are removed together with their store
-- (on delete cascade).
create table sdc_meta_db (
    id bigint(20) not null,
    create_time bigint(20) not null,
    store_id varchar(36) not null comment '外键:存储id',
    database_name varchar(60) default null,
    used tinyint(4) not null default '0' comment '是否被使用过:0_否,1_是',
    is_using tinyint(4) not null default '0' comment '正在被使用:0_否,1_是',
    host_info varchar(100) not null comment '存储使用库的信息:ip:port',
    table_count int(11) not null default '0' comment '存在的表数量',
    summary varchar(255) default null,
    password varchar(255) default null,
    username varchar(255) default null,
    primary key (id),
    key store_id (store_id),
    constraint sdc_meta_db_ibfk_1 foreign key (store_id) references sdc_store (id) on delete cascade
) engine=innodb default charset=utf8 comment='元数据数据库信息';

-- Metadata: table information. Depends on both sdc_meta_db and sdc_store, so
-- it is created after them.
create table sdc_meta_table (
    id bigint(20) not null auto_increment,
    create_time bigint(20) not null,
    store_id varchar(36) not null comment '外键:存储id',
    db_id bigint(20) not null comment '外键:dbid',
    table_name varchar(60) not null comment '表名',
    row_count bigint(20) not null default '0' comment '记录数量',
    summary varchar(255) default null,
    primary key (id),
    key db_id (db_id),
    key store_id (store_id),
    constraint sdc_meta_table_ibfk_1 foreign key (db_id) references sdc_meta_db (id) on delete cascade,
    constraint sdc_meta_table_ibfk_2 foreign key (store_id) references sdc_store (id) on delete cascade
) engine=innodb default charset=utf8 comment='元数据,表信息';

-- (diff view heading of the next file)
-- recommend-video/src/main/java/com/iplatform/recvideo/VideoLoadInfo.java
New file @@ -0,0 +1,61 @@ package com.iplatform.recvideo; /** * 要获取的视频记录。 * @author 时克英 * @date 2022-09-26 */ public class VideoLoadInfo { private String srcVideoId; // 视频原始ID private String videoSrcPath; // 在原服务器路径 private String videoDestPath; // 要拷贝到目的服务器路径 private String batchId; // 批次ID,一般是时间 private long userId; // 用户ID public long getUserId() { return userId; } public void setUserId(long userId) { this.userId = userId; } public String getSrcVideoId() { return srcVideoId; } public void setSrcVideoId(String srcVideoId) { this.srcVideoId = srcVideoId; } public String getVideoSrcPath() { return videoSrcPath; } public void setVideoSrcPath(String videoSrcPath) { this.videoSrcPath = videoSrcPath; } public String getVideoDestPath() { return videoDestPath; } public void setVideoDestPath(String videoDestPath) { this.videoDestPath = videoDestPath; } public String getBatchId() { return batchId; } public void setBatchId(String batchId) { this.batchId = batchId; } } recommend-video/src/main/java/com/iplatform/recvideo/VideoLoader.java
// New file @@ -0,0 +1,99 @@
package com.iplatform.recvideo;

import com.walker.infrastructure.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * Skeleton of one video-loading pass. {@link #execute()} fixes the order of the
 * steps (pick a batch value, query the videos, copy the files, save data and
 * status); subclasses supply each step through the abstract methods below.
 */
public abstract class VideoLoader {

    protected final transient Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Lifecycle hook; a no-op in this class.
     */
    public void startup(String videoDataFolder, String batchId, boolean testMode){
    }

    /**
     * Lifecycle hook; a no-op in this class.
     */
    public void destroy(){
    }

    /**
     * Runs one loading pass.
     *
     * @return 1 when data and status were saved; -1 when the query failed, when
     *         no video was found, or when saving failed
     * @throws IllegalArgumentException when the batch value returned by
     *         {@link #copyRemoteVideoFiles(List, long)} is not positive
     * @throws Exception whatever {@link #copyRemoteVideoFiles(List, long)} throws
     */
    public int execute() throws Exception{
        // A batch value of 0 is treated as "nothing recorded yet": fall back
        // to the initial batch value.
        long queryBatchId = this.acquireNextBatchId();
        if(queryBatchId == 0){
            logger.debug("未找到下一个(已有)最大批次,说明还没有操作过,默认设置一个合理值做初始查询");
            queryBatchId = this.acquireInitBatchId();
        }

        List<VideoLoadInfo> pendingVideos;
        try{
            pendingVideos = this.acquireLoadVideoFromDatabase(queryBatchId);
        } catch (Exception ex){
            logger.error("acquireLoadVideoFromDatabase(): " + ex.getMessage(), ex);
            return -1;
        }

        if(StringUtils.isEmptyList(pendingVideos)){
            logger.debug("未查找到任何要远程获取的视频,readyLoadList = null");
            return -1;
        }

        // Exceptions from the copy step are not caught here; they propagate
        // to the caller.
        long savedBatchId = this.copyRemoteVideoFiles(pendingVideos, queryBatchId);
        logger.info("拷贝一批视频文件,共:" + pendingVideos.size() + "个, 返回最新 savedBatchId = " + savedBatchId);
        if(savedBatchId <= 0){
            throw new IllegalArgumentException("生成的 savedBatchId 无效,无法继续后续采集: " + savedBatchId);
        }

        try{
            this.saveDataAndStatus(pendingVideos, savedBatchId);
        } catch (Exception ex){
            logger.error("saveDataAndStatus(),保存采集视频数据和状态失败:" + ex.getMessage(), ex);
            return -1;
        }
        return 1;
    }

    /**
     * Returns the largest batch value collected so far, usually a timestamp.
     * That value is used to look up business videos that lie beyond it.
     * @return the batch value; {@link #execute()} treats 0 as "none found"
     */
    protected abstract long acquireNextBatchId();

    /**
     * Returns the batch value to start from when no collection record exists
     * yet; this value is usually a fairly early one.
     * @return the initial batch value
     */
    protected abstract long acquireInitBatchId();

    /**
     * Retrieves, from the business database, the video records whose value is
     * greater than the given batch value (i.e. later than that time).
     * @param nextBatchId the batch value to search beyond
     * @return the videos to load
     */
    protected abstract List<VideoLoadInfo> acquireLoadVideoFromDatabase(long nextBatchId);

    /**
     * Copies video files from the remote server to a local path.
     * <pre>
     * 1) Videos must be copied under the configured root folder: data-folder
     * 2) After copying, each file must be renamed to its database ID, e.g. 202209123.mp4
     * </pre>
     * @param readyLoadList the videos to copy
     * @param nextBatchId the batch value that was used for the query
     * @return the newest batch number, i.e. the largest timestamp of this batch of data
     */
    protected abstract long copyRemoteVideoFiles(List<VideoLoadInfo> readyLoadList, long nextBatchId) throws Exception;

    /**
     * Saves the video batch data and creates a new status record, so that
     * <code>VideoSearchScheduler</code> can go on to compute and write similarities.<p></p>
     * See: {@linkplain com.iplatform.recvideo.scheduler.VideoSearchScheduler}
     * @param readyLoadList the videos of this batch
     * @param savedBatchId the batch number returned by the copy step
     * @return implementation-defined; {@link #execute()} ignores the value
     */
    protected abstract int saveDataAndStatus(List<VideoLoadInfo> readyLoadList, long savedBatchId);
}
// (diff view heading of the next file)
// recommend-video/src/main/java/com/iplatform/recvideo/scheduler/VideoLoadScheduler.java
New file @@ -0,0 +1,17 @@ package com.iplatform.recvideo.scheduler; import com.iplatform.recvideo.VideoScheduler; import com.walker.store.AbstractStore; import com.walker.store.task.GatherTask; public class VideoLoadScheduler extends VideoScheduler { public VideoLoadScheduler(int id, String name){ super(id, name); } @Override protected GatherTask providerTask(AbstractStore store) { return null; } } recommend-video/src/main/java/com/iplatform/recvideo/service/VideoLoaderServiceImpl.java
// New file @@ -0,0 +1,45 @@
package com.iplatform.recvideo.service;

import com.iplatform.model.po.Rc_task_status;
import com.iplatform.model.po.Rc_video_batch;
import com.iplatform.reccommon.TaskType;
import com.walker.jdbc.service.BaseServiceImpl;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;

/**
 * Database access used by the video loader: reads the largest recorded batch
 * value and stores a batch of video rows together with a task status row.
 */
@Service
public class VideoLoaderServiceImpl extends BaseServiceImpl {

    // Largest last_value among rc_task_status rows of one task type whose
    // status is '0' or '1'; the task type is bound as a parameter.
    private static final String SQL_NEXT_BATCH =
            "select max(last_value) next_batch_id from rc_task_status"
            + " where task_type=? and (status='0' or status='1')";

    /**
     * Looks up the largest batch value among the existing records.
     * @return that value, or 0 when the query yields no row or a null maximum
     * @date 2022-09-26
     */
    public long queryNextBatchValue(){
        Map<String, Object> row = this.get(SQL_NEXT_BATCH, new Object[]{TaskType.INDEX_VIDEO_LOAD});
        Object maxLastValue = (row == null) ? null : row.get("next_batch_id");
        if(maxLastValue == null){
            return 0;
        }
        return Long.parseLong(maxLastValue.toString());
    }

    /**
     * Saves the batch video records and adds the new (similarity search) status.
     * @param videoBatchList the batch rows to save
     * @param status the status row to save
     * @return the number of elements in {@code videoBatchList}
     * @date 2022-09-26
     */
    public int execSaveBatchAndStatus(List<Rc_video_batch> videoBatchList, Rc_task_status status){
        // Batch rows are written first, then the status row.
        this.save(videoBatchList);
        this.save(status);
        return videoBatchList.size();
    }
}
// (diff view heading of the next file)
// recommend-video/src/main/java/com/iplatform/recvideo/support/DefaultVideoLoader.java
// New file @@ -0,0 +1,76 @@
package com.iplatform.recvideo.support;

import com.iplatform.model.po.Rc_task_status;
import com.iplatform.model.po.Rc_video_batch;
import com.iplatform.reccommon.TaskType;
import com.iplatform.recvideo.VideoLoadInfo;
import com.iplatform.recvideo.VideoLoader;
import com.iplatform.recvideo.service.VideoLoaderServiceImpl;
import com.walker.infrastructure.utils.DateUtils;
import com.walker.infrastructure.utils.NumberGenerator;

import java.util.ArrayList;
import java.util.List;

/**
 * {@link VideoLoader} backed by {@link VideoLoaderServiceImpl}. In this
 * revision only the batch lookup and the save step are implemented; the
 * remaining steps are placeholders.
 */
public class DefaultVideoLoader extends VideoLoader {

    private VideoLoaderServiceImpl videoLoaderService;

    /**
     * @param videoLoaderService the service used for the batch lookup and for saving
     */
    public void setVideoLoaderService(VideoLoaderServiceImpl videoLoaderService) {
        this.videoLoaderService = videoLoaderService;
    }

    /** Delegates to {@link VideoLoaderServiceImpl#queryNextBatchValue()}. */
    @Override
    protected long acquireNextBatchId() {
        return this.videoLoaderService.queryNextBatchValue();
    }

    /** Placeholder: always returns 0. */
    @Override
    protected long acquireInitBatchId() {
        return 0;
    }

    /** Placeholder: always returns {@code null}. */
    @Override
    protected List<VideoLoadInfo> acquireLoadVideoFromDatabase(long nextBatchId) {
        return null;
    }

    /** Placeholder: always returns 0. */
    @Override
    protected long copyRemoteVideoFiles(List<VideoLoadInfo> readyLoadList, long nextBatchId) throws Exception {
        return 0;
    }

    /**
     * Converts the loaded videos to batch rows, builds a new task status and
     * saves both through the service.
     *
     * @param readyLoadList the videos of this batch
     * @param savedBatchId the batch number to record
     * @return the value returned by
     *         {@link VideoLoaderServiceImpl#execSaveBatchAndStatus(List, Rc_task_status)}
     */
    @Override
    protected int saveDataAndStatus(List<VideoLoadInfo> readyLoadList, long savedBatchId) {
        // Arguments are evaluated left to right: batch rows first, then the status.
        return this.videoLoaderService.execSaveBatchAndStatus(
                this.toVideoBatchList(readyLoadList, savedBatchId),
                this.createNewTaskStatus(savedBatchId));
    }

    /**
     * Builds the task status row for a saved batch: status "0", task type
     * {@code TaskType.INDEX_VIDEO_LOAD}, and {@code savedBatchId} as last value.
     */
    private Rc_task_status createNewTaskStatus(long savedBatchId){
        Rc_task_status taskStatus = new Rc_task_status();
        // NOTE(review): Long.parseLong throws NumberFormatException unless
        // DateUtils.getDateTimeSecondForShow() returns digits only; confirm
        // the format that helper produces.
        taskStatus.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow()));
        taskStatus.setStatus("0");
        taskStatus.setId(NumberGenerator.getLongSequenceNumber());
        taskStatus.setTask_type(TaskType.INDEX_VIDEO_LOAD);
        taskStatus.setLast_value(savedBatchId);
        taskStatus.setName("整理获取短视频任务");
        return taskStatus;
    }

    /**
     * Maps every {@link VideoLoadInfo} to an {@link Rc_video_batch} row that
     * carries the batch id (as text), the source video id, the user id, the
     * destination path, and a generated id.
     */
    private List<Rc_video_batch> toVideoBatchList(List<VideoLoadInfo> readyLoadList, long savedBatchId){
        List<Rc_video_batch> batchRows = new ArrayList<>(readyLoadList.size());
        String batchIdText = String.valueOf(savedBatchId);
        for(VideoLoadInfo video : readyLoadList){
            Rc_video_batch row = new Rc_video_batch();
            row.setBatch_id(batchIdText);
            row.setSrc_video_id(video.getSrcVideoId());
            row.setUser_id(video.getUserId());
            // The stored path is the destination path, not the source path.
            row.setSrc_video_path(video.getVideoDestPath());
            row.setId(NumberGenerator.getLongSequenceNumber());
            batchRows.add(row);
        }
        return batchRows;
    }
}