package com.iplatform.tcp.config;
|
|
import com.iplatform.base.config.TcpProperties;
|
import com.iplatform.core.PlatformConfiguration;
|
import com.iplatform.core.config.LoadBalanceProperties;
|
import com.iplatform.tcp.EngineType;
|
import com.iplatform.tcp.EquipmentCacheProvider;
|
import com.iplatform.tcp.EquipmentStatusCacheProvider;
|
import com.iplatform.tcp.lb.DefaultLbConnectionManager;
|
import com.iplatform.tcp.service.TcpEquipStatusServiceImpl;
|
import com.iplatform.tcp.support.PersistentConnectionManager;
|
import com.iplatform.tcp.support.PlatformSharpProtocolResolver;
|
import com.walker.infrastructure.ApplicationRuntimeException;
|
import com.walker.push.rocketmq.RocketQueueManager;
|
import com.walker.queue.QueueManager;
|
import com.walker.tcp.ConnectionManager;
|
import com.walker.tcp.ProtocolResolver;
|
import com.walker.tcp.handler.LongHandler;
|
import com.walker.tcp.lb.RedisConnectionMetaCache;
|
import com.walker.tcp.lb.RedisConnectionNameCache;
|
import com.walker.tcp.lb.ResponseWriter;
|
import com.walker.tcp.netty.DefaultLongEngine;
|
import com.walker.tcp.netty.DefaultLongHandler;
|
import com.walker.tcp.netty.DefaultServerInitializer;
|
import com.walker.tcp.protocol.LineProtocolResolver;
|
import com.walker.tcp.support.MemoryQueueManager;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Configuration;
|
import org.springframework.lang.Nullable;
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
import java.util.ArrayList;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
|
@Configuration
|
@ConditionalOnProperty(prefix = "iplatform.tcp", name = "enabled", havingValue = "true", matchIfMissing = false)
|
public class TcpConfig extends PlatformConfiguration {
|
|
// 迁移到base中
|
// @Bean
|
// public TcpProperties tcpProperties(){
|
// return new TcpProperties();
|
// }
|
|
/**
|
* 配置设备连接管理器。
|
* @param equipmentCacheProvider
|
* @param equipmentStatusCacheProvider
|
* @param tcpEquipStatusService
|
* @return
|
* @date 2023-04-16
|
* @date 2023-07-19 添加条件限制,如果设置自定义连接管理器,则平台不再创建,由业务自己配置。
|
* @date 2023-09-26 添加长连接元数据缓存配置,仅在集群模式可用。
|
*/
|
@ConditionalOnProperty(prefix = "iplatform.tcp", name = "custom-connection-manager", havingValue = "false", matchIfMissing = false)
|
@Bean
|
public ConnectionManager connectionManager(EquipmentCacheProvider equipmentCacheProvider
|
, EquipmentStatusCacheProvider equipmentStatusCacheProvider, TcpEquipStatusServiceImpl tcpEquipStatusService
|
, TcpProperties tcpProperties
|
, @Nullable RedisConnectionMetaCache connectionMetaCache, @Nullable RedisConnectionNameCache connectionNameCache
|
, @Nullable ResponseWriter responseWriter){
|
Map<Integer, String> connectionTypeMap = new HashMap<>(4);
|
connectionTypeMap.put(EngineType.INDEX_TCP_ENGINE, "TCP长连接");
|
connectionTypeMap.put(EngineType.INDEX_TCP_WEBSOCKET, "WebSocket连接");
|
|
if(tcpProperties.isLoadBalanceEnabled()){
|
DefaultLbConnectionManager connectionManager = new DefaultLbConnectionManager();
|
connectionManager.setMultipleEngine(true);
|
connectionManager.setConnectionTypeMap(connectionTypeMap);
|
// 下面这两个缓存参数,只有在Redis启用时才存在,单机模式可为空。2023-09-26
|
connectionManager.setConnectionMetaCache(connectionMetaCache);
|
connectionManager.setConnectionNameCache(connectionNameCache);
|
connectionManager.setConnectionHost(tcpProperties.getConnectionHost());
|
connectionManager.setResponseWriter(responseWriter);
|
// connectionManager.setEquipCache(equipmentCacheProvider);
|
// connectionManager.setStatusCache(equipmentStatusCacheProvider);
|
// connectionManager.setTcpEquipStatusService(tcpEquipStatusService);
|
logger.debug("getConnectionHost = {}", tcpProperties.getConnectionHost());
|
logger.info("connectionMetaCache = {}", connectionMetaCache.getClass().getName());
|
return connectionManager;
|
|
} else {
|
PersistentConnectionManager connectionManager = new PersistentConnectionManager();
|
connectionManager.setMultipleEngine(true);
|
connectionManager.setConnectionTypeMap(connectionTypeMap);
|
connectionManager.setEquipCache(equipmentCacheProvider);
|
connectionManager.setStatusCache(equipmentStatusCacheProvider);
|
connectionManager.setTcpEquipStatusService(tcpEquipStatusService);
|
return connectionManager;
|
}
|
}
|
|
/**
|
* 配置协议解析器:最基本的文本换行,一行是一个消息体。
|
* @return
|
* @date 2023-04-16
|
*/
|
@Bean
|
public LineProtocolResolver lineProtocolResolver(){
|
return new LineProtocolResolver();
|
}
|
|
@Bean
|
public PlatformSharpProtocolResolver platformSharpProtocolResolver(){
|
PlatformSharpProtocolResolver resolver = new PlatformSharpProtocolResolver();
|
return resolver;
|
}
|
|
/**
|
* 配置消息队列管理器实现。
|
* @param connectionManager
|
* @return
|
*/
|
@Bean
|
public QueueManager queueManager(ConnectionManager connectionManager
|
, TcpProperties tcpProperties, ThreadPoolTaskExecutor executor){
|
if(tcpProperties.isLoadBalanceEnabled()){
|
RocketQueueManager queueManager = new RocketQueueManager();
|
queueManager.setConnectionManager(connectionManager);
|
queueManager.setId(1);
|
queueManager.setName("报文消息队列管理器【集群】");
|
queueManager.setExecutor(executor);
|
queueManager.startup();
|
return queueManager;
|
} else {
|
MemoryQueueManager queueManager = new MemoryQueueManager();
|
queueManager.setConnectionManager(connectionManager);
|
queueManager.setId(1);
|
queueManager.setName("报文消息队列管理器【内置】");
|
queueManager.setMaxWorkerThread(1);
|
queueManager.startup(); // 必须启动
|
return queueManager;
|
}
|
}
|
|
private List<ProtocolResolver<?>> acquireProtocolResolverList(){
|
List<ProtocolResolver<?>> protocolResolverList = new ArrayList<>(4);
|
protocolResolverList.add(lineProtocolResolver());
|
protocolResolverList.add(platformSharpProtocolResolver());
|
return protocolResolverList;
|
}
|
|
@Bean
|
public LongHandler tcpServerHandler(QueueManager queueManager
|
, ConnectionManager connectionManager, TcpProperties tcpProperties){
|
logger.debug("...... tcp_scan_packages = {}", tcpProperties.getScanPackagesTcp());
|
LongHandler longHandler = new LongHandler();
|
longHandler.setEmptyMsgDisconnect(false);
|
longHandler.setEngineId(EngineType.INDEX_TCP_ENGINE);
|
// longHandler.setScanPackages("com.walker.tcp,com.iplatform.tcp");
|
longHandler.setScanPackages(tcpProperties.getScanPackagesTcp());
|
longHandler.setQueueManager(queueManager);
|
longHandler.setConnectionManager(connectionManager);
|
longHandler.setProtocolResolverList(this.acquireProtocolResolverList());
|
longHandler.setConnectionHost(tcpProperties.getConnectionHost());
|
return longHandler;
|
}
|
|
@Bean
|
public DefaultLongHandler nettyLongHandler(ConnectionManager connectionManager, LongHandler tcpServerHandler){
|
DefaultLongHandler longHandler = new DefaultLongHandler();
|
longHandler.setConnectionManager(connectionManager);
|
longHandler.setTcpServerHandler(tcpServerHandler);
|
return longHandler;
|
}
|
|
/**
|
* 配置:TCP 服务初始化器。
|
* @param nettyLongHandler
|
* @return
|
* @date 2023-04-16
|
*/
|
@Bean
|
public DefaultServerInitializer nettyServerInitializer(DefaultLongHandler nettyLongHandler, TcpProperties tcpProperties){
|
DefaultServerInitializer initializer = new DefaultServerInitializer();
|
initializer.setHandler(nettyLongHandler);
|
initializer.setProtocolResolverList(this.acquireProtocolResolverList());
|
initializer.setShowLog(tcpProperties.isShowLog());
|
// initializer.setTimeOutAll();
|
return initializer;
|
}
|
|
/**
|
* 配置TCP引擎。
|
* @param nettyServerInitializer
|
* @param connectionManager
|
* @return
|
*/
|
@Bean
|
public DefaultLongEngine tcpLongEngine(DefaultServerInitializer nettyServerInitializer
|
, ConnectionManager connectionManager, TcpProperties tcpProperties
|
, LoadBalanceProperties loadBalanceProperties){
|
DefaultLongEngine longEngine = new DefaultLongEngine();
|
longEngine.setId(EngineType.INDEX_TCP_ENGINE);
|
longEngine.setName("测试TCP长连接引擎");
|
longEngine.setServerInitializer(nettyServerInitializer);
|
longEngine.setConnectionManager(connectionManager);
|
longEngine.setPort(tcpProperties.getPortTcp());
|
longEngine.setBossThreadNum(tcpProperties.getBossThreadNum());
|
longEngine.setWorkerThreadNum(tcpProperties.getWorkerThreadNum());
|
longEngine.setOpenHeartBeat(tcpProperties.isOpenHeartBeat());
|
try {
|
longEngine.start();
|
} catch (Exception e) {
|
throw new ApplicationRuntimeException("启动'DefaultLongEngine'失败:" + e.getMessage(), e);
|
}
|
return longEngine;
|
}
|
|
}
|