shikeying
2024-01-11 3b67e947e36133e2a40eb2737b15ea375e157ea0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
package com.walker.tcp.netty;
 
import com.walker.infrastructure.utils.StringUtils;
import com.walker.tcp.Constants;
import com.walker.tcp.Response;
import com.walker.tcp.connect.LongConnection;
import com.walker.tcp.support.AbstractTcpLongEngine;
 
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
 
import java.util.concurrent.TimeUnit;
 
public class DefaultLongEngine extends AbstractTcpLongEngine {
 
    private ListeningClientThread serverStartThread = null;
 
    private EventLoopGroup bossGroup = null;
    private EventLoopGroup workerGroup = null;
 
    private int bossThreadNum = 2;
 
    private int workerThreadNum = 4;
 
//     消息分隔符
//    private String packageSeparator = "#";
//
//    public String getPackageSeparator() {
//        return packageSeparator;
//    }
//
//    /**
//     * 设置消息分隔符,默认:#
//     * @param packageSeparator
//     */
//    public void setPackageSeparator(String packageSeparator) {
//        this.packageSeparator = packageSeparator;
//    }
 
    public void setBossThreadNum(int bossThreadNum) {
        this.bossThreadNum = bossThreadNum;
    }
 
    public void setWorkerGroup(EventLoopGroup workerGroup) {
        this.workerGroup = workerGroup;
    }
 
    public void setWorkerThreadNum(int workerThreadNum) {
        this.workerThreadNum = workerThreadNum;
    }
 
    private DefaultServerInitializer serverInitializer;
 
    /**
     * 支持多个分隔符,用英文逗号分隔
     * @param serverInitializer
     */
    public void setServerInitializer(DefaultServerInitializer serverInitializer) {
        this.serverInitializer = serverInitializer;
//        this.serverInitializer.setPackageSeparator(packageSeparator);
    }
 
    @Override
    protected void onStart() throws Exception {
        if(serverInitializer == null){
            throw new IllegalArgumentException("缺少:DefaultServerInitializer参数");
        }
 
 
//        bossGroup = new NioEventLoopGroup(bossThreadNum, new DefaultThreadFactory("bossServer", true));
//        workerGroup = new NioEventLoopGroup(workerThreadNum, new DefaultThreadFactory("workerGroup", true));
//
//         ServerBootstrap b = new ServerBootstrap();
//         b.group(bossGroup, workerGroup);
//         b.channel(NioServerSocketChannel.class);
//         b.childHandler(serverInitializer);
//
//         logger.info("***************************************");
//         logger.info("长连接引擎启动,端口:" + getPort());
//         logger.info("名称:" + getName());
//         logger.info("***************************************");
//
//         try{
//             // 服务器绑定端口监听
//             ChannelFuture f = b.bind(getPort()).sync();
//
//             // 监听服务器关闭监听
//             f.channel().closeFuture().sync();
//
//         } catch(Exception ex){
//             logger.error(null, ex);
//             bossGroup.shutdownGracefully();
//             workerGroup.shutdownGracefully();
//             throw new RuntimeException("启动长连接引擎失败,正在关闭:" + this.getName());
//         }
 
        serverStartThread = new ListeningClientThread();
        serverStartThread.start();
    }
 
    @Override
    protected void onShutdown() throws Exception {
        if(bossGroup != null){
            bossGroup.shutdownGracefully();
        }
        if(workerGroup != null){
            workerGroup.shutdownGracefully();
        }
        if(serverStartThread != null){
            serverStartThread.interrupt();
        }
        logger.info("***************************************");
        logger.info("长连接引擎被关闭:" + this.getName() + ", port = " + this.getPort());
        logger.info("***************************************");
    }
 
    private static final String ERROR_NO_NAME = "缺少通道或设备唯一id,无法推送消息";
    private static final String ERROR_NO_CONNECTION = "未找到连接,无法推送tcp数据";
 
    @Override
    protected String onSendResponse(Response<?> response) {
        //获取业务定义的终端ID
        String name = response.getName();
        if(StringUtils.isEmpty(name)){
//            throw new IllegalArgumentException("缺少通道或设备唯一id,无法推送消息");
            return ERROR_NO_NAME;
        }
        LongConnection conn = (LongConnection)this.getConnectionManager().getConnectionByName(name);
        if(conn == null){
            logger.warn("未找到连接,无法推送tcp数据, name=" + name + ", content=");
            return ERROR_NO_CONNECTION;
        }
        conn.write(response);
        logger.debug(conn.getName() + ", " + conn.getConnectionHost() + ", " + conn.getClass().getName());
 
        // 2023-08-24,如果是业务调用,更新连接最近时间
        // 如果后续还要加TCP心跳,则需要在这里追加判断,TCP心跳为内容:0#
        if(!response.getProtocolNum().equals(Constants.PROTOCOL_HEART_BEAT)){
            conn.setLastTime(System.currentTimeMillis());
        }
        // 2023-08-25,连接后执行回调函数
        if(response.getProtocolNum().equals(Constants.PROTOCOL_LOGIN)){
            if(this.connectionCallback != null){
                this.connectionCallback.onAfterLogin(conn);
            }
        }
 
        if(logger.isDebugEnabled()){
            logger.debug("发送一条消息到客户端:" + response);
        }
        return null;
    }
 
    private class ListeningClientThread extends Thread{
        public ListeningClientThread(){
            setDaemon(true);
        }
 
        public void run(){
            if(getEngineStartDelaySeconds() > 0){
                try {
                    logger.info("ListeningClientThread 延时启动(秒)={}", getEngineStartDelaySeconds());
                    TimeUnit.SECONDS.sleep(getEngineStartDelaySeconds());
                } catch (InterruptedException e) {
                    throw new RuntimeException("监听tcp客户端线程延时启动,被中断:" + e.getMessage(), e);
                }
            }
 
            bossGroup = new NioEventLoopGroup(bossThreadNum, new DefaultThreadFactory("bossServer", true));
            workerGroup = new NioEventLoopGroup(workerThreadNum, new DefaultThreadFactory("workerGroup", true));
 
             ServerBootstrap b = new ServerBootstrap();
             b.group(bossGroup, workerGroup);
 
             // 2023-10-14 暂时还原通道ID生成策略,自定义后发现通道id不会保持,目前还在找原因。
             b.channel(NioServerSocketChannel.class);
            // 2023-09-29 使用自定义通道ID生成,支持集群环境。
//             b.channel(GenIdNioSocketChannel.class);
            // 2023-09-30 测试发现,通过设置该对象能自定义通道ID
//             b.channelFactory(new GenIdChannelFactory());
 
             b.childHandler(serverInitializer);
 
             logger.info("***************************************");
             logger.info("长连接引擎启动,端口:" + getPort());
             logger.info("名称:" + DefaultLongEngine.this.getName());
             logger.info("***************************************");
 
             try{
                 // 服务器绑定端口监听
                 ChannelFuture f = b.bind(getPort()).sync();
 
                 // 监听服务器关闭监听
                 f.channel().closeFuture().sync();
 
             } catch(Exception ex){
                 logger.error(null, ex);
                 bossGroup.shutdownGracefully();
                 workerGroup.shutdownGracefully();
                 throw new RuntimeException("启动长连接引擎失败,正在关闭:" + this.getName());
             }
        }
    }
 
//    @Override
//    protected Response<?> getServerHeartBeatResponse() {
//        return heartBeatResponse;
//    }
 
}