package com.iplatform.tcp.config; import com.iplatform.base.config.TcpProperties; import com.iplatform.core.PlatformConfiguration; import com.iplatform.core.config.LoadBalanceProperties; import com.iplatform.tcp.EngineType; import com.iplatform.tcp.EquipmentCacheProvider; import com.iplatform.tcp.EquipmentStatusCacheProvider; import com.iplatform.tcp.lb.DefaultLbConnectionManager; import com.iplatform.tcp.service.TcpEquipStatusServiceImpl; import com.iplatform.tcp.support.PersistentConnectionManager; import com.iplatform.tcp.support.PlatformSharpProtocolResolver; import com.walker.infrastructure.ApplicationRuntimeException; import com.walker.push.rocketmq.RocketQueueManager; import com.walker.queue.QueueManager; import com.walker.tcp.ConnectionManager; import com.walker.tcp.ProtocolResolver; import com.walker.tcp.handler.LongHandler; import com.walker.tcp.lb.RedisConnectionMetaCache; import com.walker.tcp.lb.RedisConnectionNameCache; import com.walker.tcp.lb.ResponseWriter; import com.walker.tcp.netty.DefaultLongEngine; import com.walker.tcp.netty.DefaultLongHandler; import com.walker.tcp.netty.DefaultServerInitializer; import com.walker.tcp.protocol.LineProtocolResolver; import com.walker.tcp.support.MemoryQueueManager; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.lang.Nullable; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @Configuration @ConditionalOnProperty(prefix = "iplatform.tcp", name = "enabled", havingValue = "true", matchIfMissing = false) public class TcpConfig extends PlatformConfiguration { // 迁移到base中 // @Bean // public TcpProperties tcpProperties(){ // return new TcpProperties(); // } /** * 配置设备连接管理器。 * @param equipmentCacheProvider * @param equipmentStatusCacheProvider * @param tcpEquipStatusService * @return * @date 2023-04-16 * @date 2023-07-19 添加条件限制,如果设置自定义连接管理器,则平台不再创建,由业务自己配置。 * @date 2023-09-26 添加长连接元数据缓存配置,仅在集群模式可用。 */ @ConditionalOnProperty(prefix = "iplatform.tcp", name = "custom-connection-manager", havingValue = "false", matchIfMissing = false) @Bean public ConnectionManager connectionManager(EquipmentCacheProvider equipmentCacheProvider , EquipmentStatusCacheProvider equipmentStatusCacheProvider, TcpEquipStatusServiceImpl tcpEquipStatusService , TcpProperties tcpProperties , @Nullable RedisConnectionMetaCache connectionMetaCache, @Nullable RedisConnectionNameCache connectionNameCache , @Nullable ResponseWriter responseWriter){ Map connectionTypeMap = new HashMap<>(4); connectionTypeMap.put(EngineType.INDEX_TCP_ENGINE, "TCP长连接"); connectionTypeMap.put(EngineType.INDEX_TCP_WEBSOCKET, "WebSocket连接"); if(tcpProperties.isLoadBalanceEnabled()){ DefaultLbConnectionManager connectionManager = new DefaultLbConnectionManager(); connectionManager.setMultipleEngine(true); connectionManager.setConnectionTypeMap(connectionTypeMap); // 下面这两个缓存参数,只有在Redis启用时才存在,单机模式可为空。2023-09-26 connectionManager.setConnectionMetaCache(connectionMetaCache); connectionManager.setConnectionNameCache(connectionNameCache); connectionManager.setConnectionHost(tcpProperties.getConnectionHost()); connectionManager.setResponseWriter(responseWriter); // connectionManager.setEquipCache(equipmentCacheProvider); // connectionManager.setStatusCache(equipmentStatusCacheProvider); // connectionManager.setTcpEquipStatusService(tcpEquipStatusService); logger.debug("getConnectionHost = {}", tcpProperties.getConnectionHost()); logger.info("connectionMetaCache = {}", connectionMetaCache.getClass().getName()); return connectionManager; } else { PersistentConnectionManager connectionManager = new PersistentConnectionManager(); connectionManager.setMultipleEngine(true); connectionManager.setConnectionTypeMap(connectionTypeMap); connectionManager.setEquipCache(equipmentCacheProvider); connectionManager.setStatusCache(equipmentStatusCacheProvider); connectionManager.setTcpEquipStatusService(tcpEquipStatusService); return connectionManager; } } /** * 配置协议解析器:最基本的文本换行,一行是一个消息体。 * @return * @date 2023-04-16 */ @Bean public LineProtocolResolver lineProtocolResolver(){ return new LineProtocolResolver(); } @Bean public PlatformSharpProtocolResolver platformSharpProtocolResolver(){ PlatformSharpProtocolResolver resolver = new PlatformSharpProtocolResolver(); return resolver; } /** * 配置消息队列管理器实现。 * @param connectionManager * @return */ @Bean public QueueManager queueManager(ConnectionManager connectionManager , TcpProperties tcpProperties, ThreadPoolTaskExecutor executor){ if(tcpProperties.isLoadBalanceEnabled()){ RocketQueueManager queueManager = new RocketQueueManager(); queueManager.setConnectionManager(connectionManager); queueManager.setId(1); queueManager.setName("报文消息队列管理器【集群】"); queueManager.setExecutor(executor); queueManager.startup(); return queueManager; } else { MemoryQueueManager queueManager = new MemoryQueueManager(); queueManager.setConnectionManager(connectionManager); queueManager.setId(1); queueManager.setName("报文消息队列管理器【内置】"); queueManager.setMaxWorkerThread(1); queueManager.startup(); // 必须启动 return queueManager; } } private List> acquireProtocolResolverList(){ List> protocolResolverList = new ArrayList<>(4); protocolResolverList.add(lineProtocolResolver()); protocolResolverList.add(platformSharpProtocolResolver()); return protocolResolverList; } @Bean public LongHandler tcpServerHandler(QueueManager queueManager , ConnectionManager connectionManager, TcpProperties tcpProperties){ logger.debug("...... tcp_scan_packages = {}", tcpProperties.getScanPackagesTcp()); LongHandler longHandler = new LongHandler(); longHandler.setEmptyMsgDisconnect(false); longHandler.setEngineId(EngineType.INDEX_TCP_ENGINE); // longHandler.setScanPackages("com.walker.tcp,com.iplatform.tcp"); longHandler.setScanPackages(tcpProperties.getScanPackagesTcp()); longHandler.setQueueManager(queueManager); longHandler.setConnectionManager(connectionManager); longHandler.setProtocolResolverList(this.acquireProtocolResolverList()); longHandler.setConnectionHost(tcpProperties.getConnectionHost()); return longHandler; } @Bean public DefaultLongHandler nettyLongHandler(ConnectionManager connectionManager, LongHandler tcpServerHandler){ DefaultLongHandler longHandler = new DefaultLongHandler(); longHandler.setConnectionManager(connectionManager); longHandler.setTcpServerHandler(tcpServerHandler); return longHandler; } /** * 配置:TCP 服务初始化器。 * @param nettyLongHandler * @return * @date 2023-04-16 */ @Bean public DefaultServerInitializer nettyServerInitializer(DefaultLongHandler nettyLongHandler, TcpProperties tcpProperties){ DefaultServerInitializer initializer = new DefaultServerInitializer(); initializer.setHandler(nettyLongHandler); initializer.setProtocolResolverList(this.acquireProtocolResolverList()); initializer.setShowLog(tcpProperties.isShowLog()); // initializer.setTimeOutAll(); return initializer; } /** * 配置TCP引擎。 * @param nettyServerInitializer * @param connectionManager * @return */ @Bean public DefaultLongEngine tcpLongEngine(DefaultServerInitializer nettyServerInitializer , ConnectionManager connectionManager, TcpProperties tcpProperties , LoadBalanceProperties loadBalanceProperties){ DefaultLongEngine longEngine = new DefaultLongEngine(); longEngine.setId(EngineType.INDEX_TCP_ENGINE); longEngine.setName("测试TCP长连接引擎"); longEngine.setServerInitializer(nettyServerInitializer); longEngine.setConnectionManager(connectionManager); longEngine.setPort(tcpProperties.getPortTcp()); longEngine.setBossThreadNum(tcpProperties.getBossThreadNum()); longEngine.setWorkerThreadNum(tcpProperties.getWorkerThreadNum()); longEngine.setOpenHeartBeat(tcpProperties.isOpenHeartBeat()); try { longEngine.start(); } catch (Exception e) { throw new ApplicationRuntimeException("启动'DefaultLongEngine'失败:" + e.getMessage(), e); } return longEngine; } }