package tech.powerjob.remote.http; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationFeature; import io.netty.handler.codec.http.HttpResponseStatus; import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpServer; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.RequestBody; import io.vertx.ext.web.Route; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.handler.BodyHandler; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.remote.framework.actor.ActorInfo; import tech.powerjob.remote.framework.actor.HandlerInfo; import tech.powerjob.remote.framework.actor.ProcessType; import tech.powerjob.remote.framework.cs.CSInitializer; import tech.powerjob.remote.framework.cs.CSInitializerConfig; import tech.powerjob.remote.framework.transporter.Transporter; import tech.powerjob.remote.framework.utils.RemoteUtils; import tech.powerjob.remote.http.vertx.VertxInitializer; import tech.powerjob.remote.http.vertx.VertxTransporter; import java.io.IOException; import java.lang.reflect.Method; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; /** * HttpCSInitializer * 在纠结了1晚上后,最终决定选用 vertx 作为 http 底层,而不是直接使用 netty,理由如下: * - netty 实现容易,但性能调优方面需要时间成本和实践经验,而 vertx 作为 netty 的"嫡系"框架,对 netty 的封装理论上炉火纯青,性能不成问题 * - vertx 唯一的缺点是其作为相对上层的框架,可能存在较为严重的包冲突问题,尤其是对于那些本身跑在 vertx-framework 上的用户 * - 不过该问题可以通过更换协议解决,预计后续提供一个基于 netty 和自定义协议的实现 * * 20240316 note:注意类名被强依赖,后续若有改动需要同步更改 * * @author tjq * @since 2022/12/31 */ @Slf4j public class HttpVertxCSInitializer implements CSInitializer { private Vertx vertx; private HttpServer httpServer; private HttpClient httpClient; private CSInitializerConfig config; @Override public String type() { return tech.powerjob.common.enums.Protocol.HTTP.name(); } @Override public void init(CSInitializerConfig config) { this.config = config; // 【Vertx 版本升级时必须注意】临时解决 vertx 自带的 jackson 序列化无法支持字段升级问题(默认特性居然是不支持增删字段的序列化方式,外国框架也是一坨...) try { io.vertx.core.json.jackson.DatabindCodec.mapper() .configure(com.fasterxml.jackson.databind.MapperFeature.PROPAGATE_TRANSIENT_MARKER, true) .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true) .configure(JsonParser.Feature.IGNORE_UNDEFINED, true) .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) .setSerializationInclusion(JsonInclude.Include.NON_NULL); } catch (Throwable t) { log.warn("[HttpVertxCSInitializer] hack jackson failed!", t); } vertx = VertxInitializer.buildVertx(); httpServer = VertxInitializer.buildHttpServer(vertx); httpClient = VertxInitializer.buildHttpClient(vertx); } @Override public Transporter buildTransporter() { return new VertxTransporter(httpClient); } @Override @SneakyThrows public void bindHandlers(List actorInfos) { Router router = Router.router(vertx); // 处理请求响应 router.route().handler(BodyHandler.create()); actorInfos.forEach(actorInfo -> { Optional.ofNullable(actorInfo.getHandlerInfos()).orElse(Collections.emptyList()).forEach(handlerInfo -> { String handlerHttpPath = handlerInfo.getLocation().toPath(); ProcessType processType = handlerInfo.getAnno().processType(); Handler routingContextHandler = buildRequestHandler(actorInfo, handlerInfo); Route route = router.post(handlerHttpPath); if (processType == ProcessType.BLOCKING) { route.blockingHandler(routingContextHandler, false); } else { route.handler(routingContextHandler); } }); }); // 启动 vertx http server final int port = config.getBindAddress().getPort(); final String host = config.getBindAddress().getHost(); httpServer.requestHandler(router) .exceptionHandler(e -> log.error("[PowerJob] unknown exception in Actor communication!", e)) .listen(port, host) .toCompletionStage() .toCompletableFuture() .get(1, TimeUnit.MINUTES); log.info("[PowerJobRemoteEngine] startup vertx HttpServer successfully!"); } private Handler buildRequestHandler(ActorInfo actorInfo, HandlerInfo handlerInfo) { Method method = handlerInfo.getMethod(); Optional> powerSerializeClz = RemoteUtils.findPowerSerialize(method.getParameterTypes()); // 内部框架,严格模式,绑定失败直接报错 if (!powerSerializeClz.isPresent()) { throw new PowerJobException("can't find any 'PowerSerialize' object in handler args: " + handlerInfo.getLocation()); } return ctx -> { final RequestBody body = ctx.body(); final Object convertResult = body.asPojo(powerSerializeClz.get()); try { Object response = method.invoke(actorInfo.getActor(), convertResult); if (response != null) { if (response instanceof String) { ctx.end((String) response); } else { ctx.json(JsonObject.mapFrom(response)); } return; } ctx.end(); } catch (Throwable t) { // 注意这里是框架实际运行时,日志输出用标准 PowerJob 格式 log.error("[PowerJob] invoke Handler[{}] failed!", handlerInfo.getLocation(), t); ctx.fail(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), t); } }; } @Override public void close() throws IOException { httpClient.close(); httpServer.close(); vertx.close(); } }