package com.walker.tcp.netty;
|
|
import com.walker.infrastructure.utils.StringUtils;
|
import com.walker.tcp.connect.LongConnection;
|
import com.walker.tcp.util.WebSocketUtils;
|
|
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.Unpooled;
|
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
import io.netty.handler.codec.http.FullHttpRequest;
|
import io.netty.handler.codec.http.HttpResponseStatus;
|
import io.netty.handler.codec.http.HttpUtil;
|
import io.netty.handler.codec.http.HttpVersion;
|
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
|
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
|
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
|
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
|
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
|
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
|
import io.netty.util.CharsetUtil;
|
|
public class WebSocketNettyHandler extends DefaultLongHandler {
|
|
private WebSocketServerHandshaker handshaker;
|
|
private String uri;
|
|
public String getUri() {
|
return uri;
|
}
|
|
public void setUri(String uri) {
|
this.uri = uri;
|
}
|
|
@Override
|
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
String textMessage = null;
|
|
if(msg instanceof FullHttpRequest){
|
//-----------> http://xxxx
|
handleHttpRequest(ctx,(FullHttpRequest)msg);
|
textMessage = WebSocketUtils.WEB_SOCKET_HTTP_MSG;
|
msgThreadLocal.set(textMessage);
|
|
}else if(msg instanceof WebSocketFrame){
|
//-----------> ws://xxxx
|
textMessage = handlerWebSocketFrame(ctx,(WebSocketFrame)msg);
|
String id = ctx.channel().id().asLongText() ;
|
LongConnection conn = (LongConnection)connectionManager.getConnection(id);
|
|
msgThreadLocal.set(textMessage);
|
tcpServerHandler.onRead(conn, textMessage);
|
|
} else {
|
throw new UnsupportedOperationException("不支持该消息类型:" + msg.getClass().getName());
|
}
|
}
|
|
@Override
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
// System.out.println("服务端读取完成数据一次。" + ctx.channel().remoteAddress() );
|
|
String id = ctx.channel().id().asLongText() ;
|
// String msg = msgThreadLocal.get();
|
Object msg = msgThreadLocal.get();
|
if(msg != null && msg.toString().equals(WebSocketUtils.WEB_SOCKET_HTTP_MSG)){
|
return;
|
}
|
|
tcpServerHandler.onReadComplete(msg, id, ctx);
|
msgThreadLocal.set(null);
|
super.channelReadComplete(ctx);
|
}
|
|
private String handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
|
// 判断是否关闭链路的指令
|
if (frame instanceof CloseWebSocketFrame) {
|
System.out.println(1);
|
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
|
return null;
|
}
|
|
// 判断是否ping消息
|
if (frame instanceof PingWebSocketFrame) {
|
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
|
return null;
|
}
|
if (frame instanceof PongWebSocketFrame) {
|
//ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
|
return null;
|
}
|
|
|
// 本例程仅支持文本消息,不支持二进制消息
|
if (!(frame instanceof TextWebSocketFrame)) {
|
//System.out.println("本例程仅支持文本消息,不支持二进制消息");
|
throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
|
}
|
|
// 返回应答消息
|
return ((TextWebSocketFrame) frame).text();
|
//System.out.println("服务端收到:" + request);
|
|
// TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request);
|
// 群发
|
// Global.group.writeAndFlush(tws);
|
// 返回【谁发的发给谁】
|
// ctx.channel().writeAndFlush(tws);
|
}
|
|
/**
|
* 第一次请求是http请求,请求头包括ws的信息
|
* @param ctx
|
* @param req
|
*/
|
public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){
|
if(StringUtils.isEmpty(uri)){
|
throw new IllegalArgumentException("WebSocketHandler: websocket uri 未设置");
|
}
|
if(!req.decoderResult().isSuccess()){
|
sendHttpResponse(ctx,req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
|
return;
|
}
|
|
logger.debug("================ " + "ws://"+ctx.channel()+ "/websocket");
|
// WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://"+ctx.channel()+ "/websocket",null,false);
|
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(uri,null,false);
|
handshaker = wsFactory.newHandshaker(req);
|
|
if(handshaker == null){
|
//不支持
|
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
|
}else{
|
handshaker.handshake(ctx.channel(), req);
|
}
|
}
|
|
private static void sendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest req,DefaultFullHttpResponse res){
|
// 返回应答给客户端
|
if (res.status().code() != 200)
|
{
|
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
|
res.content().writeBytes(buf);
|
buf.release();
|
}
|
|
// 如果是非Keep-Alive,关闭连接
|
ChannelFuture f = ctx.channel().writeAndFlush(res);
|
if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200){
|
f.addListener(ChannelFutureListener.CLOSE);
|
}
|
}
|
|
}
|