shikeying
2024-01-11 3b67e947e36133e2a40eb2737b15ea375e157ea0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package com.walker.tcp.netty;
 
import com.walker.infrastructure.utils.StringUtils;
import com.walker.tcp.connect.LongConnection;
import com.walker.tcp.util.WebSocketUtils;
 
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;
 
public class WebSocketNettyHandler extends DefaultLongHandler {
 
    private WebSocketServerHandshaker handshaker;
    
    private String uri;
    
     public String getUri() {
        return uri;
    }
 
    public void setUri(String uri) {
        this.uri = uri;
    }
 
    @Override
     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
         
         String textMessage = null;
         
        if(msg instanceof FullHttpRequest){
            //-----------> http://xxxx
            handleHttpRequest(ctx,(FullHttpRequest)msg);
            textMessage = WebSocketUtils.WEB_SOCKET_HTTP_MSG;
            msgThreadLocal.set(textMessage);
            
        }else if(msg instanceof WebSocketFrame){
            //-----------> ws://xxxx    
            textMessage =  handlerWebSocketFrame(ctx,(WebSocketFrame)msg);
            String id = ctx.channel().id().asLongText() ;
            LongConnection conn = (LongConnection)connectionManager.getConnection(id);
            
            msgThreadLocal.set(textMessage);
            tcpServerHandler.onRead(conn, textMessage);
            
        } else {
            throw new UnsupportedOperationException("不支持该消息类型:" + msg.getClass().getName());
        }
      }
     
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    //        System.out.println("服务端读取完成数据一次。" + ctx.channel().remoteAddress() );
            
            String id = ctx.channel().id().asLongText() ;
//            String msg = msgThreadLocal.get();
            Object msg = msgThreadLocal.get();
            if(msg != null && msg.toString().equals(WebSocketUtils.WEB_SOCKET_HTTP_MSG)){
                return;
            }
            
            tcpServerHandler.onReadComplete(msg, id, ctx);
            msgThreadLocal.set(null);
         super.channelReadComplete(ctx);
    }
    
    private String handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // 判断是否关闭链路的指令
        if (frame instanceof CloseWebSocketFrame) {
            System.out.println(1);
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return null;
        }
        
        // 判断是否ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return null;
        }
        if (frame instanceof PongWebSocketFrame) {
            //ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return null;
        }
        
        
        // 本例程仅支持文本消息,不支持二进制消息
        if (!(frame instanceof TextWebSocketFrame)) {
            //System.out.println("本例程仅支持文本消息,不支持二进制消息");
            throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
        }
        
        // 返回应答消息
        return ((TextWebSocketFrame) frame).text();
        //System.out.println("服务端收到:" + request);
 
//        TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request);
        // 群发
//        Global.group.writeAndFlush(tws);
        // 返回【谁发的发给谁】
        // ctx.channel().writeAndFlush(tws);
        }
    
    /**
     * 第一次请求是http请求,请求头包括ws的信息
     * @param ctx
     * @param req
     */
    public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){
        if(StringUtils.isEmpty(uri)){
            throw new IllegalArgumentException("WebSocketHandler: websocket uri 未设置");
        }
        if(!req.decoderResult().isSuccess()){
            sendHttpResponse(ctx,req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        
        logger.debug("================ " + "ws://"+ctx.channel()+ "/websocket");
//        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://"+ctx.channel()+ "/websocket",null,false);
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(uri,null,false);
        handshaker = wsFactory.newHandshaker(req);
        
        if(handshaker == null){
            //不支持
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        }else{
            handshaker.handshake(ctx.channel(), req);
        }
    }
    
    private static void sendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest req,DefaultFullHttpResponse res){
        // 返回应答给客户端
        if (res.status().code() != 200)
        {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
 
        // 如果是非Keep-Alive,关闭连接
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200){
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }
 
}