| | |
| | | public class WebSocketNettyHandler extends DefaultLongHandler { |
| | | |
| | | private WebSocketServerHandshaker handshaker; |
| | | |
| | | |
| | | private String uri; |
| | | |
| | | |
| | | public String getUri() { |
| | | return uri; |
| | | } |
| | | |
| | | /** |
| | | * 设置外部配置的websocket访问地址。 |
| | | * <pre> |
| | | * 1) 该地址是浏览器可访问的最终地址,如:ws://my.com/websocket 或 wss://my.com/wss |
| | | * 2) 引擎本身并不会生成地址,代码中log日志中websocket仅是调试并非真正路径 |
| | | * 3) 引擎仅监听设置的端口(默认60000),通常在 nginx 配置中,代理该端口即可,如: |
| | | * |
| | | * ... |
| | | * location /wss { |
| | | * proxy_pass http://my.com:60000; |
| | | * proxy_http_version 1.1; |
| | | * proxy_set_header Upgrade $http_upgrade; |
| | | * proxy_set_header Connection "Upgrade"; |
| | | * } |
| | | * |
| | | * </pre> |
| | | * @param uri |
| | | */ |
| | | public void setUri(String uri) { |
| | | this.uri = uri; |
| | | } |
| | | |
| | | @Override |
| | | protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { |
| | | |
| | | |
| | | String textMessage = null; |
| | | |
| | | |
| | | if(msg instanceof FullHttpRequest){ |
| | | //-----------> http://xxxx |
| | | handleHttpRequest(ctx,(FullHttpRequest)msg); |
| | | textMessage = WebSocketUtils.WEB_SOCKET_HTTP_MSG; |
| | | msgThreadLocal.set(textMessage); |
| | | |
| | | |
| | | }else if(msg instanceof WebSocketFrame){ |
| | | //-----------> ws://xxxx |
| | | //-----------> ws://xxxx |
| | | textMessage = handlerWebSocketFrame(ctx,(WebSocketFrame)msg); |
| | | String id = ctx.channel().id().asLongText() ; |
| | | LongConnection conn = (LongConnection)connectionManager.getConnection(id); |
| | | |
| | | |
| | | msgThreadLocal.set(textMessage); |
| | | tcpServerHandler.onRead(conn, textMessage); |
| | | |
| | | |
| | | } else { |
| | | throw new UnsupportedOperationException("不支持该消息类型:" + msg.getClass().getName()); |
| | | } |
| | | } |
| | | |
| | | |
| | | @Override |
| | | public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { |
| | | // System.out.println("服务端读取完成数据一次。" + ctx.channel().remoteAddress() ); |
| | | |
| | | |
| | | String id = ctx.channel().id().asLongText() ; |
| | | // String msg = msgThreadLocal.get(); |
| | | Object msg = msgThreadLocal.get(); |
| | | if(msg != null && msg.toString().equals(WebSocketUtils.WEB_SOCKET_HTTP_MSG)){ |
| | | return; |
| | | } |
| | | |
| | | |
| | | tcpServerHandler.onReadComplete(msg, id, ctx); |
| | | msgThreadLocal.set(null); |
| | | super.channelReadComplete(ctx); |
| | | } |
| | | |
| | | |
| | | private String handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { |
| | | // 判断是否关闭链路的指令 |
| | | if (frame instanceof CloseWebSocketFrame) { |
| | |
| | | handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); |
| | | return null; |
| | | } |
| | | |
| | | |
| | | // 判断是否ping消息 |
| | | if (frame instanceof PingWebSocketFrame) { |
| | | ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); |
| | |
| | | //ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); |
| | | return null; |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | // 本例程仅支持文本消息,不支持二进制消息 |
| | | if (!(frame instanceof TextWebSocketFrame)) { |
| | | //System.out.println("本例程仅支持文本消息,不支持二进制消息"); |
| | | throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); |
| | | } |
| | | |
| | | |
| | | // 返回应答消息 |
| | | return ((TextWebSocketFrame) frame).text(); |
| | | //System.out.println("服务端收到:" + request); |
| | |
| | | // 返回【谁发的发给谁】 |
| | | // ctx.channel().writeAndFlush(tws); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 第一次请求是http请求,请求头包括ws的信息 |
| | | * @param ctx |
| | |
| | | sendHttpResponse(ctx,req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); |
| | | return; |
| | | } |
| | | |
| | | |
| | | logger.debug("================ " + "ws://"+ctx.channel()+ "/websocket"); |
| | | // WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://"+ctx.channel()+ "/websocket",null,false); |
| | | WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(uri,null,false); |
| | | handshaker = wsFactory.newHandshaker(req); |
| | | |
| | | |
| | | if(handshaker == null){ |
| | | //不支持 |
| | | WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); |
| | |
| | | handshaker.handshake(ctx.channel(), req); |
| | | } |
| | | } |
| | | |
| | | |
| | | private static void sendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest req,DefaultFullHttpResponse res){ |
| | | // 返回应答给客户端 |
| | | if (res.status().code() != 200) |