WangHan
2024-09-12 d5855a4926926698b740bc6c7ba489de47adb68b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package tech.powerjob.remote.http;
 
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationFeature;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RequestBody;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.actor.HandlerInfo;
import tech.powerjob.remote.framework.actor.ProcessType;
import tech.powerjob.remote.framework.cs.CSInitializer;
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
import tech.powerjob.remote.framework.transporter.Transporter;
import tech.powerjob.remote.framework.utils.RemoteUtils;
import tech.powerjob.remote.http.vertx.VertxInitializer;
import tech.powerjob.remote.http.vertx.VertxTransporter;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
 
/**
 * HttpCSInitializer
 * 在纠结了1晚上后,最终决定选用 vertx 作为 http 底层,而不是直接使用 netty,理由如下:
 *  - netty 实现容易,但性能调优方面需要时间成本和实践经验,而 vertx 作为 netty 的"嫡系"框架,对 netty 的封装理论上炉火纯青,性能不成问题
 *  - vertx 唯一的缺点是其作为相对上层的框架,可能存在较为严重的包冲突问题,尤其是对于那些本身跑在 vertx-framework 上的用户
 *      - 不过该问题可以通过更换协议解决,预计后续提供一个基于 netty 和自定义协议的实现
 *
 * 20240316 note:注意类名被强依赖,后续若有改动需要同步更改
 *
 * @author tjq
 * @since 2022/12/31
 */
@Slf4j
public class HttpVertxCSInitializer implements CSInitializer {
 
    private Vertx vertx;
    private HttpServer httpServer;
    private HttpClient httpClient;
 
    private CSInitializerConfig config;
 
    @Override
    public String type() {
        return tech.powerjob.common.enums.Protocol.HTTP.name();
    }
 
    @Override
    public void init(CSInitializerConfig config) {
        this.config = config;
 
        // 【Vertx 版本升级时必须注意】临时解决 vertx 自带的 jackson 序列化无法支持字段升级问题(默认特性居然是不支持增删字段的序列化方式,外国框架也是一坨...)
        try {
            io.vertx.core.json.jackson.DatabindCodec.mapper()
                    .configure(com.fasterxml.jackson.databind.MapperFeature.PROPAGATE_TRANSIENT_MARKER, true)
                    .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
                    .configure(JsonParser.Feature.IGNORE_UNDEFINED, true)
                    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
                    .setSerializationInclusion(JsonInclude.Include.NON_NULL);
        } catch (Throwable t) {
            log.warn("[HttpVertxCSInitializer] hack jackson failed!", t);
        }
 
        vertx = VertxInitializer.buildVertx();
        httpServer = VertxInitializer.buildHttpServer(vertx);
        httpClient = VertxInitializer.buildHttpClient(vertx);
    }
 
    @Override
    public Transporter buildTransporter() {
        return new VertxTransporter(httpClient);
    }
 
    @Override
    @SneakyThrows
    public void bindHandlers(List<ActorInfo> actorInfos) {
        Router router = Router.router(vertx);
        // 处理请求响应
        router.route().handler(BodyHandler.create());
        actorInfos.forEach(actorInfo -> {
            Optional.ofNullable(actorInfo.getHandlerInfos()).orElse(Collections.emptyList()).forEach(handlerInfo -> {
                String handlerHttpPath = handlerInfo.getLocation().toPath();
                ProcessType processType = handlerInfo.getAnno().processType();
 
                Handler<RoutingContext> routingContextHandler = buildRequestHandler(actorInfo, handlerInfo);
                Route route = router.post(handlerHttpPath);
                if (processType == ProcessType.BLOCKING) {
                    route.blockingHandler(routingContextHandler, false);
                } else {
                    route.handler(routingContextHandler);
                }
            });
        });
 
        // 启动 vertx http server
        final int port = config.getBindAddress().getPort();
        final String host = config.getBindAddress().getHost();
 
        httpServer.requestHandler(router)
                .exceptionHandler(e -> log.error("[PowerJob] unknown exception in Actor communication!", e))
                .listen(port, host)
                .toCompletionStage()
                .toCompletableFuture()
                .get(1, TimeUnit.MINUTES);
 
        log.info("[PowerJobRemoteEngine] startup vertx HttpServer successfully!");
    }
 
    private Handler<RoutingContext> buildRequestHandler(ActorInfo actorInfo, HandlerInfo handlerInfo) {
        Method method = handlerInfo.getMethod();
        Optional<Class<?>> powerSerializeClz = RemoteUtils.findPowerSerialize(method.getParameterTypes());
 
        // 内部框架,严格模式,绑定失败直接报错
        if (!powerSerializeClz.isPresent()) {
            throw new PowerJobException("can't find any 'PowerSerialize' object in handler args: " + handlerInfo.getLocation());
        }
 
        return ctx -> {
            final RequestBody body = ctx.body();
            final Object convertResult = body.asPojo(powerSerializeClz.get());
            try {
                Object response = method.invoke(actorInfo.getActor(), convertResult);
                if (response != null) {
                    if (response instanceof String) {
                        ctx.end((String) response);
                    } else {
                        ctx.json(JsonObject.mapFrom(response));
                    }
                    return;
                }
 
                ctx.end();
            } catch (Throwable t) {
                // 注意这里是框架实际运行时,日志输出用标准 PowerJob 格式
                log.error("[PowerJob] invoke Handler[{}] failed!", handlerInfo.getLocation(), t);
                ctx.fail(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), t);
            }
        };
    }
 
 
    @Override
    public void close() throws IOException {
        httpClient.close();
        httpServer.close();
        vertx.close();
    }
}