package com.walker.tcp.netty; import com.walker.infrastructure.utils.StringUtils; import com.walker.tcp.Constants; import com.walker.tcp.Response; import com.walker.tcp.connect.LongConnection; import com.walker.tcp.support.AbstractTcpLongEngine; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.concurrent.TimeUnit; public class DefaultLongEngine extends AbstractTcpLongEngine { private ListeningClientThread serverStartThread = null; private EventLoopGroup bossGroup = null; private EventLoopGroup workerGroup = null; private int bossThreadNum = 2; private int workerThreadNum = 4; // 消息分隔符 // private String packageSeparator = "#"; // // public String getPackageSeparator() { // return packageSeparator; // } // // /** // * 设置消息分隔符,默认:# // * @param packageSeparator // */ // public void setPackageSeparator(String packageSeparator) { // this.packageSeparator = packageSeparator; // } public void setBossThreadNum(int bossThreadNum) { this.bossThreadNum = bossThreadNum; } public void setWorkerGroup(EventLoopGroup workerGroup) { this.workerGroup = workerGroup; } public void setWorkerThreadNum(int workerThreadNum) { this.workerThreadNum = workerThreadNum; } private DefaultServerInitializer serverInitializer; /** * 支持多个分隔符,用英文逗号分隔 * @param serverInitializer */ public void setServerInitializer(DefaultServerInitializer serverInitializer) { this.serverInitializer = serverInitializer; // this.serverInitializer.setPackageSeparator(packageSeparator); } @Override protected void onStart() throws Exception { if(serverInitializer == null){ throw new IllegalArgumentException("缺少:DefaultServerInitializer参数"); } // bossGroup = new NioEventLoopGroup(bossThreadNum, new DefaultThreadFactory("bossServer", true)); // workerGroup = new NioEventLoopGroup(workerThreadNum, new DefaultThreadFactory("workerGroup", true)); // // ServerBootstrap b = new ServerBootstrap(); // b.group(bossGroup, workerGroup); // b.channel(NioServerSocketChannel.class); // b.childHandler(serverInitializer); // // logger.info("***************************************"); // logger.info("长连接引擎启动,端口:" + getPort()); // logger.info("名称:" + getName()); // logger.info("***************************************"); // // try{ // // 服务器绑定端口监听 // ChannelFuture f = b.bind(getPort()).sync(); // // // 监听服务器关闭监听 // f.channel().closeFuture().sync(); // // } catch(Exception ex){ // logger.error(null, ex); // bossGroup.shutdownGracefully(); // workerGroup.shutdownGracefully(); // throw new RuntimeException("启动长连接引擎失败,正在关闭:" + this.getName()); // } serverStartThread = new ListeningClientThread(); serverStartThread.start(); } @Override protected void onShutdown() throws Exception { if(bossGroup != null){ bossGroup.shutdownGracefully(); } if(workerGroup != null){ workerGroup.shutdownGracefully(); } if(serverStartThread != null){ serverStartThread.interrupt(); } logger.info("***************************************"); logger.info("长连接引擎被关闭:" + this.getName() + ", port = " + this.getPort()); logger.info("***************************************"); } private static final String ERROR_NO_NAME = "缺少通道或设备唯一id,无法推送消息"; private static final String ERROR_NO_CONNECTION = "未找到连接,无法推送tcp数据"; @Override protected String onSendResponse(Response response) { //获取业务定义的终端ID String name = response.getName(); if(StringUtils.isEmpty(name)){ // throw new IllegalArgumentException("缺少通道或设备唯一id,无法推送消息"); return ERROR_NO_NAME; } LongConnection conn = (LongConnection)this.getConnectionManager().getConnectionByName(name); if(conn == null){ logger.warn("未找到连接,无法推送tcp数据, name=" + name + ", content="); return ERROR_NO_CONNECTION; } conn.write(response); logger.debug(conn.getName() + ", " + conn.getConnectionHost() + ", " + conn.getClass().getName()); // 2023-08-24,如果是业务调用,更新连接最近时间 // 如果后续还要加TCP心跳,则需要在这里追加判断,TCP心跳为内容:0# if(!response.getProtocolNum().equals(Constants.PROTOCOL_HEART_BEAT)){ conn.setLastTime(System.currentTimeMillis()); } // 2023-08-25,连接后执行回调函数 if(response.getProtocolNum().equals(Constants.PROTOCOL_LOGIN)){ if(this.connectionCallback != null){ this.connectionCallback.onAfterLogin(conn); } } if(logger.isDebugEnabled()){ logger.debug("发送一条消息到客户端:" + response); } return null; } private class ListeningClientThread extends Thread{ public ListeningClientThread(){ setDaemon(true); } public void run(){ if(getEngineStartDelaySeconds() > 0){ try { logger.info("ListeningClientThread 延时启动(秒)={}", getEngineStartDelaySeconds()); TimeUnit.SECONDS.sleep(getEngineStartDelaySeconds()); } catch (InterruptedException e) { throw new RuntimeException("监听tcp客户端线程延时启动,被中断:" + e.getMessage(), e); } } bossGroup = new NioEventLoopGroup(bossThreadNum, new DefaultThreadFactory("bossServer", true)); workerGroup = new NioEventLoopGroup(workerThreadNum, new DefaultThreadFactory("workerGroup", true)); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup); // 2023-10-14 暂时还原通道ID生成策略,自定义后发现通道id不会保持,目前还在找原因。 b.channel(NioServerSocketChannel.class); // 2023-09-29 使用自定义通道ID生成,支持集群环境。 // b.channel(GenIdNioSocketChannel.class); // 2023-09-30 测试发现,通过设置该对象能自定义通道ID // b.channelFactory(new GenIdChannelFactory()); b.childHandler(serverInitializer); logger.info("***************************************"); logger.info("长连接引擎启动,端口:" + getPort()); logger.info("名称:" + DefaultLongEngine.this.getName()); logger.info("***************************************"); try{ // 服务器绑定端口监听 ChannelFuture f = b.bind(getPort()).sync(); // 监听服务器关闭监听 f.channel().closeFuture().sync(); } catch(Exception ex){ logger.error(null, ex); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); throw new RuntimeException("启动长连接引擎失败,正在关闭:" + this.getName()); } } } // @Override // protected Response getServerHeartBeatResponse() { // return heartBeatResponse; // } }