package com.walker.tcp.netty;
import com.walker.tcp.ConnectionManager;
import com.walker.tcp.ServerHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base inbound handler holding the connection-lifecycle code that was factored out of
 * {@code DefaultLongHandler}. Only logic that does not depend on the concrete message
 * type lives here, so that subclasses with different generic parameters can share it.
 *
 * <p>The handler forwards connect / disconnect / exception events to the configured
 * {@link ServerHandler} and keeps cached connections in the {@link ConnectionManager}
 * up to date. Both collaborators must be injected through their setters before the
 * handler is added to a pipeline; no null checks are performed here.
 *
 * @author 时克英
 * @date 2018-09-19
 *
 * @param <T> the inbound message type accepted by {@link SimpleChannelInboundHandler}
 */
public abstract class AbstractChannelInBoundHandler<T> extends SimpleChannelInboundHandler<T> {

    protected final transient Logger logger = LoggerFactory.getLogger(getClass());

    /** Receives connect / disconnect / exception callbacks. */
    protected ServerHandler tcpServerHandler;

    /** Cache of known connections, keyed by the channel's long-text id. */
    protected ConnectionManager connectionManager;

    /**
     * Holds the message received on the current thread. This class never reads or writes
     * it; presumably subclasses do (the original comment was cut off) - TODO confirm.
     */
    protected final ThreadLocal<T> msgThreadLocal = new ThreadLocal<>();

    public void setConnectionManager(ConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
    }

    public void setTcpServerHandler(ServerHandler tcpServerHandler) {
        this.tcpServerHandler = tcpServerHandler;
    }

    /**
     * Invoked when the channel becomes active, i.e. when the connection is established.
     *
     * <p>If the connection manager already holds an authenticated, named connection for
     * this channel id, that connection is rebound to the new context, its last-access time
     * is refreshed, and the method returns. Otherwise any stale entry is removed and the
     * server handler is notified of a new connection.
     *
     * @param ctx the context of the channel that became active
     * @throws Exception if the super implementation or a collaborator fails
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.debug("ChannelHandlerContext = {}", ctx.getClass().getName());

        String id = ctx.channel().id().asLongText();
        DefaultLongConnection conn = (DefaultLongConnection) connectionManager.getConnection(id);
        if (conn != null) {
            if (conn.isAuthenticated() && conn.getName() != null) {
                conn.setChannelHandlerContext(ctx);
                conn.setLastTime(System.currentTimeMillis());
                connectionManager.updateConnection(id, conn);
                logger.debug("channelActive,连接已经存在,更新时间:{}", conn.getName());
                // NOTE(review): returning here skips super.channelActive(ctx), so the event
                // is not propagated to later handlers for known connections - confirm intended.
                return;
            }
            // Cached entry exists but is not usable (unauthenticated or unnamed): drop it
            // so that the server handler can register a fresh one below.
            logger.debug("连接已经存在,但已经过期,删除后会重新缓存。id = {}", id);
            connectionManager.removeConnection(id);
        }

        tcpServerHandler.onConnected(id);
        super.channelActive(ctx);
    }

    /**
     * Invoked when the channel becomes inactive; notifies the server handler that the
     * connection with this channel id is gone and propagates the event.
     *
     * @param ctx the context of the channel that became inactive
     * @throws Exception if the super implementation or the server handler fails
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        String id = ctx.channel().id().asLongText();
        tcpServerHandler.onDisConnected(id);
        super.channelInactive(ctx);
    }

    /**
     * Reports the error to the server handler, then lets the default implementation
     * forward it along the pipeline.
     *
     * @param ctx   the context in which the exception was raised
     * @param cause the exception that was caught
     * @throws Exception if the super implementation or the server handler fails
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        tcpServerHandler.onException(cause);
        super.exceptionCaught(ctx, cause);
    }

    /**
     * Invoked when this handler is removed from the pipeline; notifies the server handler
     * that the connection is gone.
     *
     * <p>NOTE(review): {@link #channelInactive} also calls {@code onDisConnected}, so the
     * callback may run twice for the same id when a channel closes - presumably it is
     * idempotent; confirm.
     *
     * @param ctx the context this handler was removed from
     * @throws Exception if the super implementation or the server handler fails
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        String id = ctx.channel().id().asLongText();
        logger.debug("执行了方法:handlerRemoved(),连接被删除。id = {}", id);
        tcpServerHandler.onDisConnected(id);
        // Fixed: this previously called super.channelInactive(ctx), which fired a spurious
        // channelInactive event from a handler that is being removed.
        super.handlerRemoved(ctx);
    }

    /**
     * Callback for user events; handles {@link IdleStateEvent}s raised when no read and/or
     * write happened for the configured period. The event is always propagated first.
     *
     * @param ctx the context in which the event was triggered
     * @param evt the user event
     * @throws Exception if the super implementation fails
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
        if (!(evt instanceof IdleStateEvent)) {
            return;
        }

        IdleStateEvent event = (IdleStateEvent) evt;
        switch (event.state()) {
            case READER_IDLE:
                String id = ctx.channel().id().asLongText();
                // NOTE(review): the log message says the channel is forcibly closed, but
                // nothing here closes it; presumably onDisConnected does - confirm.
                try {
                    tcpServerHandler.onDisConnected(id);
                    logger.warn("通道超时未请求数据,被强制关闭,id:{}", id);
                } catch (Exception e) {
                    logger.error("通道超时未请求数据,被强制关闭,但发生异常:" + e.getMessage(), e);
                }
                break;
            case ALL_IDLE:
                // Neither read nor write happened; a heartbeat could be sent from here.
                logger.debug("------ALL_IDLE");
                break;
            default:
                // WRITER_IDLE: nothing to do.
                break;
        }
    }
}