package com.walker.tcp.netty; import com.walker.tcp.ConnectionManager; import com.walker.tcp.ServerHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 把之前DefaultLongHandler里面的代码抽象出来了,因为泛型参数受到影响,所以把不涉及类型的代码聚在一起。 * @author 时克英 * @date 2018-09-19 * * @param */ public abstract class AbstractChannelInBoundHandler extends SimpleChannelInboundHandler { protected final transient Logger logger = LoggerFactory.getLogger(getClass()); protected ServerHandler tcpServerHandler; protected ConnectionManager connectionManager; // 存储线程中接受到的消息,在 protected final ThreadLocal msgThreadLocal = new ThreadLocal<>(); public void setConnectionManager(ConnectionManager connectionManager) { this.connectionManager = connectionManager; } public void setTcpServerHandler(ServerHandler tcpServerHandler) { this.tcpServerHandler = tcpServerHandler; } /* * * 覆盖 channelActive 方法 在channel被启用的时候触发 (在建立连接的时候) * * channelActive 和 channelInActive 在后面的内容中讲述,这里先不做详细的描述 * */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // System.out.println("RamoteAddress : " + ctx.channel().remoteAddress() + " active !"); // ctx.writeAndFlush( "Welcome to " + InetAddress.getLocalHost().getHostName() + " service!\n"); // System.out.println("通道id = " + ctx.channel().id().asLongText() + ", " + ctx.channel().id().asShortText()); // // String id = ctx.channel().id().asLongText() ; // ChannelHandlerContext cacheClient = ConnectionHolder.get(id); // if(cacheClient == null){ // ConnectionHolder.put(id, ctx); // } logger.debug("ChannelHandlerContext = {}", ctx.getClass().getName()); String id = ctx.channel().id().asLongText() ; DefaultLongConnection conn = (DefaultLongConnection)connectionManager.getConnection(id); if(conn != null){ if(conn.isAuthenticated() && conn.getName() != null){ conn.setChannelHandlerContext(ctx); conn.setLastTime(System.currentTimeMillis()); connectionManager.updateConnection(id, conn); logger.debug("channelActive,连接已经存在,更新时间:" + conn.getName()); return; } else { logger.debug("连接已经存在,但已经过期,删除后会重新缓存。id = " + id); connectionManager.removeConnection(id); } } // conn = new DefaultLongConnection(id, ctx); // connectionManager.putConnection(conn); tcpServerHandler.onConnected(id); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // System.out.println("执行:channelInactive," + ctx.channel().remoteAddress() + " inactive."); // String id = ctx.channel().id().asLongText() ; // ConnectionHolder.remove(id); String id = ctx.channel().id().asLongText() ; tcpServerHandler.onDisConnected(id); super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // System.out.println("出现异常:" + cause.getMessage()); tcpServerHandler.onException(cause); super.exceptionCaught(ctx, cause); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { String id = ctx.channel().id().asLongText() ; logger.debug("执行了方法:handlerRemoved(),连接被删除。id = " + id); tcpServerHandler.onDisConnected(id); super.channelInactive(ctx); } /** * 一段时间未进行读写操作 回调 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{ super.userEventTriggered(ctx, evt); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.READER_IDLE)) { String id = ctx.channel().id().asLongText() ; // ctx.channel(). try { tcpServerHandler.onDisConnected(id); logger.warn("通道超时未请求数据,被强制关闭,id:" + id); } catch (Exception e) { logger.error("通道超时未请求数据,被强制关闭,但发生异常:" + e.getMessage(), e); } }else if (event.state().equals(IdleState.WRITER_IDLE)) { } else if (event.state().equals(IdleState.ALL_IDLE)) { //未进行读写 logger.debug("------ALL_IDLE"); // 发送心跳消息 // MsgHandleService.getInstance().sendMsgUtil.sendHeartMessage(ctx); } } } }