WangHan
2024-09-12 d5855a4926926698b740bc6c7ba489de47adb68b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package tech.powerjob.server.core.service;
 
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
 
import java.time.Duration;
import java.util.Optional;
 
/**
 * 本地缓存常用数据查询操作
 *
 * @author tjq
 * @since 2020/4/14
 */
@Slf4j
@Service
public class CacheService {
 
    private final JobInfoRepository jobInfoRepository;
 
    private final WorkflowInfoRepository workflowInfoRepository;
 
    private final InstanceInfoRepository instanceInfoRepository;
 
    private final Cache<Long, String> jobId2JobNameCache;
    private final Cache<Long, String> workflowId2WorkflowNameCache;
    private final Cache<Long, Long> instanceId2AppId;
    private final Cache<Long, Long> jobId2AppId;
 
    public CacheService(JobInfoRepository jobInfoRepository, WorkflowInfoRepository workflowInfoRepository, InstanceInfoRepository instanceInfoRepository) {
 
        this.jobInfoRepository = jobInfoRepository;
        this.workflowInfoRepository = workflowInfoRepository;
        this.instanceInfoRepository = instanceInfoRepository;
 
        jobId2JobNameCache = CacheBuilder.newBuilder()
                .expireAfterWrite(Duration.ofMinutes(1))
                .maximumSize(512)
                .softValues()
                .build();
 
        workflowId2WorkflowNameCache = CacheBuilder.newBuilder()
                .expireAfterWrite(Duration.ofMinutes(1))
                .maximumSize(512)
                .softValues()
                .build();
 
        instanceId2AppId = CacheBuilder.newBuilder()
                .maximumSize(1024)
                .softValues()
                .build();
        jobId2AppId = CacheBuilder.newBuilder()
                .maximumSize(1024)
                .softValues()
                .build();
    }
 
    /**
     * 根据 jobId 查询 jobName(不保证数据一致性,或者说只要改了数据必不一致hhh)
     * @param jobId 任务ID
     * @return 任务名称
     */
    public String getJobName(Long jobId) {
        try {
            return jobId2JobNameCache.get(jobId, () -> {
                Optional<JobInfoDO> jobInfoDOOptional = jobInfoRepository.findById(jobId);
                // 防止缓存穿透 hhh(但是一开始没有,后来创建的情况下会有问题,不过问题不大,这里就不管了)
                return jobInfoDOOptional.map(JobInfoDO::getJobName).orElse("");
            });
        }catch (Exception e) {
            log.error("[CacheService] getJobName for {} failed.", jobId, e);
        }
        return null;
    }
 
    /**
     * 根据 workflowId 查询 工作流名称
     * @param workflowId 工作流ID
     * @return 工作流名称
     */
    public String getWorkflowName(Long workflowId) {
        try {
            return workflowId2WorkflowNameCache.get(workflowId, () -> {
                Optional<WorkflowInfoDO> jobInfoDOOptional = workflowInfoRepository.findById(workflowId);
                // 防止缓存穿透 hhh(但是一开始没有,后来创建的情况下会有问题,不过问题不大,这里就不管了)
                return jobInfoDOOptional.map(WorkflowInfoDO::getWfName).orElse("");
            });
        }catch (Exception e) {
            log.error("[CacheService] getWorkflowName for {} failed.", workflowId, e);
        }
        return null;
    }
 
    public Long getAppIdByInstanceId(Long instanceId) {
 
        try {
            return instanceId2AppId.get(instanceId, () -> {
                // 内部记录数据库异常
                try {
                    InstanceInfoDO instanceLog = instanceInfoRepository.findByInstanceId(instanceId);
                    if (instanceLog != null) {
                        return instanceLog.getAppId();
                    }
                }catch (Exception e) {
                    log.error("[CacheService] getAppId for instanceId:{} failed.", instanceId, e);
                }
                return null;
            });
        }catch (Exception ignore) {
            // 忽略缓存 load 失败的异常
        }
        return null;
    }
 
    public Long getAppIdByJobId(Long jobId) {
        try {
            return jobId2AppId.get(jobId, () -> {
                try {
                    Optional<JobInfoDO> jobInfoDOOptional = jobInfoRepository.findById(jobId);
                    return jobInfoDOOptional.map(JobInfoDO::getAppId).orElse(null);
                }catch (Exception e) {
                    log.error("[CacheService] getAppId for job:{} failed.", jobId, e);
                }
                return null;
            });
        } catch (Exception ignore) {
        }
        return null;
    }
}