package tech.powerjob.server.persistence.remote.repository; import com.google.errorprone.annotations.CanIgnoreReturnValue; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaSpecificationExecutor; import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; import tech.powerjob.server.persistence.remote.model.brief.BriefInstanceInfo; import javax.transaction.Transactional; import java.util.Date; import java.util.List; /** * JobLog 数据访问层 * * @author tjq * @since 2020/4/1 */ public interface InstanceInfoRepository extends JpaRepository, JpaSpecificationExecutor { /** * 统计当前JOB有多少实例正在运行 */ @Query(value = "select count(*) from InstanceInfoDO where jobId = ?1 and status in ?2") long countByJobIdAndStatusIn(long jobId, List status); List findByJobIdAndStatusIn(long jobId, List status); /** * 更新状态变更信息 * * @param lastReportTime 最近一次上报时间 * @param modifyTime 更新时间 * @param runningTimes 运行次数 * @param instanceId 实例 ID * @param status 目标状态 * @param oldStatus 旧状态 * @return 更新记录数 */ @Transactional(rollbackOn = Exception.class) @Modifying @Query(value = "update InstanceInfoDO set lastReportTime = :lastReportTime, gmtModified = :modifyTime, runningTimes = :runningTimes, status = :status where instanceId = :instanceId and status = :oldStatus") int updateStatusChangeInfoByInstanceIdAndStatus(@Param("lastReportTime") long lastReportTime, @Param("modifyTime") Date modifyTime, @Param("runningTimes") long runningTimes, @Param("status") int status, @Param("instanceId") long instanceId, @Param("oldStatus") int oldStatus); /** * 更新任务执行记录内容(DispatchService专用) * * @param instanceId 实例 ID * @param status 状态 * @param actualTriggerTime 实际调度时间 * @param finishedTime 完成时间 * @param taskTrackerAddress taskTracker 地址 * @param result 结果 * @param modifyTime 更新时间 * @return 更新记录数量 */ @Transactional(rollbackOn = Exception.class) @Modifying @CanIgnoreReturnValue @Query(value = "update InstanceInfoDO set status = :status, actualTriggerTime = :actualTriggerTime, finishedTime = :finishedTime, taskTrackerAddress = :taskTrackerAddress, result = :result, gmtModified = :modifyTime where instanceId = :instanceId") int update4TriggerFailed(@Param("instanceId") long instanceId, @Param("status") int status, @Param("actualTriggerTime") long actualTriggerTime, @Param("finishedTime") long finishedTime, @Param("taskTrackerAddress") String taskTrackerAddress, @Param("result") String result, @Param("modifyTime") Date modifyTime); /** * 更新任务执行记录内容(DispatchService专用) * * @param instanceId 任务实例ID,分布式唯一 * @param status 状态 * @param actualTriggerTime 实际调度时间 * @param taskTrackerAddress taskTracker 地址 * @param modifyTime 更新时间 * @param oldStatus 旧状态 * @return 更新记录数量 */ @Transactional(rollbackOn = Exception.class) @Modifying @CanIgnoreReturnValue @Query(value = "update InstanceInfoDO set status = :status, actualTriggerTime = :actualTriggerTime, taskTrackerAddress = :taskTrackerAddress, gmtModified = :modifyTime where instanceId = :instanceId and status = :oldStatus") int update4TriggerSucceed(@Param("instanceId") long instanceId, @Param("status") int status, @Param("actualTriggerTime") long actualTriggerTime, @Param("taskTrackerAddress") String taskTrackerAddress, @Param("modifyTime") Date modifyTime, @Param("oldStatus") int oldStatus); @Transactional(rollbackOn = Exception.class) @Modifying @CanIgnoreReturnValue @Query(value = "update InstanceInfoDO set status = :status, gmtModified = :modifyTime where instanceId = :instanceId and status = :originStatus ") int updateStatusAndGmtModifiedByInstanceIdAndOriginStatus(@Param("instanceId") long instanceId, @Param("originStatus") int originStatus, @Param("status") int status, @Param("modifyTime") Date modifyTime); @Transactional(rollbackOn = Exception.class) @Modifying @CanIgnoreReturnValue @Query(value = "update InstanceInfoDO set status = :status, gmtModified = :modifyTime where instanceId in (:instanceIdList) and status = :originStatus ") int updateStatusAndGmtModifiedByInstanceIdListAndOriginStatus(@Param("instanceIdList") List instanceIdList, @Param("originStatus") int originStatus, @Param("status") int status, @Param("modifyTime") Date modifyTime); /** * 更新固定频率任务的执行记录 * * @param instanceId 任务实例ID,分布式唯一 * @param status 状态 * @param runningTimes 执行次数 * @param modifyTime 更新时间 * @return 更新记录数量 */ @Modifying @Transactional(rollbackOn = Exception.class) @CanIgnoreReturnValue @Query(value = "update InstanceInfoDO set status = :status, runningTimes = :runningTimes, gmtModified = :modifyTime where instanceId = :instanceId") int update4FrequentJob(@Param("instanceId") long instanceId, @Param("status") int status, @Param("runningTimes") long runningTimes, @Param("modifyTime") Date modifyTime); List findAllByAppIdInAndStatusAndExpectedTriggerTimeLessThan(@Param("appIds") List appIds, @Param("status") int status, @Param("time") long time, Pageable pageable); @Query(value = "select new tech.powerjob.server.persistence.remote.model.brief.BriefInstanceInfo(i.appId,i.id,i.jobId,i.instanceId) from InstanceInfoDO i where i.appId in (:appIds) and i.status = :status and i.actualTriggerTime < :time") List selectBriefInfoByAppIdInAndStatusAndActualTriggerTimeLessThan(@Param("appIds") List appIds, @Param("status") int status, @Param("time") long time, Pageable pageable); @Query(value = "select new tech.powerjob.server.persistence.remote.model.brief.BriefInstanceInfo(i.appId,i.id,i.jobId,i.instanceId,i.runningTimes) from InstanceInfoDO i where i.appId in (:appIds) and i.status = :status and i.gmtModified < :time") List selectBriefInfoByAppIdInAndStatusAndGmtModifiedBefore(@Param("appIds") List appIds, @Param("status") int status, @Param("time") Date time, Pageable pageable); InstanceInfoDO findByInstanceId(long instanceId); /* --数据统计-- */ @Query(value = "select count(*) from InstanceInfoDO where appId = ?1 and status = ?2") long countByAppIdAndStatus(long appId, int status); long countByAppIdAndStatusAndGmtCreateAfter(long appId, int status, Date time); @Query(value = "select distinct jobId from InstanceInfoDO where jobId in ?1 and status in ?2") List findByJobIdInAndStatusIn(List jobIds, List status); /** * 删除历史数据,JPA自带的删除居然是根据ID循环删,2000条数据删了几秒,也太拉垮了吧... * 结果只能用 int 接收 * * @param time 更新时间阈值 * @param status 状态 * @return 删除记录数 */ @Modifying @Transactional(rollbackOn = Exception.class) @Query(value = "delete from InstanceInfoDO where gmtModified < ?1 and status in ?2") int deleteAllByGmtModifiedBeforeAndStatusIn(Date time, List status); }