package tech.powerjob.remote.http.vertx; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpResponseStatus; import io.vertx.core.Future; import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClientRequest; import io.vertx.core.http.HttpClientResponse; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.RequestOptions; import io.vertx.core.json.JsonObject; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; import tech.powerjob.common.PowerSerializable; import tech.powerjob.remote.framework.base.RemotingException; import tech.powerjob.remote.framework.base.URL; import tech.powerjob.remote.framework.transporter.Protocol; import tech.powerjob.remote.framework.transporter.Transporter; import tech.powerjob.remote.http.HttpProtocol; import java.util.concurrent.CompletionStage; /** * VertxTransporter * * @author tjq * @since 2023/1/1 */ @Slf4j public class VertxTransporter implements Transporter { private final HttpClient httpClient; private static final Protocol PROTOCOL = new HttpProtocol(); public VertxTransporter(HttpClient httpClient) { this.httpClient = httpClient; } @Override public Protocol getProtocol() { return PROTOCOL; } @Override public void tell(URL url, PowerSerializable request) { post(url, request, null); } @Override public CompletionStage ask(URL url, PowerSerializable request, Class clz) throws RemotingException { return post(url, request, clz); } @SuppressWarnings("unchecked") private CompletionStage post(URL url, PowerSerializable request, Class clz) { final String host = url.getAddress().getHost(); final int port = url.getAddress().getPort(); final String path = url.getLocation().toPath(); RequestOptions requestOptions = new RequestOptions() .setMethod(HttpMethod.POST) .setHost(host) .setPort(port) .setURI(path); // 获取远程服务器的HTTP连接 Future httpClientRequestFuture = httpClient.request(requestOptions); // 转换 -> 发送请求获取响应 Future responseFuture = httpClientRequestFuture.compose(httpClientRequest -> httpClientRequest .putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON) .send(JsonObject.mapFrom(request).toBuffer()) ); return responseFuture.compose(httpClientResponse -> { // throw exception final int statusCode = httpClientResponse.statusCode(); if (statusCode != HttpResponseStatus.OK.code()) { // CompletableFuture.get() 时会传递抛出该异常 throw new RemotingException(String.format("request [host:%s,port:%s,url:%s] failed, status: %d, msg: %s", host, port, path, statusCode, httpClientResponse.statusMessage() )); } return httpClientResponse.body().compose(x -> { if (clz == null) { return Future.succeededFuture(null); } if (clz.equals(String.class)) { return Future.succeededFuture((T) x.toString()); } return Future.succeededFuture(x.toJsonObject().mapTo(clz)); }); }) .onFailure(t -> log.warn("[VertxTransporter] post to url[{}] failed,msg: {}", url, ExceptionUtils.getMessage(t))) .toCompletionStage(); } }