package com.walker.tcp.netty;
|
|
import com.walker.tcp.ConnectionManager;
|
import com.walker.tcp.ServerHandler;
|
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.SimpleChannelInboundHandler;
|
import io.netty.handler.timeout.IdleState;
|
import io.netty.handler.timeout.IdleStateEvent;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
/**
|
* 把之前<code>DefaultLongHandler</code>里面的代码抽象出来了,因为泛型参数受到影响,所以把不涉及类型的代码聚在一起。
|
* @author 时克英
|
* @date 2018-09-19
|
*
|
* @param <T>
|
*/
|
public abstract class AbstractChannelInBoundHandler<T> extends SimpleChannelInboundHandler<T> {
|
|
protected final transient Logger logger = LoggerFactory.getLogger(getClass());
|
|
protected ServerHandler<T> tcpServerHandler;
|
|
protected ConnectionManager connectionManager;
|
|
// 存储线程中接受到的消息,在
|
protected final ThreadLocal<T> msgThreadLocal = new ThreadLocal<>();
|
|
public void setConnectionManager(ConnectionManager connectionManager) {
|
this.connectionManager = connectionManager;
|
}
|
|
public void setTcpServerHandler(ServerHandler<T> tcpServerHandler) {
|
this.tcpServerHandler = tcpServerHandler;
|
}
|
|
/*
|
*
|
* 覆盖 channelActive 方法 在channel被启用的时候触发 (在建立连接的时候)
|
*
|
* channelActive 和 channelInActive 在后面的内容中讲述,这里先不做详细的描述
|
* */
|
@Override
|
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
|
// System.out.println("RamoteAddress : " + ctx.channel().remoteAddress() + " active !");
|
// ctx.writeAndFlush( "Welcome to " + InetAddress.getLocalHost().getHostName() + " service!\n");
|
// System.out.println("通道id = " + ctx.channel().id().asLongText() + ", " + ctx.channel().id().asShortText());
|
//
|
// String id = ctx.channel().id().asLongText() ;
|
// ChannelHandlerContext cacheClient = ConnectionHolder.get(id);
|
// if(cacheClient == null){
|
// ConnectionHolder.put(id, ctx);
|
// }
|
logger.debug("ChannelHandlerContext = {}", ctx.getClass().getName());
|
|
String id = ctx.channel().id().asLongText() ;
|
DefaultLongConnection conn = (DefaultLongConnection)connectionManager.getConnection(id);
|
|
if(conn != null){
|
if(conn.isAuthenticated() && conn.getName() != null){
|
conn.setChannelHandlerContext(ctx);
|
conn.setLastTime(System.currentTimeMillis());
|
connectionManager.updateConnection(id, conn);
|
logger.debug("channelActive,连接已经存在,更新时间:" + conn.getName());
|
return;
|
} else {
|
logger.debug("连接已经存在,但已经过期,删除后会重新缓存。id = " + id);
|
connectionManager.removeConnection(id);
|
}
|
}
|
|
// conn = new DefaultLongConnection(id, ctx);
|
// connectionManager.putConnection(conn);
|
|
tcpServerHandler.onConnected(id);
|
super.channelActive(ctx);
|
}
|
|
@Override
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
// System.out.println("执行:channelInactive," + ctx.channel().remoteAddress() + " inactive.");
|
// String id = ctx.channel().id().asLongText() ;
|
// ConnectionHolder.remove(id);
|
|
String id = ctx.channel().id().asLongText() ;
|
tcpServerHandler.onDisConnected(id);
|
super.channelInactive(ctx);
|
}
|
|
@Override
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
|
throws Exception {
|
// System.out.println("出现异常:" + cause.getMessage());
|
tcpServerHandler.onException(cause);
|
super.exceptionCaught(ctx, cause);
|
}
|
|
@Override
|
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
String id = ctx.channel().id().asLongText() ;
|
logger.debug("执行了方法:handlerRemoved(),连接被删除。id = " + id);
|
tcpServerHandler.onDisConnected(id);
|
super.channelInactive(ctx);
|
}
|
|
/**
|
* 一段时间未进行读写操作 回调
|
*/
|
@Override
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
|
super.userEventTriggered(ctx, evt);
|
|
if (evt instanceof IdleStateEvent) {
|
IdleStateEvent event = (IdleStateEvent) evt;
|
if (event.state().equals(IdleState.READER_IDLE)) {
|
String id = ctx.channel().id().asLongText() ;
|
// ctx.channel().
|
try {
|
tcpServerHandler.onDisConnected(id);
|
logger.warn("通道超时未请求数据,被强制关闭,id:" + id);
|
|
} catch (Exception e) {
|
logger.error("通道超时未请求数据,被强制关闭,但发生异常:" + e.getMessage(), e);
|
}
|
|
}else if (event.state().equals(IdleState.WRITER_IDLE)) {
|
|
|
} else if (event.state().equals(IdleState.ALL_IDLE)) {
|
//未进行读写
|
logger.debug("------ALL_IDLE");
|
// 发送心跳消息
|
// MsgHandleService.getInstance().sendMsgUtil.sendHeartMessage(ctx);
|
}
|
}
|
}
|
|
}
|