WangHan
2024-09-12 d5855a4926926698b740bc6c7ba489de47adb68b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package tech.powerjob.server.core.lock;
 
import com.alibaba.fastjson.JSON;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import tech.powerjob.server.common.utils.AOPUtils;
import tech.powerjob.server.monitor.MonitorService;
import tech.powerjob.server.monitor.events.lock.SlowLockEvent;
 
import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
 
/**
 * aspect for @UseSegmentLock
 *
 * @author tjq
 * @since 1/16/21
 */
@Slf4j
@Aspect
@Component
@Order(1)
@RequiredArgsConstructor
public class UseCacheLockAspect {
 
    private final MonitorService monitorService;
 
    private final Map<String, Cache<String, ReentrantLock>> lockContainer = Maps.newConcurrentMap();
 
    private static final long SLOW_THRESHOLD = 100;
 
    @Around(value = "@annotation(useCacheLock))")
    public Object execute(ProceedingJoinPoint point, UseCacheLock useCacheLock) throws Throwable {
        Cache<String, ReentrantLock> lockCache = lockContainer.computeIfAbsent(useCacheLock.type(), ignore -> {
            int concurrencyLevel = useCacheLock.concurrencyLevel();
            log.info("[UseSegmentLockAspect] create Lock Cache for [{}] with concurrencyLevel: {}", useCacheLock.type(), concurrencyLevel);
            return CacheBuilder.newBuilder()
                    .initialCapacity(300000)
                    .maximumSize(500000)
                    .concurrencyLevel(concurrencyLevel)
                    .expireAfterWrite(30, TimeUnit.MINUTES)
                    .build();
        });
        final Method method = AOPUtils.parseMethod(point);
        Long key = AOPUtils.parseSpEl(method, point.getArgs(), useCacheLock.key(), Long.class, 1L);
        final ReentrantLock reentrantLock = lockCache.get(String.valueOf(key), ReentrantLock::new);
        long start = System.currentTimeMillis();
        reentrantLock.lockInterruptibly();
        try {
            long timeCost = System.currentTimeMillis() - start;
            if (timeCost > SLOW_THRESHOLD) {
 
                final SlowLockEvent slowLockEvent = new SlowLockEvent()
                        .setType(SlowLockEvent.Type.LOCAL)
                        .setLockType(useCacheLock.type())
                        .setLockKey(String.valueOf(key))
                        .setCallerService(method.getDeclaringClass().getSimpleName())
                        .setCallerMethod(method.getName())
                        .setCost(timeCost);
 
                monitorService.monitor(slowLockEvent);
 
                log.warn("[UseSegmentLockAspect] wait lock for method({}#{}) cost {} ms! key = '{}', args = {}, ", method.getDeclaringClass().getSimpleName(), method.getName(), timeCost,
                        key,
                        JSON.toJSONString(point.getArgs()));
            }
            return point.proceed();
        } finally {
            reentrantLock.unlock();
        }
    }
}