package com.walker.tcp.handler; import com.walker.infrastructure.utils.StringUtils; import com.walker.queue.QueueManager; import com.walker.tcp.AuthenticateException; import com.walker.tcp.Connection; import com.walker.tcp.ConnectionManager; import com.walker.tcp.Message; import com.walker.tcp.ProtocolResolver; import com.walker.tcp.ProtocolResolverPostProcessor; import com.walker.tcp.Request; import com.walker.tcp.ServerHandler; import com.walker.tcp.connect.LongConnection; import com.walker.tcp.data.AbstractStringRequest; import com.walker.tcp.netty.DefaultLongConnection; import com.walker.tcp.util.ConvertorUtils; import io.netty.channel.ChannelHandlerContext; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @Deprecated public abstract class MyMessageHandler implements ServerHandler { protected final transient Log logger = LogFactory.getLog(getClass()); protected static final String MSG_REQUEST_ERROR = "未找到任何使用 TcpRequest 注解的请求对象,无法继续执行消息接收"; private int engineId; private ConnectionManager connectionManager; // private DataConvertable convertor; // private Authenticate authenticate; private boolean emtpyMsgDisconnect = false; private List> protocolResolverList; private Map mapper = null; private QueueManager queueManager; public QueueManager getQueueManager() { return queueManager; } public void setQueueManager(QueueManager queueManager) { this.queueManager = queueManager; } /** * 返回请求协议与请求对象之间的关系Map。如示例:

*
	 * AP00 --> com.walker.tcp.littleD.LoginRequest
	 * 
* @return */ public Map getMapper() { return mapper; } /** * 设置tcpRequest注解的扫描包路劲,多个以逗号分隔 * @param scanPackages */ public void setScanPackages(String scanPackages) { if(StringUtils.isNotEmpty(scanPackages)){ String[] packageNames = scanPackages.split(StringUtils.DEFAULT_SPLIT_SEPARATOR); if(packageNames != null){ if(mapper == null) mapper = new HashMap<>(); Map map = null; for(String p : packageNames){ map = ConvertorUtils.scanTcpRequestAnnotation(p); if(map != null){ mapper.putAll(map); } } } } } protected List> getProtocolResolverList() { return protocolResolverList; } public void setProtocolResolverList(List> protocolResolverList) { this.protocolResolverList = protocolResolverList; } @Override public void onConnected(String id) throws Exception { // 该方法因为耦合有netty代码,所以放在netty实现类中:DefaultLongHandler } @Override public void onDisConnected(String id) throws Exception { connectionManager.removeConnection(id); } @Override public void onRead(Connection conn, Message msg) throws Exception { } @Override public void onReadComplete(Message msg, String id, ChannelHandlerContext ctx) throws Exception { // if(StringUtils.isEmpty(msg)){ // 2020-08-18 修改 if(msg == null){ // throw new IllegalArgumentException("msg is required!"); // 有些客户端强制断开连接时,msg会出现空的情况,所以这里不能抛异常。 if(this.emtpyMsgDisconnect){ Connection conn = connectionManager.getConnection(id); if(conn != null){ conn.disconnect(); } connectionManager.removeConnection(id); logger.warn("服务端接收到客户端空数据,可能终端异常断开,服务器也将强制断开。id = " + id); } return; } logger.debug(msg); /** * 1.判断该连接是否已经被认证,如果未认证先处理认证 * 2.缓存新连接对象 * 3.已认证请求,直接转换请求对象,调用业务回调 */ Request request = null; ProtocolResolver resolver = null; try{ request = this.createRequest(msg); } catch(Exception ex){ logger.error("创建tcp request失败:" + msg, ex); return; } if(request == null){ // throw new IllegalArgumentException("业务未定义请求协议对象:" + msg); logger.warn("消息丢弃,业务未定义请求协议对象:" + msg); return; } resolver = ProtocolResolverPostProcessor.getProtocolResolver(request.getProtocolResolverId()); Connection conn = connectionManager.getConnection(id); if(conn == null){ // throw new IllegalStateException("cached conn lost! channelId = " + id); conn = new DefaultLongConnection(id, ctx); conn.setProtocolResolver(resolver); ((LongConnection)conn).setEngineId(this.engineId); } if(conn.supportLongConnection() && !conn.isAuthenticated()){ String clientId; try{ // clientId = this.authenticate.authenticate(request); clientId = resolver.getAuthenticateInfo(request); // 认证后创建连接 conn.bindName(clientId); logger.debug("connection更新认证:" + conn.getName()); connectionManager.putConnection(conn); } catch(AuthenticateException ae){ logger.error("该通道未认证成功,已经被断开连接:" + id); conn.disconnect(); connectionManager.removeConnection(id); return; } } else { // 已经认证过的通道,把终端ID设置到request中 // ((AbstractStringRequest)request).setName(conn.getName()); } // 把设备编号,连接session放到请求中使用 AbstractStringRequest req = (AbstractStringRequest)request; req.setName(conn.getName()); req.setSessionId(conn.getId()); /* 注意:如果使用内存队列,在队列已满的情况下,这里会阻塞 */ queueManager.push(request.getProtocolNum(), request, null); /** * 用消息的方式代替action业务执行! * // ActionCallable action = this.convertor.getSingletonAction(request.getProtocolNum()); ActionCallable action = this.createAction(request); try{ if(request.isRequireResponse()){ Response response = action.action(request); // 2018-09-20 时克英修改,响应对象中自动加入用户唯一编号 response.setName(request.getName()); conn.write(response); } else { action.action(request); logger.debug("该请求不需要响应:" + request); } } catch(ActionCallException ex){ // 这里也可以加入提醒机制,待续 logger.error("执行action错误:" + ex.getMessage(), ex); } */ } protected abstract Request createRequest(Message message) throws Exception; // @Deprecated // protected abstract ActionCallable createAction(Request request); @Override public void onException(Throwable cause) throws Exception { if(cause instanceof IOException){ logger.warn("客户端连接异常,可能已经断开"); } else { logger.error(null, cause); } } @Override public int getEngineId() { return engineId; } public void setEngineId(int id){ this.engineId = id; } @Override public ConnectionManager getConnectionManager() { return connectionManager; } public void setConnectionManager(ConnectionManager manager){ this.connectionManager = manager; } @Override public void setEmptyMsgDisconnect(boolean result) { this.emtpyMsgDisconnect = result; } public boolean isEmtpyMsgDisconnect() { return emtpyMsgDisconnect; } }