WangHan
2024-09-12 d5855a4926926698b740bc6c7ba489de47adb68b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package tech.powerjob.server.remote.server.redirector;
 
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.response.AskResponse;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import tech.powerjob.server.remote.transporter.TransportService;
import tech.powerjob.server.remote.transporter.impl.ServerURLFactory;
 
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
 
/**
 * 指定服务器运行切面
 *
 * @author tjq
 * @since 12/13/20
 */
@Slf4j
@Aspect
@Component
@Order(0)
@RequiredArgsConstructor
public class DesignateServerAspect {
 
    private final TransportService transportService;
    private final AppInfoRepository appInfoRepository;
 
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
    @Around(value = "@annotation(designateServer))")
    public Object execute(ProceedingJoinPoint point, DesignateServer designateServer) throws Throwable {
 
        // 参数
        Object[] args = point.getArgs();
        // 方法名
        String methodName = point.getSignature().getName();
        // 类名
        String className = point.getSignature().getDeclaringTypeName();
        Signature signature = point.getSignature();
        // 方法签名
        MethodSignature methodSignature = (MethodSignature) signature;
        String[] parameterNames = methodSignature.getParameterNames();
        String[] parameterTypes = Arrays.stream(methodSignature.getParameterTypes()).map(Class::getName).toArray(String[]::new);
 
        Long appId = null;
        for (int i = 0; i < parameterNames.length; i++) {
            if (StringUtils.equals(parameterNames[i], designateServer.appIdParameterName())) {
                appId = Long.parseLong(String.valueOf(args[i]));
                break;
            }
        }
 
        if (appId == null) {
            throw new PowerJobException("can't find appId in params for:" + signature);
        }
 
        // 获取执行机器
        AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new PowerJobException("can't find app info"));
        String targetServer = appInfo.getCurrentServer();
 
        // 目标IP为空,本地执行
        if (StringUtils.isEmpty(targetServer)) {
            return point.proceed();
        }
 
        // 目标IP与本地符合则本地执行
        if (Objects.equals(targetServer, transportService.defaultProtocol().getAddress())) {
            return point.proceed();
        }
 
        log.info("[DesignateServerAspect] the method[{}] should execute in server[{}], so this request will be redirect to remote server!", signature.toShortString(), targetServer);
        // 转发请求,远程执行后返回结果
        RemoteProcessReq remoteProcessReq = new RemoteProcessReq()
                .setClassName(className)
                .setMethodName(methodName)
                .setParameterTypes(parameterTypes)
                .setArgs(args);
 
        final URL friendUrl = ServerURLFactory.process2Friend(targetServer);
 
        CompletionStage<AskResponse> askCS = transportService.ask(transportService.defaultProtocol().getProtocol(), friendUrl, remoteProcessReq, AskResponse.class);
        AskResponse askResponse = askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
 
        if (!askResponse.isSuccess()) {
            throw new PowerJobException("remote process failed: " + askResponse.getMessage());
        }
 
        // 考虑范型情况
        Method method = methodSignature.getMethod();
        JavaType returnType = getMethodReturnJavaType(method);
 
        return OBJECT_MAPPER.readValue(askResponse.getData(), returnType);
    }
 
 
    private static JavaType getMethodReturnJavaType(Method method) {
        Type type = method.getGenericReturnType();
        return getJavaType(type);
    }
 
    private static JavaType getJavaType(Type type) {
        if (type instanceof ParameterizedType) {
            Type[] actualTypeArguments = ((ParameterizedType)type).getActualTypeArguments();
            Class<?> rowClass = (Class<?>) ((ParameterizedType)type).getRawType();
            JavaType[] javaTypes = new JavaType[actualTypeArguments.length];
            for (int i = 0; i < actualTypeArguments.length; i++) {
                //泛型也可能带有泛型,递归处理
                javaTypes[i] = getJavaType(actualTypeArguments[i]);
            }
            return TypeFactory.defaultInstance().constructParametricType(rowClass, javaTypes);
        } else {
            return TypeFactory.defaultInstance().constructParametricType((Class<?>) type, new JavaType[0]);
        }
    }
}