package com.walker.tcp.netty;
|
|
import com.walker.infrastructure.utils.StringUtils;
|
import com.walker.tcp.Constants;
|
import com.walker.tcp.Response;
|
import com.walker.tcp.connect.LongConnection;
|
import com.walker.tcp.support.AbstractTcpLongEngine;
|
|
import io.netty.bootstrap.ServerBootstrap;
|
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.EventLoopGroup;
|
import io.netty.channel.nio.NioEventLoopGroup;
|
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
import io.netty.util.concurrent.DefaultThreadFactory;
|
|
import java.util.concurrent.TimeUnit;
|
|
public class DefaultLongEngine extends AbstractTcpLongEngine {
|
|
private ListeningClientThread serverStartThread = null;
|
|
private EventLoopGroup bossGroup = null;
|
private EventLoopGroup workerGroup = null;
|
|
private int bossThreadNum = 2;
|
|
private int workerThreadNum = 4;
|
|
// 消息分隔符
|
// private String packageSeparator = "#";
|
//
|
// public String getPackageSeparator() {
|
// return packageSeparator;
|
// }
|
//
|
// /**
|
// * 设置消息分隔符,默认:#
|
// * @param packageSeparator
|
// */
|
// public void setPackageSeparator(String packageSeparator) {
|
// this.packageSeparator = packageSeparator;
|
// }
|
|
public void setBossThreadNum(int bossThreadNum) {
|
this.bossThreadNum = bossThreadNum;
|
}
|
|
public void setWorkerGroup(EventLoopGroup workerGroup) {
|
this.workerGroup = workerGroup;
|
}
|
|
public void setWorkerThreadNum(int workerThreadNum) {
|
this.workerThreadNum = workerThreadNum;
|
}
|
|
private DefaultServerInitializer serverInitializer;
|
|
/**
|
* 支持多个分隔符,用英文逗号分隔
|
* @param serverInitializer
|
*/
|
public void setServerInitializer(DefaultServerInitializer serverInitializer) {
|
this.serverInitializer = serverInitializer;
|
// this.serverInitializer.setPackageSeparator(packageSeparator);
|
}
|
|
@Override
|
protected void onStart() throws Exception {
|
if(serverInitializer == null){
|
throw new IllegalArgumentException("缺少:DefaultServerInitializer参数");
|
}
|
|
|
// bossGroup = new NioEventLoopGroup(bossThreadNum, new DefaultThreadFactory("bossServer", true));
|
// workerGroup = new NioEventLoopGroup(workerThreadNum, new DefaultThreadFactory("workerGroup", true));
|
//
|
// ServerBootstrap b = new ServerBootstrap();
|
// b.group(bossGroup, workerGroup);
|
// b.channel(NioServerSocketChannel.class);
|
// b.childHandler(serverInitializer);
|
//
|
// logger.info("***************************************");
|
// logger.info("长连接引擎启动,端口:" + getPort());
|
// logger.info("名称:" + getName());
|
// logger.info("***************************************");
|
//
|
// try{
|
// // 服务器绑定端口监听
|
// ChannelFuture f = b.bind(getPort()).sync();
|
//
|
// // 监听服务器关闭监听
|
// f.channel().closeFuture().sync();
|
//
|
// } catch(Exception ex){
|
// logger.error(null, ex);
|
// bossGroup.shutdownGracefully();
|
// workerGroup.shutdownGracefully();
|
// throw new RuntimeException("启动长连接引擎失败,正在关闭:" + this.getName());
|
// }
|
|
serverStartThread = new ListeningClientThread();
|
serverStartThread.start();
|
}
|
|
@Override
|
protected void onShutdown() throws Exception {
|
if(bossGroup != null){
|
bossGroup.shutdownGracefully();
|
}
|
if(workerGroup != null){
|
workerGroup.shutdownGracefully();
|
}
|
if(serverStartThread != null){
|
serverStartThread.interrupt();
|
}
|
logger.info("***************************************");
|
logger.info("长连接引擎被关闭:" + this.getName() + ", port = " + this.getPort());
|
logger.info("***************************************");
|
}
|
|
private static final String ERROR_NO_NAME = "缺少通道或设备唯一id,无法推送消息";
|
private static final String ERROR_NO_CONNECTION = "未找到连接,无法推送tcp数据";
|
|
@Override
|
protected String onSendResponse(Response<?> response) {
|
//获取业务定义的终端ID
|
String name = response.getName();
|
if(StringUtils.isEmpty(name)){
|
// throw new IllegalArgumentException("缺少通道或设备唯一id,无法推送消息");
|
return ERROR_NO_NAME;
|
}
|
LongConnection conn = (LongConnection)this.getConnectionManager().getConnectionByName(name);
|
if(conn == null){
|
logger.warn("未找到连接,无法推送tcp数据, name=" + name + ", content=");
|
return ERROR_NO_CONNECTION;
|
}
|
conn.write(response);
|
logger.debug(conn.getName() + ", " + conn.getConnectionHost() + ", " + conn.getClass().getName());
|
|
// 2023-08-24,如果是业务调用,更新连接最近时间
|
// 如果后续还要加TCP心跳,则需要在这里追加判断,TCP心跳为内容:0#
|
if(!response.getProtocolNum().equals(Constants.PROTOCOL_HEART_BEAT)){
|
conn.setLastTime(System.currentTimeMillis());
|
}
|
// 2023-08-25,连接后执行回调函数
|
if(response.getProtocolNum().equals(Constants.PROTOCOL_LOGIN)){
|
if(this.connectionCallback != null){
|
this.connectionCallback.onAfterLogin(conn);
|
}
|
}
|
|
if(logger.isDebugEnabled()){
|
logger.debug("发送一条消息到客户端:" + response);
|
}
|
return null;
|
}
|
|
private class ListeningClientThread extends Thread{
|
public ListeningClientThread(){
|
setDaemon(true);
|
}
|
|
public void run(){
|
if(getEngineStartDelaySeconds() > 0){
|
try {
|
logger.info("ListeningClientThread 延时启动(秒)={}", getEngineStartDelaySeconds());
|
TimeUnit.SECONDS.sleep(getEngineStartDelaySeconds());
|
} catch (InterruptedException e) {
|
throw new RuntimeException("监听tcp客户端线程延时启动,被中断:" + e.getMessage(), e);
|
}
|
}
|
|
bossGroup = new NioEventLoopGroup(bossThreadNum, new DefaultThreadFactory("bossServer", true));
|
workerGroup = new NioEventLoopGroup(workerThreadNum, new DefaultThreadFactory("workerGroup", true));
|
|
ServerBootstrap b = new ServerBootstrap();
|
b.group(bossGroup, workerGroup);
|
|
// 2023-10-14 暂时还原通道ID生成策略,自定义后发现通道id不会保持,目前还在找原因。
|
b.channel(NioServerSocketChannel.class);
|
// 2023-09-29 使用自定义通道ID生成,支持集群环境。
|
// b.channel(GenIdNioSocketChannel.class);
|
// 2023-09-30 测试发现,通过设置该对象能自定义通道ID
|
// b.channelFactory(new GenIdChannelFactory());
|
|
b.childHandler(serverInitializer);
|
|
logger.info("***************************************");
|
logger.info("长连接引擎启动,端口:" + getPort());
|
logger.info("名称:" + DefaultLongEngine.this.getName());
|
logger.info("***************************************");
|
|
try{
|
// 服务器绑定端口监听
|
ChannelFuture f = b.bind(getPort()).sync();
|
|
// 监听服务器关闭监听
|
f.channel().closeFuture().sync();
|
|
} catch(Exception ex){
|
logger.error(null, ex);
|
bossGroup.shutdownGracefully();
|
workerGroup.shutdownGracefully();
|
throw new RuntimeException("启动长连接引擎失败,正在关闭:" + this.getName());
|
}
|
}
|
}
|
|
// @Override
|
// protected Response<?> getServerHeartBeatResponse() {
|
// return heartBeatResponse;
|
// }
|
|
}
|