| | |
| | | @JsonIgnore |
| | | protected boolean isset_score = false; |
| | | |
| | | private String batch_id = null; |
| | | @JsonIgnore |
| | | protected boolean isset_batch_id = false; |
| | | |
| | | /** |
| | | * 默认构造函数 |
| | | */ |
| | |
| | | return this.score == null; |
| | | } |
| | | |
| | | public String getBatch_id(){return this.batch_id;} |
| | | |
| | | public void setBatch_id(String batch_id){ |
| | | this.batch_id = batch_id; |
| | | this.isset_batch_id = true; |
| | | } |
| | | |
| | | @JsonIgnore |
| | | public boolean isEmptyBatch_id(){return this.batch_id == null || this.batch_id.length() == 0;} |
| | | |
| | | |
| | | /** |
| | | * 重写 toString() 方法 |
| | | */ |
| | |
| | | .append("src_video_id=").append(this.src_video_id) |
| | | .append("sim_video_id=").append(this.sim_video_id) |
| | | .append("score=").append(this.score) |
| | | .append("batch_id=").append(this.batch_id) |
| | | .toString(); |
| | | } |
| | | |
| | |
| | | if (this.isset_score) { |
| | | rc_video_t2.setScore(this.getScore()); |
| | | } |
| | | if(this.isset_batch_id){ |
| | | rc_video_t2.setBatch_id(this.getBatch_id()); |
| | | } |
| | | return rc_video_t2; |
| | | } |
| | | } |
| | |
| | | import com.walker.jdbc.sqlgen.InsertBuilder; |
| | | import com.walker.jdbc.sqlgen.SelectBuilder; |
| | | import com.walker.jdbc.sqlgen.UpdateBuilder; |
| | | import com.walker.jdbc.util.StringUtils; |
| | | |
| | | import org.springframework.jdbc.core.RowMapper; |
| | | |
| | | import java.sql.ResultSet; |
| | |
| | | public static final String SRC_VIDEO_ID = "src_video_id"; |
| | | public static final String SIM_VIDEO_ID = "sim_video_id"; |
| | | public static final String SCORE = "score"; |
| | | public static final String BATCH_ID = "batch_id"; |
| | | |
| | | /** |
| | | * 默认构造函数 |
| | |
| | | } |
| | | if (rc_video_t2.isset_score) { |
| | | this.setScore(rc_video_t2.getScore()); |
| | | } |
| | | if(rc_video_t2.isset_batch_id){ |
| | | this.setBatch_id(rc_video_t2.getBatch_id()); |
| | | } |
| | | // 去掉,2022-09-07 |
| | | // this.setDatabaseName_(rc_video_t2.getDatabaseName_()); |
| | |
| | | ib.set(SRC_VIDEO_ID, this.getSrc_video_id(), this.isset_src_video_id); |
| | | ib.set(SIM_VIDEO_ID, this.getSim_video_id(), this.isset_sim_video_id); |
| | | ib.set(SCORE, this.getScore(), this.isset_score); |
| | | ib.set(BATCH_ID, this.getBatch_id(), this.isset_batch_id); |
| | | return ib.genMapSql(); |
| | | } |
| | | |
| | |
| | | ub.set(SRC_VIDEO_ID, this.getSrc_video_id(), this.isset_src_video_id); |
| | | ub.set(SIM_VIDEO_ID, this.getSim_video_id(), this.isset_sim_video_id); |
| | | ub.set(SCORE, this.getScore(), this.isset_score); |
| | | ub.set(BATCH_ID, this.getBatch_id(), this.isset_batch_id); |
| | | ub.where(this.getPkName_(), this.getPkValue_()); |
| | | return ub.genMapSql(); |
| | | } |
| | |
| | | ub.set(SRC_VIDEO_ID, this.getSrc_video_id(), this.isset_src_video_id); |
| | | ub.set(SIM_VIDEO_ID, this.getSim_video_id(), this.isset_sim_video_id); |
| | | ub.set(SCORE, this.getScore(), this.isset_score); |
| | | |
| | | ub.set(BATCH_ID, this.getBatch_id(), this.isset_batch_id); |
| | | return ub.genMapSql(where, parameters); |
| | | } |
| | | |
| | |
| | | ub.set(SRC_VIDEO_ID, this.getSrc_video_id(), this.isset_src_video_id); |
| | | ub.set(SIM_VIDEO_ID, this.getSim_video_id(), this.isset_sim_video_id); |
| | | ub.set(SCORE, this.getScore(), this.isset_score); |
| | | |
| | | ub.set(BATCH_ID, this.getBatch_id(), this.isset_batch_id); |
| | | return ub.genArraySql(where, parameters); |
| | | } |
| | | |
| | |
| | | */ |
| | | @Override |
| | | public SqlAndParameters<Map<String, Object>> getSelectSql_(String where, Map<String, Object> parameters) { |
| | | return new SqlAndParameters<>("select id, src_video_id, sim_video_id, score from " + this.getTableName_() + " " + where, parameters); |
| | | return new SqlAndParameters<>("select id, src_video_id, sim_video_id, score, batch_id from " + this.getTableName_() + " " + where, parameters); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | @Override |
| | | public SqlAndParameters<Object[]> getSelectSql_(String where, Object[] parameters) { |
| | | return new SqlAndParameters<>("select id, src_video_id, sim_video_id, score from " + this.getTableName_() + " " + where, parameters); |
| | | return new SqlAndParameters<>("select id, src_video_id, sim_video_id, score, batch_id from " + this.getTableName_() + " " + where, parameters); |
| | | } |
| | | |
| | | /** |
| | |
| | | if (columnIndex > 0) { |
| | | rc_video_t2.setSim_video_id(rs.getString(columnIndex)); |
| | | } |
| | | columnIndex = resultSetUtils.findColumn(rs, Rc_video_t2_mapper.BATCH_ID); |
| | | if (columnIndex > 0) { |
| | | rc_video_t2.setBatch_id(rs.getString(columnIndex)); |
| | | } |
| | | columnIndex = resultSetUtils.findColumn(rs, Rc_video_t2_mapper.SCORE); |
| | | if (columnIndex > 0) { |
| | | if (rs.getBigDecimal(columnIndex) == null) { |
| | |
| | | public static final Rc_video_user ROW_MAPPER = new Rc_video_user(); |
| | | |
| | | // 主键 |
| | | private Long id; |
| | | @JsonIgnore |
| | | protected boolean isset_id = false; |
| | | |
| | | // 属性列表 |
| | | private Long user_id = null; |
| | | @JsonIgnore |
| | | protected boolean isset_user_id = false; |
| | | |
| | | // 属性列表 |
| | | private String video_id = null; |
| | | @JsonIgnore |
| | | protected boolean isset_video_id = false; |
| | |
| | | /** |
| | | * 根据主键构造对象 |
| | | */ |
| | | public Rc_video_user(Long user_id) { |
| | | this.setUser_id(user_id); |
| | | public Rc_video_user(Long id) { |
| | | this.setId(id); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | @Override |
| | | public void setPkValue(Object value) { |
| | | this.setUser_id((Long) value); |
| | | this.setId((Long) value); |
| | | } |
| | | |
| | | public Long getId(){return this.id;} |
| | | |
| | | public void setId(Long id){ |
| | | this.id = id; |
| | | this.isset_id = true; |
| | | } |
| | | |
| | | @JsonIgnore |
| | | public boolean isEmptyId() { |
| | | return this.id == null; |
| | | } |
| | | |
| | | public Long getUser_id() { |
| | |
| | | ADD INDEX inx_src_vid (src_video_id) USING BTREE ; |
| | | |
| | | ALTER TABLE rc_video_t2 |
| | | ADD INDEX inx_t2_src_vid (src_video_id) USING BTREE ; |
| | | ADD INDEX inx_t2_src_vid (src_video_id) USING BTREE ; |
| | | ALTER TABLE rc_video_t2 |
| | | ADD INDEX inx_t2_src_bid (batch_id) USING BTREE ; |
| | | |
| | | -- ALTER TABLE rc_video_batch |
| | | -- ADD INDEX inx_vb_vid (src_video_id) USING BTREE ; |
| | | |
| | | ALTER TABLE rc_video_batch |
| | | ADD INDEX inx_vb_bid (batch_id) USING BTREE ; |
| | |
| | | } |
| | | |
| | | try { |
| | | this.processOneSearchAndWrite(); |
| | | this.processOneSearchAndWrite(this.batchId); |
| | | return 0; |
| | | } catch (Exception e) { |
| | | throw new Exception("processOneSearchAndWrite(): " + e.getMessage(), e); |
| | | } |
| | | } |
| | | |
| | | private void processOneSearchAndWrite() throws Exception{ |
| | | private void processOneSearchAndWrite(String batchId) throws Exception{ |
| | | if(this.currentVideoFolderIndex >= this.videoFolderInfoList.size()){ |
| | | throw new IllegalArgumentException("currentVideoFolderIndex 越界: " + this.currentVideoFolderIndex); |
| | | } |
| | |
| | | // 每个视频的最后一张图片 |
| | | if((this.currentImageIndex + 1) >= currentVideoFolderInfo.getImageInfoSize()){ |
| | | try { |
| | | this.writeRcVideoT2(currentVideoFolderInfo); |
| | | this.writeRcVideoT2(currentVideoFolderInfo, batchId); |
| | | } catch (Exception ex){ |
| | | throw new Exception("writeRcVideoT2()执行错误:" + ex.getMessage(), ex); |
| | | } |
| | |
| | | * 分析表'rc_video_t1',并把给定视频相似记录写入第二个临时表'rc_video_t2' |
| | | * @param videoFolderInfo |
| | | */ |
| | | protected abstract void writeRcVideoT2(VideoFolderInfo videoFolderInfo); |
| | | protected abstract void writeRcVideoT2(VideoFolderInfo videoFolderInfo, String batchId); |
| | | |
| | | /** |
| | | * 分析给定批次所有视频用户推荐的视频信息,并写入表:'rc_video_user' |
| | | * @param batchId |
| | | * @param recVideoIdList 推荐视频id集合 |
| | | * @param recVideoIdList 本批次处理原始视频id集合 |
| | | */ |
| | | protected abstract void writeRcVideoUser(String batchId, List<String> recVideoIdList); |
| | | } |
New file |
| | |
| | | package com.iplatform.recvideo; |
| | | |
| | | public class SimilarVideoUser implements Comparable<SimilarVideoUser>{ |
| | | |
| | | private long userId; |
| | | private String recommendVideoId; |
| | | |
| | | private double score = 0; |
| | | |
| | | // 这个相似视频,对应的源视频图像张数 |
| | | private int count = 0; |
| | | |
| | | public SimilarVideoUser(long userId, String recommendVideoId){ |
| | | this.userId = userId; |
| | | this.recommendVideoId = recommendVideoId; |
| | | } |
| | | |
| | | public void increase(){ |
| | | this.count ++; |
| | | } |
| | | |
| | | public void setScore(double totalScore){ |
| | | this.score = totalScore; |
| | | } |
| | | |
| | | public long getUserId() { |
| | | return userId; |
| | | } |
| | | |
| | | public String getRecommendVideoId() { |
| | | return recommendVideoId; |
| | | } |
| | | |
| | | public double getScore() { |
| | | return score; |
| | | } |
| | | |
| | | @Override |
| | | public String toString(){ |
| | | return new StringBuilder("[recVideoId=").append(this.recommendVideoId) |
| | | .append(", score=").append(this.score) |
| | | .append("]").toString(); |
| | | } |
| | | |
| | | @Override |
| | | public int hashCode(){ |
| | | return this.recommendVideoId.hashCode(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean equals(Object obj){ |
| | | if(obj == null){ |
| | | return false; |
| | | } |
| | | if(obj instanceof SimilarVideoUser){ |
| | | SimilarVideoUser e = (SimilarVideoUser)obj; |
| | | if(e.recommendVideoId.equals(this.recommendVideoId)){ |
| | | return true; |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | @Override |
| | | public int compareTo(SimilarVideoUser o) { |
| | | return o.count - this.count; |
| | | } |
| | | } |
New file |
| | |
| | | package com.iplatform.recvideo; |
| | | |
| | | import com.iplatform.model.po.Rc_video_t2; |
| | | import com.walker.infrastructure.utils.StringUtils; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.Collections; |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | |
| | | /** |
| | | * 对用户推荐视频进行计算排序。 |
| | | * @author 时克英 |
| | | * @date 2022-09-26 |
| | | */ |
| | | public class SimilarVideoUserOrder { |
| | | |
| | | // private Map<String, Long> videoIdUserCache = new HashMap<>(); |
| | | |
| | | // 最终排序用的用户推荐视频集合结果, key = 推荐视频ID |
| | | private Map<String, SimilarVideoUser> orderVideoList = new HashMap<>(); |
| | | |
| | | // 计算多个推荐视频id,总得分, key = 推荐视频ID |
| | | private Map<String, List<Double>> simiVideoDistanceMap = new HashMap<>(); |
| | | |
| | | /** |
| | | * |
| | | * @param videoIdUserCache 推荐视频ID --> 用户ID 关系 |
| | | * @param list 计算后的: rc_video_t2 表数据 |
| | | */ |
| | | public SimilarVideoUserOrder(Map<String, Long> videoIdUserCache, List<Rc_video_t2> list){ |
| | | // this.videoIdUserCache = videoUser; |
| | | Long userId = null; |
| | | for(Rc_video_t2 e : list){ |
| | | this.addSimiVideoDistance(e.getSim_video_id(), e.getScore()); |
| | | userId = videoIdUserCache.get(e.getSrc_video_id()); |
| | | if(userId == null){ |
| | | throw new IllegalArgumentException("源视频未找到对应用户,src_video_id=" + e.getSrc_video_id()); |
| | | } |
| | | this.addOrderVideo(e.getSim_video_id(), userId); |
| | | } |
| | | } |
| | | |
| | | public List<SimilarVideoUser> calculateOrder(){ |
| | | SimilarVideoUser similarVideoInfo = null; |
| | | double totalScore = 0; |
| | | for(Map.Entry<String, List<Double>> entry : this.simiVideoDistanceMap.entrySet()){ |
| | | similarVideoInfo = this.orderVideoList.get(entry.getKey()); |
| | | totalScore = this.getTotalScore(entry.getValue(), entry.getKey()); |
| | | similarVideoInfo.setScore(totalScore); |
| | | } |
| | | |
| | | List<SimilarVideoUser> resultList = new ArrayList<>(128); |
| | | for(SimilarVideoUser e : this.orderVideoList.values()){ |
| | | resultList.add(e); |
| | | } |
| | | Collections.sort(resultList); |
| | | return resultList; |
| | | } |
| | | |
| | | private double getTotalScore(List<Double> list, String simiVideoId){ |
| | | if(StringUtils.isEmptyList(list)){ |
| | | return 0; |
| | | } |
| | | double total = 0; |
| | | for(double d : list){ |
| | | total += d; |
| | | } |
| | | return total; |
| | | } |
| | | |
| | | private void addSimiVideoDistance(String videoId, double score){ |
| | | List<Double> v = this.simiVideoDistanceMap.get(videoId); |
| | | if(v == null){ |
| | | v = new ArrayList<>(16); |
| | | this.simiVideoDistanceMap.put(videoId, v); |
| | | } |
| | | v.add(score); |
| | | } |
| | | |
| | | private void addOrderVideo(String simiVideoId, long userId){ |
| | | SimilarVideoUser v = this.orderVideoList.get(simiVideoId); |
| | | if(v == null){ |
| | | v = new SimilarVideoUser(userId, simiVideoId); |
| | | this.orderVideoList.put(simiVideoId, v); |
| | | } |
| | | v.increase(); |
| | | } |
| | | } |
New file |
| | |
| | | package com.iplatform.recvideo; |
| | | |
| | | import com.iplatform.gather.GatherScheduler; |
| | | import com.walker.store.support.EmptyDatabaseStore; |
| | | |
| | | public abstract class VideoScheduler extends GatherScheduler { |
| | | |
| | | protected static final String SUCCESS = "ok"; |
| | | |
| | | public VideoScheduler(int id, String name){ |
| | | super(id, name, new EmptyDatabaseStore()); |
| | | } |
| | | |
| | | protected void terminateGatherScheduler(int schedulerId) { |
| | | |
| | | } |
| | | |
| | | |
| | | } |
New file |
| | |
| | | package com.iplatform.recvideo.scheduler; |
| | | |
/**
 * Immutable holder for the batch id a video search round works on.
 */
public class VideoSearchMeta {

    private final String batchId;

    /**
     * @param batchId id of the batch to process
     */
    public VideoSearchMeta(String batchId) {
        this.batchId = batchId;
    }

    /**
     * @return the batch id given at construction time
     */
    public String getBatchId() {
        return this.batchId;
    }
}
New file |
| | |
| | | package com.iplatform.recvideo.scheduler; |
| | | |
| | | import com.iplatform.core.BeanContextAware; |
| | | import com.iplatform.gather.LocalAddress; |
| | | import com.iplatform.recvideo.VideoScheduler; |
| | | import com.walker.store.AbstractStore; |
| | | import com.walker.store.task.GatherTask; |
| | | |
| | | /** |
| | | * 视频信息存储、搜索相似度处理的调度任务。<p></p> |
| | | * 该任务会持续运行: |
| | | * <pre> |
| | | * 1) |
| | | * </pre> |
| | | * @author 时克英 |
| | | * @date 2022-09-26 |
| | | */ |
| | | public class VideoSearchScheduler extends VideoScheduler { |
| | | |
| | | private VideoSearchTask videoSearchTask = null; |
| | | private VideoSearchMeta currentVideoSearchMeta = null; |
| | | |
| | | // |
| | | private int failedCount = 0; |
| | | |
| | | public VideoSearchScheduler(int id, String name){ |
| | | super(id, name); |
| | | } |
| | | |
| | | @Override |
| | | protected GatherTask providerTask(AbstractStore store) { |
| | | LocalAddress localAddress = BeanContextAware.getBeanByType(LocalAddress.class); |
| | | VideoSearchTask task = new VideoSearchTask("视频相似度检索任务", localAddress); |
| | | return task; |
| | | } |
| | | |
| | | // @Override |
| | | // protected Object[] getRunParameters(Object previousInvokeResult) {} |
| | | |
| | | @Override |
| | | protected Object onProcess(Object[] inputParams) throws Exception { |
| | | if(this.videoSearchTask == null){ |
| | | logger.debug("调度任务初始化,需要先查询任务状态表"); |
| | | this.videoSearchTask = (VideoSearchTask)this.providerTask(this.getStore()); |
| | | Object[] params = new Object[]{this.videoSearchTask.getSql(), new Object[]{}}; |
| | | this.currentVideoSearchMeta = (VideoSearchMeta)this.videoSearchTask.run(null, null, null, params); |
| | | if(this.currentVideoSearchMeta == null){ |
| | | return null; |
| | | } |
| | | logger.debug("开始启动批次查询:" + this.currentVideoSearchMeta.getBatchId()); |
| | | } |
| | | |
| | | // |
| | | this.videoSearchTask.checkExecutor(this.currentVideoSearchMeta); |
| | | // |
| | | int result = this.videoSearchTask.executeOneSearch(); |
| | | |
| | | if(result == 0){ |
| | | return SUCCESS; |
| | | } |
| | | |
| | | if(result == -1){ |
| | | this.failedCount ++; |
| | | if(this.failedCount > 3){ |
| | | // 如果报错超过多次,返回空,让线程休眠一下 |
| | | this.failedCount = 0; |
| | | return null; |
| | | } else { |
| | | return SUCCESS; |
| | | } |
| | | } |
| | | |
| | | if(result == 1){ |
| | | // 完成,准备查找下一个批次记录 |
| | | logger.info("完成批次,准备查找下一个批次记录。batchId = " + this.currentVideoSearchMeta.getBatchId()); |
| | | this.clearStatus(); |
| | | return SUCCESS; |
| | | } |
| | | |
| | | return null; |
| | | // if(this.gatherTask == null){ |
| | | // this.gatherTask = this.providerTask(this.store); |
| | | // if(this.gatherTask == null){ |
| | | // throw new IllegalArgumentException("未提供任务对象,调度无法执行:" + this.getName()); |
| | | // } |
| | | // } |
| | | // // srcName, createTableSQL, parameter, params[] |
| | | // return this.gatherTask.run((String)inputParams[0], (String)inputParams[1], inputParams[2], (Object[])inputParams[3]); |
| | | } |
| | | |
| | | private void clearStatus(){ |
| | | if(this.videoSearchTask != null){ |
| | | this.videoSearchTask = null; |
| | | } |
| | | this.currentVideoSearchMeta = null; |
| | | } |
| | | } |
New file |
| | |
| | | package com.iplatform.recvideo.scheduler; |
| | | |
| | | import com.iplatform.core.BeanContextAware; |
| | | import com.iplatform.recvideo.config.VideoSimilarProperties; |
| | | import com.iplatform.recvideo.service.VideoExecutorServiceImpl; |
| | | import com.iplatform.recvideo.support.DefaultSimilarExecutor; |
| | | import com.walker.connector.Address; |
| | | import com.walker.infrastructure.utils.StringUtils; |
| | | import com.walker.jdbc.JdbcInspector; |
| | | import com.walker.store.task.GenericGatherTask; |
| | | import org.springframework.web.client.RestTemplate; |
| | | |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | |
| | | /** |
| | | * 视频相似度查询以及写入结果执行任务实现。 |
| | | * <pre> |
| | | * 1.采集源头为读取数据库最新批次信息。 |
| | | * 2.根据批次结果,扫描本机批次目录中所有视频图片,检索相似度并计算结果。 |
| | | * </pre> |
| | | * @author 时克英 |
| | | * @date 2022-09-26 |
| | | */ |
| | | public class VideoSearchTask extends GenericGatherTask { |
| | | |
| | | private DefaultSimilarExecutor similarExecutor = null; |
| | | |
| | | private VideoSimilarProperties videoSimilarProperties; |
| | | private VideoExecutorServiceImpl videoExecutorService; |
| | | private RestTemplate restTemplate; |
| | | |
| | | private String sql = "select * from rc_task_status where task_type='video_load' and status='0'"; |
| | | |
| | | public VideoSearchTask(String name, Address address){ |
| | | super(name, address); |
| | | this.setDatabaseType(JdbcInspector.getInstance().getPrimaryDatabaseType()); |
| | | this.videoSimilarProperties = BeanContextAware.getBeanByType(VideoSimilarProperties.class); |
| | | } |
| | | |
| | | @Override |
| | | protected List<Object> transferResultData(List<Object> resultList) { |
| | | return resultList; |
| | | } |
| | | |
| | | /** |
| | | * 执行具体任务处理,主要由相似度执行器执行其<code>execute</code>方法。 |
| | | * @param srcName |
| | | * @param createTableSQL |
| | | * @param parameter |
| | | * @param data 该参数是从表: rc_task_status 中查询出来的可用批次记录,若存在多个就抽取一个处理。 |
| | | * @return |
| | | */ |
| | | @Override |
| | | protected Object execute(String srcName, String createTableSQL, Object parameter, List<Object> data) { |
| | | if(StringUtils.isEmptyList(data)){ |
| | | logger.warn("数据库中检索出可处理: rc_task_status 记录,但集合为空"); |
| | | return null; |
| | | } |
| | | Map<String, Object> rcTaskStatus = (Map<String, Object>)data.get(0); |
| | | String lastValue = rcTaskStatus.get("last_value").toString(); |
| | | return new VideoSearchMeta(lastValue); |
| | | } |
| | | |
| | | public String getSql(){ |
| | | return this.sql; |
| | | } |
| | | |
| | | public void checkExecutor(VideoSearchMeta videoSearchMeta){ |
| | | if(this.similarExecutor != null){ |
| | | logger.debug("DefaultSimilarExecutor 已经设置过,不再重复设置"); |
| | | return; |
| | | } |
| | | DefaultSimilarExecutor defaultSimilarExecutor = new DefaultSimilarExecutor(); |
| | | defaultSimilarExecutor.setRemoteUrl(this.videoSimilarProperties.getAiService()); |
| | | defaultSimilarExecutor.setVideoExecutorService(BeanContextAware.getBeanByType(VideoExecutorServiceImpl.class)); |
| | | defaultSimilarExecutor.setRestTemplate(BeanContextAware.getBeanByType(RestTemplate.class)); |
| | | defaultSimilarExecutor.startup(this.videoSimilarProperties.getDataFolder() |
| | | , videoSearchMeta.getBatchId(), this.videoSimilarProperties.getTopN(), true); |
| | | this.similarExecutor = defaultSimilarExecutor; |
| | | } |
| | | |
| | | /** |
| | | * 调用一次结果查询及写入。 |
| | | * @return -1 失败出现异常, 0 成功执行一次(继续下一次), 1 批次完成 |
| | | * @throws Exception |
| | | */ |
| | | public int executeOneSearch() throws Exception{ |
| | | if(this.similarExecutor == null){ |
| | | throw new IllegalStateException("similarExecutor is required!"); |
| | | } |
| | | return this.similarExecutor.execute(); |
| | | } |
| | | } |
| | |
| | | |
| | | import com.iplatform.model.po.Rc_video_t1; |
| | | import com.iplatform.model.po.Rc_video_t2; |
| | | import com.iplatform.model.po.Rc_video_user; |
| | | import com.walker.infrastructure.utils.DateUtils; |
| | | import com.walker.infrastructure.utils.StringUtils; |
| | | import com.walker.jdbc.service.BaseServiceImpl; |
| | | import org.springframework.stereotype.Service; |
| | | |
| | |
| | | private static final String SQL_CHECK_VIDEO_STATUS = "select * from milvus_video_status where id=?"; |
| | | private static final String SQL_CLEAR_VIDEO_T1 = "delete from rc_video_t1 where src_img=?"; |
| | | private static final String SQL_CLEAR_VIDEO_T2 = "delete from rc_video_t2 where src_video_id=?"; |
| | | |
| | | private static final String SQL_GET_BATCH_VIDEO = "select user_id, src_video_id from rc_video_batch where batch_id=?"; |
| | | |
| | | private static final String SQL_UPDATE_TASK_STATUS_LOAD = "update rc_task_status set status='1', end_time=? where last_value=? and status='0'"; |
| | | |
| | | /** |
| | | * 写入视频相似度第一级临时数据,每个图像包含多个相似视频记录。 |
| | |
| | | } |
| | | |
| | | /** |
| | | * 写入用户推荐视频记录集合,并更新批次任务状态为(已完成) |
| | | * @param videoUserList |
| | | * @param batchId |
| | | */ |
| | | public void execBatchInsertVideoUser(List<Rc_video_user> videoUserList, String batchId){ |
| | | this.insert(videoUserList); |
| | | Object[] param = new Object[2]; |
| | | param[0] = Long.parseLong(DateUtils.getDateTimeSecondForShow()); |
| | | param[1] = Long.parseLong(batchId); |
| | | this.execute(SQL_UPDATE_TASK_STATUS_LOAD, param); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 返回一个批次用户对应视频记录集合,用于最后更新用户推荐视频数据。 |
| | | * @param batchId |
| | | * @return |
| | | * @date 2022-09-26 |
| | | */ |
| | | public List<Map<String, Object>> queryBatchUserVideoList(String batchId){ |
| | | return this.select(SQL_GET_BATCH_VIDEO, new Object[]{batchId}); |
| | | } |
| | | |
| | | public List<Rc_video_t2> queryVideoT_2List(String batchId){ |
| | | return this.select(new Rc_video_t2(), "where batch_id=?", new Object[]{batchId}); |
| | | } |
| | | |
| | | /** |
| | | * 根据原始视频ID返回相似记录集合。 |
| | | * @param srcVideoId |
| | | * @return |
| | |
| | | |
| | | import com.iplatform.model.po.Rc_video_t1; |
| | | import com.iplatform.model.po.Rc_video_t2; |
| | | import com.iplatform.model.po.Rc_video_user; |
| | | import com.iplatform.recvideo.Constants; |
| | | import com.iplatform.recvideo.SimilarExecutor; |
| | | import com.iplatform.recvideo.SimilarVideoInfo; |
| | | import com.iplatform.recvideo.SimilarVideoOrder; |
| | | import com.iplatform.recvideo.SimilarVideoUser; |
| | | import com.iplatform.recvideo.SimilarVideoUserOrder; |
| | | import com.iplatform.recvideo.VideoFolderInfo; |
| | | import com.iplatform.recvideo.service.VideoExecutorServiceImpl; |
| | | import com.iplatform.recvideo.util.PythonInvokeUtils; |
| | | import com.walker.infrastructure.utils.DateUtils; |
| | | import com.walker.infrastructure.utils.NumberGenerator; |
| | | import com.walker.infrastructure.utils.StringUtils; |
| | | import org.springframework.web.client.RestTemplate; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | |
| | | public class DefaultSimilarExecutor extends SimilarExecutor { |
| | | |
| | |
| | | } |
| | | |
| | | @Override |
| | | protected void writeRcVideoT2(VideoFolderInfo videoFolderInfo) { |
| | | protected void writeRcVideoT2(VideoFolderInfo videoFolderInfo, String batchId) { |
| | | logger.debug("正在写入'Rc_video_t2',原始视频:" + videoFolderInfo.getVideoId()); |
| | | List<Rc_video_t1> list = this.videoExecutorService.queryVideoT_1List(videoFolderInfo.getVideoId()); |
| | | if(StringUtils.isEmptyList(list)){ |
| | |
| | | } |
| | | SimilarVideoOrder similarVideoOrder = new SimilarVideoOrder(list); |
| | | List<SimilarVideoInfo> similarVideoInfoList = similarVideoOrder.calculateOrder(); |
| | | List<Rc_video_t2> video2List = this.toRcVideoT2List(videoFolderInfo.getVideoId(), similarVideoInfoList); |
| | | List<Rc_video_t2> video2List = this.toRcVideoT2List(videoFolderInfo.getVideoId(), similarVideoInfoList, batchId); |
| | | if(video2List == null){ |
| | | logger.warn("writeRcVideoT2(): similarVideoInfoList为空,不能执行, srcVideoId = " + videoFolderInfo.getVideoId()); |
| | | return; |
| | |
| | | @Override |
| | | protected void writeRcVideoUser(String batchId, List<String> recVideoIdList) { |
| | | logger.info("正在写入一次用户推荐视频数据, batchId = " + batchId); |
| | | List<Map<String, Object>> batchUserVideoList = this.videoExecutorService.queryBatchUserVideoList(batchId); |
| | | if(StringUtils.isEmptyList(batchUserVideoList)){ |
| | | logger.error("未找到批次用户视频记录,无法更新用户推荐视频数据,batchId = " + batchId); |
| | | return; |
| | | } |
| | | List<Rc_video_t2> video2List = this.videoExecutorService.queryVideoT_2List(batchId); |
| | | if(StringUtils.isEmptyList(video2List)){ |
| | | logger.warn("从表'rc_video_t2'未加载到批次视频结果记录,无法继续写入用户推荐视频'rc_video_user'"); |
| | | return; |
| | | } |
| | | |
| | | // 原始视频ID与用户关系 |
| | | Map<String, Long> srcVideoIdUserCache = new HashMap<>(); |
| | | for(Map<String, Object> m : batchUserVideoList){ |
| | | srcVideoIdUserCache.put(m.get("src_video_id").toString(), Long.parseLong(m.get("user_id").toString())); |
| | | } |
| | | |
| | | // 推荐视频ID与用户关系 |
| | | Map<String, Long> videoIdUserCache = new HashMap<>(); |
| | | Long userId = null; |
| | | for(Rc_video_t2 e : video2List){ |
| | | userId = srcVideoIdUserCache.get(e.getSrc_video_id()); |
| | | if(userId == null){ |
| | | throw new IllegalArgumentException("推荐视频的原始视频未找到对应用户,src_video = " + e.getSrc_video_id() + ", rec_video = " + e.getSim_video_id()); |
| | | } |
| | | videoIdUserCache.put(e.getSim_video_id(), userId); |
| | | } |
| | | |
| | | SimilarVideoUserOrder similarVideoUserOrder = new SimilarVideoUserOrder(videoIdUserCache, video2List); |
| | | List<SimilarVideoUser> similarVideoUserList = similarVideoUserOrder.calculateOrder(); |
| | | List<Rc_video_user> videoUserList = this.toRcVideoUserList(similarVideoUserList); |
| | | if(videoUserList == null){ |
| | | logger.warn("writeRcVideoUser(): similarVideoUserList,不能执行, batchId = " + batchId); |
| | | return; |
| | | } |
| | | |
| | | // 用户推荐视频记录,需要更新,如果不存在才写入。 |
| | | // 注意:这里暂时不更新,全部写入,这样用户推荐表中会存在较多重复数据,后续要添加定时任务清除重复数据即可。 |
| | | this.videoExecutorService.execBatchInsertVideoUser(videoUserList, batchId); |
| | | logger.debug("批次任务状态已更新成功! " + batchId); |
| | | } |
| | | |
| | | private List<Rc_video_t2> toRcVideoT2List(String srcVideoId, List<SimilarVideoInfo> similarVideoInfoList){ |
| | | private List<Rc_video_user> toRcVideoUserList(List<SimilarVideoUser> similarVideoUserList){ |
| | | if(StringUtils.isEmptyList(similarVideoUserList)){ |
| | | return null; |
| | | } |
| | | List<Rc_video_user> resultList = new ArrayList<>(similarVideoUserList.size()); |
| | | Rc_video_user rc_video_t2 = null; |
| | | for(SimilarVideoUser e : similarVideoUserList){ |
| | | rc_video_t2 = new Rc_video_user(NumberGenerator.getLongSequenceNumber()); |
| | | rc_video_t2.setCreate_time(Long.parseLong(DateUtils.getDateTimeSecondForShow())); |
| | | rc_video_t2.setUser_id(e.getUserId()); |
| | | rc_video_t2.setVideo_id(e.getRecommendVideoId()); |
| | | rc_video_t2.setScore(e.getScore()); |
| | | resultList.add(rc_video_t2); |
| | | } |
| | | return resultList; |
| | | } |
| | | |
| | | private List<Rc_video_t2> toRcVideoT2List(String srcVideoId |
| | | , List<SimilarVideoInfo> similarVideoInfoList, String batchId){ |
| | | if(StringUtils.isEmptyList(similarVideoInfoList)){ |
| | | return null; |
| | | } |
| | |
| | | rc_video_t2.setSrc_video_id(srcVideoId); |
| | | rc_video_t2.setSim_video_id(e.getId()); |
| | | rc_video_t2.setScore(e.getScore()); |
| | | rc_video_t2.setBatch_id(batchId); |
| | | resultList.add(rc_video_t2); |
| | | } |
| | | return resultList; |