shikeying
2024-01-11 3b67e947e36133e2a40eb2737b15ea375e157ea0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
package com.walker.tcp.handler;
 
import com.walker.infrastructure.utils.StringUtils;
import com.walker.queue.QueueManager;
import com.walker.tcp.AuthenticateException;
import com.walker.tcp.Connection;
import com.walker.tcp.ConnectionManager;
import com.walker.tcp.ProtocolResolver;
import com.walker.tcp.ProtocolResolverPostProcessor;
import com.walker.tcp.Request;
import com.walker.tcp.ServerHandler;
import com.walker.tcp.connect.LongConnection;
import com.walker.tcp.data.AbstractStringRequest;
import com.walker.tcp.netty.DefaultLongConnection;
import com.walker.tcp.util.ConvertorUtils;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 
//public abstract class AbstractStringHandler implements ServerHandler<String> {
public abstract class AbstractStringHandler implements ServerHandler<Object> {
 
    protected final transient Logger logger = LoggerFactory.getLogger(getClass());
 
    protected static final String MSG_REQUEST_ERROR = "未找到任何使用 TcpRequest 注解的请求对象,无法继续执行消息接收";
 
    private int engineId;
 
    private ConnectionManager connectionManager;
 
//    private DataConvertable convertor;
//    private Authenticate authenticate;
 
    private boolean emtpyMsgDisconnect = false;
 
    private List<ProtocolResolver<?>> protocolResolverList;
 
    private Map<String, String> mapper = null;
 
    private QueueManager queueManager;
 
    private String connectionHost;
 
    /**
     * 获取该连接对象绑定的主机信息,在集群环境中,需要标记连接对应主机。
     * <p>目前在聊天模块中,多主机部署通过消息方式推送,主机信息是队列的一部分,通过对方连接的主机知道消息如何被路由接收。</p>
     * @return
     * @date 2023-09-19
     */
    public String getConnectionHost() {
        return connectionHost;
    }
 
    /**
     * 设置该连接对象绑定的主机信息,在集群环境中,需要标记连接对应主机。
     * <p>目前在聊天模块中,多主机部署通过消息方式推送,主机信息是队列的一部分,通过对方连接的主机知道消息如何被路由接收。</p>
     * @param connectionHost
     * @date 2023-09-19
     */
    public void setConnectionHost(String connectionHost) {
        this.connectionHost = connectionHost;
    }
 
 
    public QueueManager getQueueManager() {
        return queueManager;
    }
 
    public void setQueueManager(QueueManager queueManager) {
        this.queueManager = queueManager;
    }
 
    /**
     * 返回请求协议与请求对象之间的关系Map。如示例:</p>
     * <pre>
     * AP00 --> com.walker.tcp.littleD.LoginRequest
     * </pre>
     * @return
     */
    public Map<String, String> getMapper() {
        return mapper;
    }
 
    /**
     * 设置tcpRequest注解的扫描包路劲,多个以逗号分隔
     * @param scanPackages
     */
    public void setScanPackages(String scanPackages) {
        if(StringUtils.isNotEmpty(scanPackages)){
            String[] packageNames = scanPackages.split(StringUtils.DEFAULT_SPLIT_SEPARATOR);
            if(packageNames != null){
                if(mapper == null) mapper = new HashMap<>();
                Map<String, String> map = null;
                for(String p : packageNames){
                    map = ConvertorUtils.scanTcpRequestAnnotation(p);
                    if(map != null){
                        mapper.putAll(map);
                    }
                }
            }
        }
    }
 
    protected List<ProtocolResolver<?>> getProtocolResolverList() {
        return protocolResolverList;
    }
 
    public void setProtocolResolverList(List<ProtocolResolver<?>> protocolResolverList) {
        this.protocolResolverList = protocolResolverList;
    }
 
//    @Deprecated
//    public void setAuthenticate(Authenticate authenticate) {
//        this.authenticate = authenticate;
//    }
//    public void setConvertor(DataConvertable convertor) {
//        this.convertor = convertor;
//    }
 
    @Override
    public void onConnected(String id) throws Exception {
        // 该方法因为耦合有netty代码,所以放在netty实现类中:DefaultLongHandler
    }
 
    @Override
    public void onDisConnected(String id) throws Exception {
        connectionManager.removeConnection(id);
    }
 
    @Override
    public void onRead(Connection conn, Object msg) throws Exception {
 
    }
 
    @Override
    public void onReadComplete(Object msg, String id, ChannelHandlerContext ctx) throws Exception {
//        if(StringUtils.isEmpty(msg)){
        if(msg == null || StringUtils.isEmpty(msg.toString())){
//            throw new IllegalArgumentException("msg is required!");
            // 有些客户端强制断开连接时,msg会出现空的情况,所以这里不能抛异常。
            if(this.emtpyMsgDisconnect){
                Connection conn = connectionManager.getConnection(id);
                if(conn != null){
                    conn.disconnect();
                }
                connectionManager.removeConnection(id);
                logger.warn("服务端接收到客户端空数据,可能终端异常断开,服务器也将强制断开。id = " + id);
            }
            return;
        }
 
        if(this.logger.isDebugEnabled()){
            logger.debug(msg.toString());
        }
 
        /**
         * 1.判断该连接是否已经被认证,如果未认证先处理认证
         * 2.缓存新连接对象
         * 3.已认证请求,直接转换请求对象,调用业务回调
         */
 
        Request<?> request = null;
        ProtocolResolver<?> resolver = null;
 
        try{
            request = this.createRequest(msg.toString());
 
        } catch(Exception ex){
            logger.error("创建tcp request失败:" + msg, ex);
            return;
        }
 
        if(request == null){
//            throw new IllegalArgumentException("业务未定义请求协议对象:" + msg);
            logger.warn("消息丢弃,业务未定义请求协议对象:" + msg);
            return;
        }
        resolver = ProtocolResolverPostProcessor.getProtocolResolver(request.getProtocolResolverId());
 
        Connection conn = connectionManager.getConnection(id);
        if(conn == null){
//            throw new IllegalStateException("cached conn lost! channelId = " + id);
            conn = new DefaultLongConnection(id, ctx);
 
            conn.setProtocolResolver(resolver);
            ((LongConnection)conn).setEngineId(this.engineId);
            ((LongConnection)conn).setLongConnection(true);
            // 2023-09-19 设置连接关联主机信息,集群环境中用于区别连接对象是在哪个节点上创建的。
            conn.setConnectionHost(this.connectionHost);
        }
 
//        if(conn.supportLongConnection() && !conn.isAuthenticated()){
        if(conn.supportLongConnection() && conn.getAlreadyLogin() == 0){
            String clientId;
            try{
//                clientId = this.authenticate.authenticate(request);
                clientId = resolver.getAuthenticateInfo(request);
                // 认证后创建连接
                conn.bindName(clientId);
                logger.debug("connection更新认证:" + conn.getName());
                connectionManager.putConnection(conn);
 
            } catch(AuthenticateException ae){
                logger.error("该通道未认证成功,已经被断开连接:" + id, ae);
                logger.error("connection = {}", conn);
                conn.disconnect();
                connectionManager.removeConnection(id);
                return;
            }
        } else {
            // 已经认证过的通道,把终端ID设置到request中
//            ((AbstractStringRequest)request).setName(conn.getName());
        }
 
        // 把设备编号,连接session放到请求中使用
        AbstractStringRequest req = (AbstractStringRequest)request;
        req.setName(conn.getName());
        req.setSessionId(conn.getId());
 
        /* 注意:如果使用内存队列,在队列已满的情况下,这里会阻塞 */
        queueManager.push(request.getProtocolNum(), request, null);
 
        /**
         * 用消息的方式代替action业务执行!
         *
//        ActionCallable action = this.convertor.getSingletonAction(request.getProtocolNum());
        ActionCallable action = this.createAction(request);
        try{
            if(request.isRequireResponse()){
                Response<?> response = action.action(request);
 
                // 2018-09-20 时克英修改,响应对象中自动加入用户唯一编号
                response.setName(request.getName());
 
                conn.write(response);
            } else {
                action.action(request);
                logger.debug("该请求不需要响应:" + request);
            }
 
        } catch(ActionCallException ex){
            // 这里也可以加入提醒机制,待续
            logger.error("执行action错误:" + ex.getMessage(), ex);
        }
        */
    }
 
    protected abstract Request<?> createRequest(String message) throws Exception;
 
//    @Deprecated
//    protected abstract ActionCallable createAction(Request<?> request);
 
    @Override
    public void onException(Throwable cause) throws Exception {
        if(cause instanceof IOException){
            logger.warn("客户端连接异常,可能已经断开");
        } else {
            logger.error(null, cause);
        }
    }
 
    @Override
    public int getEngineId() {
        return engineId;
    }
 
    public void setEngineId(int id){
        this.engineId = id;
    }
 
    @Override
    public ConnectionManager getConnectionManager() {
        return connectionManager;
    }
 
    public void setConnectionManager(ConnectionManager manager){
        this.connectionManager = manager;
    }
 
    @Override
    public void setEmptyMsgDisconnect(boolean result) {
        this.emtpyMsgDisconnect = result;
    }
 
    public boolean isEmtpyMsgDisconnect() {
        return emtpyMsgDisconnect;
    }
}