package com.walker.tcp.netty; import com.walker.infrastructure.utils.StringUtils; import com.walker.tcp.connect.LongConnection; import com.walker.tcp.util.WebSocketUtils; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.util.CharsetUtil; public class WebSocketNettyHandler extends DefaultLongHandler { private WebSocketServerHandshaker handshaker; private String uri; public String getUri() { return uri; } /** * 设置外部配置的websocket访问地址。 *
* 1) 该地址是浏览器可访问的最终地址,如:ws://my.com/websocket 或 wss://my.com/wss * 2) 引擎本身并不会生成地址,代码中log日志中websocket仅是调试并非真正路径 * 3) 引擎仅监听设置的端口(默认60000),通常在 nginx 配置中,代理该端口即可,如: * * ... * location /wss { * proxy_pass http://my.com:60000; * proxy_http_version 1.1; * proxy_set_header Upgrade $http_upgrade; * proxy_set_header Connection "Upgrade"; * } * ** @param uri */ public void setUri(String uri) { this.uri = uri; } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { String textMessage = null; if(msg instanceof FullHttpRequest){ //-----------> http://xxxx handleHttpRequest(ctx,(FullHttpRequest)msg); textMessage = WebSocketUtils.WEB_SOCKET_HTTP_MSG; msgThreadLocal.set(textMessage); }else if(msg instanceof WebSocketFrame){ //-----------> ws://xxxx textMessage = handlerWebSocketFrame(ctx,(WebSocketFrame)msg); String id = ctx.channel().id().asLongText() ; LongConnection conn = (LongConnection)connectionManager.getConnection(id); msgThreadLocal.set(textMessage); tcpServerHandler.onRead(conn, textMessage); } else { throw new UnsupportedOperationException("不支持该消息类型:" + msg.getClass().getName()); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // System.out.println("服务端读取完成数据一次。" + ctx.channel().remoteAddress() ); String id = ctx.channel().id().asLongText() ; // String msg = msgThreadLocal.get(); Object msg = msgThreadLocal.get(); if(msg != null && msg.toString().equals(WebSocketUtils.WEB_SOCKET_HTTP_MSG)){ return; } tcpServerHandler.onReadComplete(msg, id, ctx); msgThreadLocal.set(null); super.channelReadComplete(ctx); } private String handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // 判断是否关闭链路的指令 if (frame instanceof CloseWebSocketFrame) { System.out.println(1); handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return null; } // 判断是否ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return null; } if (frame instanceof PongWebSocketFrame) { //ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return null; } // 本例程仅支持文本消息,不支持二进制消息 if (!(frame instanceof TextWebSocketFrame)) { //System.out.println("本例程仅支持文本消息,不支持二进制消息"); throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); } // 返回应答消息 return ((TextWebSocketFrame) frame).text(); //System.out.println("服务端收到:" + request); // TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request); // 群发 // Global.group.writeAndFlush(tws); // 返回【谁发的发给谁】 // ctx.channel().writeAndFlush(tws); } /** * 第一次请求是http请求,请求头包括ws的信息 * @param ctx * @param req */ public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){ if(StringUtils.isEmpty(uri)){ throw new IllegalArgumentException("WebSocketHandler: websocket uri 未设置"); } if(!req.decoderResult().isSuccess()){ sendHttpResponse(ctx,req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } logger.debug("================ " + "ws://"+ctx.channel()+ "/websocket"); // WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://"+ctx.channel()+ "/websocket",null,false); WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(uri,null,false); handshaker = wsFactory.newHandshaker(req); if(handshaker == null){ //不支持 WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); }else{ handshaker.handshake(ctx.channel(), req); } } private static void sendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest req,DefaultFullHttpResponse res){ // 返回应答给客户端 if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } // 如果是非Keep-Alive,关闭连接 ChannelFuture f = ctx.channel().writeAndFlush(res); if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200){ f.addListener(ChannelFutureListener.CLOSE); } } }