shikeying
2024-03-26 65c1714039bfe31b748e10ca5fb7c0b78a4829e5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package com.walker.tcp.netty;
 
import com.walker.infrastructure.utils.StringUtils;
import com.walker.tcp.connect.LongConnection;
import com.walker.tcp.util.WebSocketUtils;
 
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;
 
public class WebSocketNettyHandler extends DefaultLongHandler {
 
    private WebSocketServerHandshaker handshaker;
 
    private String uri;
 
     public String getUri() {
        return uri;
    }
 
    /**
     * 设置外部配置的websocket访问地址。
     * <pre>
     *     1) 该地址是浏览器可访问的最终地址,如:ws://my.com/websocket 或 wss://my.com/wss
     *     2) 引擎本身并不会生成地址,代码中log日志中websocket仅是调试并非真正路径
     *     3) 引擎仅监听设置的端口(默认60000),通常在 nginx 配置中,代理该端口即可,如:
     *
     *     ...
     *
     *     location /wss/ {
     *          proxy_pass http://localhost:60001/;
     *             proxy_http_version 1.1;
     *             proxy_set_header Upgrade $http_upgrade;
     *             proxy_set_header Connection "Upgrade";
     *             # proxy_set_header X-real-ip $remote_addr;
     *             # proxy_set_header X-Forwarded-For $remote_addr;
     *      }
     *
     * </pre>
     * @param uri
     */
    public void setUri(String uri) {
        this.uri = uri;
    }
 
    @Override
     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
 
         String textMessage = null;
 
        if(msg instanceof FullHttpRequest){
            //-----------> http://xxxx
            handleHttpRequest(ctx,(FullHttpRequest)msg);
            textMessage = WebSocketUtils.WEB_SOCKET_HTTP_MSG;
            msgThreadLocal.set(textMessage);
 
        }else if(msg instanceof WebSocketFrame){
            //-----------> ws://xxxx
            textMessage =  handlerWebSocketFrame(ctx,(WebSocketFrame)msg);
            String id = ctx.channel().id().asLongText() ;
            LongConnection conn = (LongConnection)connectionManager.getConnection(id);
 
            msgThreadLocal.set(textMessage);
            tcpServerHandler.onRead(conn, textMessage);
 
        } else {
            throw new UnsupportedOperationException("不支持该消息类型:" + msg.getClass().getName());
        }
      }
 
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    //        System.out.println("服务端读取完成数据一次。" + ctx.channel().remoteAddress() );
 
            String id = ctx.channel().id().asLongText() ;
//            String msg = msgThreadLocal.get();
            Object msg = msgThreadLocal.get();
            if(msg != null && msg.toString().equals(WebSocketUtils.WEB_SOCKET_HTTP_MSG)){
                return;
            }
 
            tcpServerHandler.onReadComplete(msg, id, ctx);
            msgThreadLocal.set(null);
         super.channelReadComplete(ctx);
    }
 
    private String handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // 判断是否关闭链路的指令
        if (frame instanceof CloseWebSocketFrame) {
            System.out.println(1);
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return null;
        }
 
        // 判断是否ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return null;
        }
        if (frame instanceof PongWebSocketFrame) {
            //ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return null;
        }
 
 
        // 本例程仅支持文本消息,不支持二进制消息
        if (!(frame instanceof TextWebSocketFrame)) {
            //System.out.println("本例程仅支持文本消息,不支持二进制消息");
            throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
        }
 
        // 返回应答消息
        return ((TextWebSocketFrame) frame).text();
        //System.out.println("服务端收到:" + request);
 
//        TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request);
        // 群发
//        Global.group.writeAndFlush(tws);
        // 返回【谁发的发给谁】
        // ctx.channel().writeAndFlush(tws);
        }
 
    /**
     * 第一次请求是http请求,请求头包括ws的信息
     * @param ctx
     * @param req
     */
    public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){
        if(StringUtils.isEmpty(uri)){
            throw new IllegalArgumentException("WebSocketHandler: websocket uri 未设置");
        }
        if(!req.decoderResult().isSuccess()){
            sendHttpResponse(ctx,req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
 
        logger.debug("================ " + "ws://"+ctx.channel()+ "/websocket");
//        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://"+ctx.channel()+ "/websocket",null,false);
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(uri,null,false);
        handshaker = wsFactory.newHandshaker(req);
 
        if(handshaker == null){
            //不支持
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        }else{
            handshaker.handshake(ctx.channel(), req);
        }
    }
 
    private static void sendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest req,DefaultFullHttpResponse res){
        // 返回应答给客户端
        if (res.status().code() != 200)
        {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
 
        // 如果是非Keep-Alive,关闭连接
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200){
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }
 
}