package com.walker.tcp.handler;
|
|
import com.walker.infrastructure.utils.StringUtils;
|
import com.walker.queue.QueueManager;
|
import com.walker.tcp.AuthenticateException;
|
import com.walker.tcp.Connection;
|
import com.walker.tcp.ConnectionManager;
|
import com.walker.tcp.Message;
|
import com.walker.tcp.ProtocolResolver;
|
import com.walker.tcp.ProtocolResolverPostProcessor;
|
import com.walker.tcp.Request;
|
import com.walker.tcp.ServerHandler;
|
import com.walker.tcp.connect.LongConnection;
|
import com.walker.tcp.data.AbstractStringRequest;
|
import com.walker.tcp.netty.DefaultLongConnection;
|
import com.walker.tcp.util.ConvertorUtils;
|
import io.netty.channel.ChannelHandlerContext;
|
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.LogFactory;
|
|
import java.io.IOException;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
|
@Deprecated
|
public abstract class MyMessageHandler implements ServerHandler<Message> {
|
|
protected final transient Log logger = LogFactory.getLog(getClass());
|
|
protected static final String MSG_REQUEST_ERROR = "未找到任何使用 TcpRequest 注解的请求对象,无法继续执行消息接收";
|
|
private int engineId;
|
|
private ConnectionManager connectionManager;
|
|
// private DataConvertable convertor;
|
|
// private Authenticate authenticate;
|
|
private boolean emtpyMsgDisconnect = false;
|
|
private List<ProtocolResolver<?>> protocolResolverList;
|
|
private Map<String, String> mapper = null;
|
|
private QueueManager queueManager;
|
|
|
public QueueManager getQueueManager() {
|
return queueManager;
|
}
|
|
public void setQueueManager(QueueManager queueManager) {
|
this.queueManager = queueManager;
|
}
|
|
/**
|
* 返回请求协议与请求对象之间的关系Map。如示例:</p>
|
* <pre>
|
* AP00 --> com.walker.tcp.littleD.LoginRequest
|
* </pre>
|
* @return
|
*/
|
public Map<String, String> getMapper() {
|
return mapper;
|
}
|
|
/**
|
* 设置tcpRequest注解的扫描包路劲,多个以逗号分隔
|
* @param scanPackages
|
*/
|
public void setScanPackages(String scanPackages) {
|
if(StringUtils.isNotEmpty(scanPackages)){
|
String[] packageNames = scanPackages.split(StringUtils.DEFAULT_SPLIT_SEPARATOR);
|
if(packageNames != null){
|
if(mapper == null) mapper = new HashMap<>();
|
Map<String, String> map = null;
|
for(String p : packageNames){
|
map = ConvertorUtils.scanTcpRequestAnnotation(p);
|
if(map != null){
|
mapper.putAll(map);
|
}
|
}
|
}
|
}
|
}
|
|
protected List<ProtocolResolver<?>> getProtocolResolverList() {
|
return protocolResolverList;
|
}
|
|
public void setProtocolResolverList(List<ProtocolResolver<?>> protocolResolverList) {
|
this.protocolResolverList = protocolResolverList;
|
}
|
|
@Override
|
public void onConnected(String id) throws Exception {
|
// 该方法因为耦合有netty代码,所以放在netty实现类中:DefaultLongHandler
|
}
|
|
@Override
|
public void onDisConnected(String id) throws Exception {
|
connectionManager.removeConnection(id);
|
}
|
|
@Override
|
public void onRead(Connection conn, Message msg) throws Exception {
|
|
}
|
|
@Override
|
public void onReadComplete(Message msg, String id, ChannelHandlerContext ctx) throws Exception {
|
|
// if(StringUtils.isEmpty(msg)){
|
// 2020-08-18 修改
|
if(msg == null){
|
// throw new IllegalArgumentException("msg is required!");
|
// 有些客户端强制断开连接时,msg会出现空的情况,所以这里不能抛异常。
|
if(this.emtpyMsgDisconnect){
|
Connection conn = connectionManager.getConnection(id);
|
if(conn != null){
|
conn.disconnect();
|
}
|
connectionManager.removeConnection(id);
|
logger.warn("服务端接收到客户端空数据,可能终端异常断开,服务器也将强制断开。id = " + id);
|
}
|
return;
|
}
|
|
logger.debug(msg);
|
|
/**
|
* 1.判断该连接是否已经被认证,如果未认证先处理认证
|
* 2.缓存新连接对象
|
* 3.已认证请求,直接转换请求对象,调用业务回调
|
*/
|
|
Request<?> request = null;
|
ProtocolResolver<?> resolver = null;
|
|
try{
|
request = this.createRequest(msg);
|
|
} catch(Exception ex){
|
logger.error("创建tcp request失败:" + msg, ex);
|
return;
|
}
|
|
if(request == null){
|
// throw new IllegalArgumentException("业务未定义请求协议对象:" + msg);
|
logger.warn("消息丢弃,业务未定义请求协议对象:" + msg);
|
return;
|
}
|
resolver = ProtocolResolverPostProcessor.getProtocolResolver(request.getProtocolResolverId());
|
|
Connection conn = connectionManager.getConnection(id);
|
if(conn == null){
|
// throw new IllegalStateException("cached conn lost! channelId = " + id);
|
conn = new DefaultLongConnection(id, ctx);
|
conn.setProtocolResolver(resolver);
|
((LongConnection)conn).setEngineId(this.engineId);
|
}
|
|
if(conn.supportLongConnection() && !conn.isAuthenticated()){
|
String clientId;
|
try{
|
// clientId = this.authenticate.authenticate(request);
|
clientId = resolver.getAuthenticateInfo(request);
|
// 认证后创建连接
|
conn.bindName(clientId);
|
logger.debug("connection更新认证:" + conn.getName());
|
connectionManager.putConnection(conn);
|
|
} catch(AuthenticateException ae){
|
logger.error("该通道未认证成功,已经被断开连接:" + id);
|
conn.disconnect();
|
connectionManager.removeConnection(id);
|
return;
|
}
|
} else {
|
// 已经认证过的通道,把终端ID设置到request中
|
// ((AbstractStringRequest)request).setName(conn.getName());
|
}
|
|
// 把设备编号,连接session放到请求中使用
|
AbstractStringRequest req = (AbstractStringRequest)request;
|
req.setName(conn.getName());
|
req.setSessionId(conn.getId());
|
|
/* 注意:如果使用内存队列,在队列已满的情况下,这里会阻塞 */
|
queueManager.push(request.getProtocolNum(), request, null);
|
|
/**
|
* 用消息的方式代替action业务执行!
|
*
|
// ActionCallable action = this.convertor.getSingletonAction(request.getProtocolNum());
|
ActionCallable action = this.createAction(request);
|
try{
|
if(request.isRequireResponse()){
|
Response<?> response = action.action(request);
|
|
// 2018-09-20 时克英修改,响应对象中自动加入用户唯一编号
|
response.setName(request.getName());
|
|
conn.write(response);
|
} else {
|
action.action(request);
|
logger.debug("该请求不需要响应:" + request);
|
}
|
|
} catch(ActionCallException ex){
|
// 这里也可以加入提醒机制,待续
|
logger.error("执行action错误:" + ex.getMessage(), ex);
|
}
|
*/
|
}
|
|
protected abstract Request<?> createRequest(Message message) throws Exception;
|
|
// @Deprecated
|
// protected abstract ActionCallable createAction(Request<?> request);
|
|
@Override
|
public void onException(Throwable cause) throws Exception {
|
if(cause instanceof IOException){
|
logger.warn("客户端连接异常,可能已经断开");
|
} else {
|
logger.error(null, cause);
|
}
|
}
|
|
@Override
|
public int getEngineId() {
|
return engineId;
|
}
|
|
public void setEngineId(int id){
|
this.engineId = id;
|
}
|
|
@Override
|
public ConnectionManager getConnectionManager() {
|
return connectionManager;
|
}
|
|
public void setConnectionManager(ConnectionManager manager){
|
this.connectionManager = manager;
|
}
|
|
@Override
|
public void setEmptyMsgDisconnect(boolean result) {
|
this.emtpyMsgDisconnect = result;
|
}
|
|
public boolean isEmtpyMsgDisconnect() {
|
return emtpyMsgDisconnect;
|
}
|
}
|