shikeying
2024-02-22 0632a02beb78705dc93b760c7bfa86ecedacb970
walker-tcp/src/main/java/com/walker/tcp/netty/WebSocketNettyHandler.java
@@ -26,58 +26,76 @@
public class WebSocketNettyHandler extends DefaultLongHandler {
   private WebSocketServerHandshaker handshaker;
   private String uri;
    public String getUri() {
      return uri;
   }
   /**
    * 设置外部配置的websocket访问地址。
    * <pre>
    *     1) 该地址是浏览器可访问的最终地址,如:ws://my.com/websocket 或 wss://my.com/wss
    *     2) 引擎本身并不会生成地址,代码中log日志中websocket仅是调试并非真正路径
    *     3) 引擎仅监听设置的端口(默认60000),通常在 nginx 配置中,代理该端口即可,如:
    *
    *     ...
    *     location /wss {
    *         proxy_pass http://my.com:60000;
    *         proxy_http_version 1.1;
    *         proxy_set_header Upgrade $http_upgrade;
    *         proxy_set_header Connection "Upgrade";
    *     }
    *
    * </pre>
    * @param uri
    */
   public void setUri(String uri) {
      this.uri = uri;
   }
   @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
       String textMessage = null;
        if(msg instanceof FullHttpRequest){
           //-----------> http://xxxx
           handleHttpRequest(ctx,(FullHttpRequest)msg);
           textMessage = WebSocketUtils.WEB_SOCKET_HTTP_MSG;
           msgThreadLocal.set(textMessage);
        }else if(msg instanceof WebSocketFrame){
           //-----------> ws://xxxx
           //-----------> ws://xxxx
           textMessage =  handlerWebSocketFrame(ctx,(WebSocketFrame)msg);
           String id = ctx.channel().id().asLongText() ;
           LongConnection conn = (LongConnection)connectionManager.getConnection(id);
           msgThreadLocal.set(textMessage);
           tcpServerHandler.onRead(conn, textMessage);
        } else {
           throw new UnsupportedOperationException("不支持该消息类型:" + msg.getClass().getName());
        }
      }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
   //       System.out.println("服务端读取完成数据一次。" + ctx.channel().remoteAddress() );
          String id = ctx.channel().id().asLongText() ;
//          String msg = msgThreadLocal.get();
          Object msg = msgThreadLocal.get();
          if(msg != null && msg.toString().equals(WebSocketUtils.WEB_SOCKET_HTTP_MSG)){
             return;
          }
          tcpServerHandler.onReadComplete(msg, id, ctx);
          msgThreadLocal.set(null);
         super.channelReadComplete(ctx);
    }
    private String handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
       // 判断是否关闭链路的指令
       if (frame instanceof CloseWebSocketFrame) {
@@ -85,7 +103,7 @@
          handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
          return null;
       }
       // 判断是否ping消息
       if (frame instanceof PingWebSocketFrame) {
          ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
@@ -95,14 +113,14 @@
          //ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
          return null;
       }
       // 本例程仅支持文本消息,不支持二进制消息
       if (!(frame instanceof TextWebSocketFrame)) {
          //System.out.println("本例程仅支持文本消息,不支持二进制消息");
          throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
       }
       // 返回应答消息
       return ((TextWebSocketFrame) frame).text();
       //System.out.println("服务端收到:" + request);
@@ -113,7 +131,7 @@
       // 返回【谁发的发给谁】
       // ctx.channel().writeAndFlush(tws);
       }
   /**
    * 第一次请求是http请求,请求头包括ws的信息
    * @param ctx
@@ -127,12 +145,12 @@
           sendHttpResponse(ctx,req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
           return;
       }
       logger.debug("================ " + "ws://"+ctx.channel()+ "/websocket");
//       WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://"+ctx.channel()+ "/websocket",null,false);
       WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(uri,null,false);
       handshaker = wsFactory.newHandshaker(req);
       if(handshaker == null){
           //不支持
           WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
@@ -140,7 +158,7 @@
           handshaker.handshake(ctx.channel(), req);
       }
   }
   private static void sendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest req,DefaultFullHttpResponse res){
        // 返回应答给客户端
        if (res.status().code() != 200)