package com.walker.tcp.handler;
import com.walker.infrastructure.utils.StringUtils;
import com.walker.queue.QueueManager;
import com.walker.tcp.AuthenticateException;
import com.walker.tcp.Connection;
import com.walker.tcp.ConnectionManager;
import com.walker.tcp.Message;
import com.walker.tcp.ProtocolResolver;
import com.walker.tcp.ProtocolResolverPostProcessor;
import com.walker.tcp.Request;
import com.walker.tcp.ServerHandler;
import com.walker.tcp.connect.LongConnection;
import com.walker.tcp.data.AbstractStringRequest;
import com.walker.tcp.netty.DefaultLongConnection;
import com.walker.tcp.util.ConvertorUtils;
import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Deprecated
public abstract class MyMessageHandler implements ServerHandler {
protected final transient Log logger = LogFactory.getLog(getClass());
protected static final String MSG_REQUEST_ERROR = "未找到任何使用 TcpRequest 注解的请求对象,无法继续执行消息接收";
private int engineId;
private ConnectionManager connectionManager;
// private DataConvertable convertor;
// private Authenticate authenticate;
private boolean emtpyMsgDisconnect = false;
private List> protocolResolverList;
private Map mapper = null;
private QueueManager queueManager;
public QueueManager getQueueManager() {
return queueManager;
}
public void setQueueManager(QueueManager queueManager) {
this.queueManager = queueManager;
}
/**
* 返回请求协议与请求对象之间的关系Map。如示例:
*
* AP00 --> com.walker.tcp.littleD.LoginRequest
*
* @return
*/
public Map getMapper() {
return mapper;
}
/**
* 设置tcpRequest注解的扫描包路劲,多个以逗号分隔
* @param scanPackages
*/
public void setScanPackages(String scanPackages) {
if(StringUtils.isNotEmpty(scanPackages)){
String[] packageNames = scanPackages.split(StringUtils.DEFAULT_SPLIT_SEPARATOR);
if(packageNames != null){
if(mapper == null) mapper = new HashMap<>();
Map map = null;
for(String p : packageNames){
map = ConvertorUtils.scanTcpRequestAnnotation(p);
if(map != null){
mapper.putAll(map);
}
}
}
}
}
protected List> getProtocolResolverList() {
return protocolResolverList;
}
public void setProtocolResolverList(List> protocolResolverList) {
this.protocolResolverList = protocolResolverList;
}
@Override
public void onConnected(String id) throws Exception {
// 该方法因为耦合有netty代码,所以放在netty实现类中:DefaultLongHandler
}
@Override
public void onDisConnected(String id) throws Exception {
connectionManager.removeConnection(id);
}
@Override
public void onRead(Connection conn, Message msg) throws Exception {
}
@Override
public void onReadComplete(Message msg, String id, ChannelHandlerContext ctx) throws Exception {
// if(StringUtils.isEmpty(msg)){
// 2020-08-18 修改
if(msg == null){
// throw new IllegalArgumentException("msg is required!");
// 有些客户端强制断开连接时,msg会出现空的情况,所以这里不能抛异常。
if(this.emtpyMsgDisconnect){
Connection conn = connectionManager.getConnection(id);
if(conn != null){
conn.disconnect();
}
connectionManager.removeConnection(id);
logger.warn("服务端接收到客户端空数据,可能终端异常断开,服务器也将强制断开。id = " + id);
}
return;
}
logger.debug(msg);
/**
* 1.判断该连接是否已经被认证,如果未认证先处理认证
* 2.缓存新连接对象
* 3.已认证请求,直接转换请求对象,调用业务回调
*/
Request> request = null;
ProtocolResolver> resolver = null;
try{
request = this.createRequest(msg);
} catch(Exception ex){
logger.error("创建tcp request失败:" + msg, ex);
return;
}
if(request == null){
// throw new IllegalArgumentException("业务未定义请求协议对象:" + msg);
logger.warn("消息丢弃,业务未定义请求协议对象:" + msg);
return;
}
resolver = ProtocolResolverPostProcessor.getProtocolResolver(request.getProtocolResolverId());
Connection conn = connectionManager.getConnection(id);
if(conn == null){
// throw new IllegalStateException("cached conn lost! channelId = " + id);
conn = new DefaultLongConnection(id, ctx);
conn.setProtocolResolver(resolver);
((LongConnection)conn).setEngineId(this.engineId);
}
if(conn.supportLongConnection() && !conn.isAuthenticated()){
String clientId;
try{
// clientId = this.authenticate.authenticate(request);
clientId = resolver.getAuthenticateInfo(request);
// 认证后创建连接
conn.bindName(clientId);
logger.debug("connection更新认证:" + conn.getName());
connectionManager.putConnection(conn);
} catch(AuthenticateException ae){
logger.error("该通道未认证成功,已经被断开连接:" + id);
conn.disconnect();
connectionManager.removeConnection(id);
return;
}
} else {
// 已经认证过的通道,把终端ID设置到request中
// ((AbstractStringRequest)request).setName(conn.getName());
}
// 把设备编号,连接session放到请求中使用
AbstractStringRequest req = (AbstractStringRequest)request;
req.setName(conn.getName());
req.setSessionId(conn.getId());
/* 注意:如果使用内存队列,在队列已满的情况下,这里会阻塞 */
queueManager.push(request.getProtocolNum(), request, null);
/**
* 用消息的方式代替action业务执行!
*
// ActionCallable action = this.convertor.getSingletonAction(request.getProtocolNum());
ActionCallable action = this.createAction(request);
try{
if(request.isRequireResponse()){
Response> response = action.action(request);
// 2018-09-20 时克英修改,响应对象中自动加入用户唯一编号
response.setName(request.getName());
conn.write(response);
} else {
action.action(request);
logger.debug("该请求不需要响应:" + request);
}
} catch(ActionCallException ex){
// 这里也可以加入提醒机制,待续
logger.error("执行action错误:" + ex.getMessage(), ex);
}
*/
}
protected abstract Request> createRequest(Message message) throws Exception;
// @Deprecated
// protected abstract ActionCallable createAction(Request> request);
@Override
public void onException(Throwable cause) throws Exception {
if(cause instanceof IOException){
logger.warn("客户端连接异常,可能已经断开");
} else {
logger.error(null, cause);
}
}
@Override
public int getEngineId() {
return engineId;
}
public void setEngineId(int id){
this.engineId = id;
}
@Override
public ConnectionManager getConnectionManager() {
return connectionManager;
}
public void setConnectionManager(ConnectionManager manager){
this.connectionManager = manager;
}
@Override
public void setEmptyMsgDisconnect(boolean result) {
this.emtpyMsgDisconnect = result;
}
public boolean isEmtpyMsgDisconnect() {
return emtpyMsgDisconnect;
}
}