shikeying
2024-01-11 3b67e947e36133e2a40eb2737b15ea375e157ea0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package com.walker.tcp.netty;
 
import com.walker.tcp.ConnectionManager;
import com.walker.tcp.ServerHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
/**
 * 把之前<code>DefaultLongHandler</code>里面的代码抽象出来了,因为泛型参数受到影响,所以把不涉及类型的代码聚在一起。
 * @author 时克英
 * @date 2018-09-19
 *
 * @param <T>
 */
public abstract class AbstractChannelInBoundHandler<T> extends SimpleChannelInboundHandler<T> {
 
    protected final transient Logger logger = LoggerFactory.getLogger(getClass());
 
    protected ServerHandler<T> tcpServerHandler;
 
    protected ConnectionManager connectionManager;
 
    // 存储线程中接受到的消息,在
    protected final ThreadLocal<T> msgThreadLocal = new ThreadLocal<>();
 
     public void setConnectionManager(ConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
    }
 
    public void setTcpServerHandler(ServerHandler<T> tcpServerHandler) {
        this.tcpServerHandler = tcpServerHandler;
    }
 
    /*
     *
     * 覆盖 channelActive 方法 在channel被启用的时候触发 (在建立连接的时候)
     *
     * channelActive 和 channelInActive 在后面的内容中讲述,这里先不做详细的描述
     * */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
 
//        System.out.println("RamoteAddress : " + ctx.channel().remoteAddress() + " active !");
//        ctx.writeAndFlush( "Welcome to " + InetAddress.getLocalHost().getHostName() + " service!\n");
//        System.out.println("通道id = " + ctx.channel().id().asLongText() + ", " + ctx.channel().id().asShortText());
//
//        String id = ctx.channel().id().asLongText() ;
//        ChannelHandlerContext cacheClient = ConnectionHolder.get(id);
//        if(cacheClient == null){
//            ConnectionHolder.put(id, ctx);
//        }
        logger.debug("ChannelHandlerContext = {}", ctx.getClass().getName());
 
           String id = ctx.channel().id().asLongText() ;
           DefaultLongConnection conn =  (DefaultLongConnection)connectionManager.getConnection(id);
 
           if(conn != null){
               if(conn.isAuthenticated() && conn.getName() != null){
                   conn.setChannelHandlerContext(ctx);
                   conn.setLastTime(System.currentTimeMillis());
                   connectionManager.updateConnection(id, conn);
                   logger.debug("channelActive,连接已经存在,更新时间:" + conn.getName());
                   return;
               } else {
                   logger.debug("连接已经存在,但已经过期,删除后会重新缓存。id = " + id);
                   connectionManager.removeConnection(id);
               }
           }
 
    //       conn = new DefaultLongConnection(id, ctx);
    //        connectionManager.putConnection(conn);
 
            tcpServerHandler.onConnected(id);
        super.channelActive(ctx);
    }
 
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    //        System.out.println("执行:channelInactive," + ctx.channel().remoteAddress() + " inactive.");
    //        String id = ctx.channel().id().asLongText() ;
    //        ConnectionHolder.remove(id);
 
            String id = ctx.channel().id().asLongText() ;
            tcpServerHandler.onDisConnected(id);
        super.channelInactive(ctx);
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
    //       System.out.println("出现异常:" + cause.getMessage());
           tcpServerHandler.onException(cause);
       super.exceptionCaught(ctx, cause);
    }
 
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            String id = ctx.channel().id().asLongText() ;
           logger.debug("执行了方法:handlerRemoved(),连接被删除。id = " + id);
           tcpServerHandler.onDisConnected(id);
        super.channelInactive(ctx);
    }
 
    /**
     * 一段时间未进行读写操作 回调
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
        super.userEventTriggered(ctx, evt);
 
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state().equals(IdleState.READER_IDLE)) {
                String id = ctx.channel().id().asLongText() ;
//                ctx.channel().
                 try {
                    tcpServerHandler.onDisConnected(id);
                    logger.warn("通道超时未请求数据,被强制关闭,id:" + id);
 
                } catch (Exception e) {
                    logger.error("通道超时未请求数据,被强制关闭,但发生异常:" + e.getMessage(), e);
                }
 
            }else if (event.state().equals(IdleState.WRITER_IDLE)) {
 
 
            } else if (event.state().equals(IdleState.ALL_IDLE)) {
                //未进行读写
                logger.debug("------ALL_IDLE");
                // 发送心跳消息
//                MsgHandleService.getInstance().sendMsgUtil.sendHeartMessage(ctx);
            }
        }
    }
 
}