package com.walker.tcp.support;
|
|
import com.walker.infrastructure.utils.StringUtils;
|
import com.walker.tcp.Connection;
|
import com.walker.tcp.ConnectionCallback;
|
import com.walker.tcp.ConnectionManager;
|
import com.walker.tcp.ProtocolResolver;
|
import com.walker.tcp.Response;
|
import com.walker.tcp.TcpEngine;
|
import com.walker.tcp.lb.LongConnectionMeta;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
import java.util.List;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.TimeUnit;
|
|
public abstract class AbstractTcpLongEngine implements TcpEngine {
|
|
protected final transient Logger logger = LoggerFactory.getLogger(getClass());
|
|
private int port = 5800;
|
private String name;
|
private ConnectionManager connectionManager;
|
|
private boolean openHeartBeat = true;
|
|
private int id = 0;
|
|
private List<ProtocolResolver<?>> protocolResolverList;
|
|
// 2023-08-24 连接空闲回调对象
|
protected ConnectionCallback connectionCallback;
|
|
public void setConnectionCallback(ConnectionCallback connectionCallback) {
|
this.connectionCallback = connectionCallback;
|
}
|
|
public List<ProtocolResolver<?>> getProtocolResolverList() {
|
return protocolResolverList;
|
}
|
|
public void setProtocolResolverList(List<ProtocolResolver<?>> protocolResolverList) {
|
this.protocolResolverList = protocolResolverList;
|
}
|
|
public boolean isOpenHeartBeat() {
|
return openHeartBeat;
|
}
|
|
/**
|
* 设置是否启动服务端心跳检测线程
|
* @param openHeartBeat
|
*/
|
public void setOpenHeartBeat(boolean openHeartBeat) {
|
this.openHeartBeat = openHeartBeat;
|
}
|
|
public void setPort(int port) {
|
this.port = port;
|
}
|
|
public void setName(String name) {
|
this.name = name;
|
}
|
|
public void setConnectionManager(ConnectionManager connectionManager) {
|
this.connectionManager = connectionManager;
|
}
|
|
@Override
|
public int getPort() {
|
return port;
|
}
|
|
@Override
|
public String getName() {
|
return name;
|
}
|
|
@Override
|
public boolean supportLongConnection() {
|
return true;
|
}
|
|
@Override
|
public ConnectionManager getConnectionManager() {
|
return connectionManager;
|
}
|
|
@Override
|
public void start() throws Exception {
|
// if(StringUtils.isEmptyList(this.protocolResolverList)){
|
// throw new IllegalArgumentException(ProtocolResolver.ERR_NOFOUND);
|
// }
|
try{
|
this.onStart();
|
startTime = System.currentTimeMillis();
|
if(openHeartBeat){
|
this.startHeartBeatThread();
|
}
|
|
} catch(Exception ex){
|
logger.error("启动长连接失败:" + this.getName(), ex);
|
throw new Exception(ex);
|
}
|
}
|
|
private void startHeartBeatThread(){
|
if(!started){
|
executorService.execute(timerRunner);
|
logger.info("长连接'" + name + "'启动......");
|
started = true;
|
} else {
|
throw new IllegalStateException("长连接已启动,调用状态错误。name = " + name);
|
}
|
}
|
|
@Override
|
public void shutdown() throws Exception {
|
this.onShutdown();
|
}
|
|
@Override
|
public String sendResponse(Response<?> response){
|
if(!this.supportLongConnection()){
|
throw new IllegalStateException("改引擎不支持长连接,不能向客户端推送消息");
|
}
|
return this.onSendResponse(response);
|
}
|
|
/**
|
* 向所有客户端发送广播消息
|
* @param response
|
*/
|
public void sendBroadcast(Response<?> response){
|
List<Connection> clientList = this.connectionManager.queryAllConnectionListBy(getId());
|
logger.debug("------> 准备发送广播,engineId = {}, 客户端数量:{}", this.getId(), clientList == null? 0: clientList.size());
|
if(!StringUtils.isEmptyList(clientList)){
|
for(Connection conn : clientList){
|
conn.write(response);
|
}
|
}
|
}
|
|
protected abstract void onStart() throws Exception;
|
protected abstract void onShutdown() throws Exception;
|
|
/**
|
* 通过引擎下发指令时,回调该函数
|
* @param response
|
*/
|
protected abstract String onSendResponse(Response<?> response);
|
|
// /**
|
// * 返回服务端定义的心跳响应,由子类实现,因为涉及到消息的分隔符。
|
// * @return
|
// */
|
// protected abstract Response<?> getServerHeartBeatResponse();
|
|
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
// 检测客户端连接状态的子线程
|
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
long startTime = 0;
|
boolean started = false; // 是否已经启动运行
|
private int currentFailedTime = 0;
|
|
// 内部时钟频率,默认:5分钟
|
private long heartBeatTimeInterval = 1000 * 60 * 5;
|
|
public long getHeartBeatTimeInterval() {
|
return heartBeatTimeInterval;
|
}
|
|
/**
|
* 设置心跳检测频率,默认:5分钟(5 * 60000毫秒)
|
* @param timeInterval
|
*/
|
public void setHeartBeatTimeInterval(long timeInterval) {
|
this.heartBeatTimeInterval = timeInterval;
|
}
|
|
private Thread kernelThread = null;
|
|
private ExecutorService executorService = Executors.newFixedThreadPool(1, new ThreadFactory() {
|
@Override
|
public Thread newThread(Runnable r) {
|
kernelThread = new Thread(r);
|
kernelThread.setDaemon(true);
|
return kernelThread;
|
}
|
});
|
|
private InteralTimerRunner timerRunner = new InteralTimerRunner();
|
|
// private final Response testResponse = this.getServerHeartBeatResponse();
|
|
private class InteralTimerRunner implements Runnable{
|
|
private boolean stop = false;
|
|
public InteralTimerRunner(){}
|
|
// public void setStop() {
|
// this.stop = true;
|
// }
|
|
@Override
|
public void run() {
|
while(!stop){
|
|
try{
|
// List<Connection> list = connectionManager.queryAllConnectionList();
|
List<Connection> list = connectionManager.queryAllConnectionListBy(AbstractTcpLongEngine.this.getId());
|
if(!StringUtils.isEmptyList(list)){
|
long currentTime = System.currentTimeMillis();
|
for(Connection conn : list){
|
// if(!conn.isConnected()){
|
// connectionManager.removeConnection(conn.getId());
|
// logger.info("连接已断开,服务端移除该对象:" + conn.getName());
|
// } else {
|
// conn.write(emptyResponse);
|
// }
|
// 定时检测客户端是否物理连接,直接发空数据尝试
|
if(logger.isDebugEnabled()){
|
logger.debug(AbstractTcpLongEngine.this.getId() + " 发送心跳数据,name:" + conn.getName());
|
}
|
// conn.write(getServerHeartBeatResponse());
|
if(!(conn instanceof LongConnectionMeta)){
|
// 2023-09-26 只有物理连接才能推送心跳
|
conn.write(conn.getProtocolResolver().getHeartBeatResponse());
|
}
|
|
// 2023-08-24,检测连接通道是否空闲(心跳不算)
|
// 目前没有实际使用。
|
if(connectionCallback != null && (currentTime - conn.getLastTime())/1000 >= 600){
|
connectionCallback.onIdle(conn);
|
}
|
}
|
}
|
} catch(Exception ex){
|
if(currentFailedTime >= Integer.MAX_VALUE){
|
currentFailedTime = 0;
|
}
|
currentFailedTime ++;
|
logger.error("连接检测线程调用失败一次 " , ex);
|
} finally {
|
try {
|
if(heartBeatTimeInterval > 0){
|
logger.debug("*************** sleep: " + heartBeatTimeInterval);
|
TimeUnit.MILLISECONDS.sleep(heartBeatTimeInterval);
|
}
|
} catch (InterruptedException e) {}
|
}
|
}
|
}
|
}
|
|
@Override
|
public long getStartTime() {
|
return startTime;
|
}
|
|
@Override
|
public int getId() {
|
return id;
|
}
|
|
public void setId(int id){
|
if(id <= 0 || id >= Integer.MAX_VALUE){
|
throw new IllegalArgumentException("engine id must 0~65536");
|
}
|
if(this.id > 0){
|
throw new IllegalArgumentException("engine id already exist, id = " + this.id);
|
}
|
this.id = id;
|
}
|
|
public long getEngineStartDelaySeconds() {
|
return engineStartDelaySeconds;
|
}
|
|
/**
|
* 设置引擎,延时启动,多少秒,默认:120
|
* @param engineStartDelaySeconds
|
* @date 2023-09-21
|
*/
|
public void setEngineStartDelaySeconds(long engineStartDelaySeconds) {
|
this.engineStartDelaySeconds = engineStartDelaySeconds;
|
}
|
|
private long engineStartDelaySeconds = 120;
|
}
|