package com.walker.tcp.support; import com.walker.infrastructure.utils.StringUtils; import com.walker.tcp.Connection; import com.walker.tcp.ConnectionCallback; import com.walker.tcp.ConnectionManager; import com.walker.tcp.ProtocolResolver; import com.walker.tcp.Response; import com.walker.tcp.TcpEngine; import com.walker.tcp.lb.LongConnectionMeta; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; public abstract class AbstractTcpLongEngine implements TcpEngine { protected final transient Logger logger = LoggerFactory.getLogger(getClass()); private int port = 5800; private String name; private ConnectionManager connectionManager; private boolean openHeartBeat = true; private int id = 0; private List> protocolResolverList; // 2023-08-24 连接空闲回调对象 protected ConnectionCallback connectionCallback; public void setConnectionCallback(ConnectionCallback connectionCallback) { this.connectionCallback = connectionCallback; } public List> getProtocolResolverList() { return protocolResolverList; } public void setProtocolResolverList(List> protocolResolverList) { this.protocolResolverList = protocolResolverList; } public boolean isOpenHeartBeat() { return openHeartBeat; } /** * 设置是否启动服务端心跳检测线程 * @param openHeartBeat */ public void setOpenHeartBeat(boolean openHeartBeat) { this.openHeartBeat = openHeartBeat; } public void setPort(int port) { this.port = port; } public void setName(String name) { this.name = name; } public void setConnectionManager(ConnectionManager connectionManager) { this.connectionManager = connectionManager; } @Override public int getPort() { return port; } @Override public String getName() { return name; } @Override public boolean supportLongConnection() { return true; } @Override public ConnectionManager getConnectionManager() { return connectionManager; } @Override public void start() throws Exception { // if(StringUtils.isEmptyList(this.protocolResolverList)){ // throw new IllegalArgumentException(ProtocolResolver.ERR_NOFOUND); // } try{ this.onStart(); startTime = System.currentTimeMillis(); if(openHeartBeat){ this.startHeartBeatThread(); } } catch(Exception ex){ logger.error("启动长连接失败:" + this.getName(), ex); throw new Exception(ex); } } private void startHeartBeatThread(){ if(!started){ executorService.execute(timerRunner); logger.info("长连接'" + name + "'启动......"); started = true; } else { throw new IllegalStateException("长连接已启动,调用状态错误。name = " + name); } } @Override public void shutdown() throws Exception { this.onShutdown(); } @Override public String sendResponse(Response response){ if(!this.supportLongConnection()){ throw new IllegalStateException("改引擎不支持长连接,不能向客户端推送消息"); } return this.onSendResponse(response); } /** * 向所有客户端发送广播消息 * @param response */ public void sendBroadcast(Response response){ List clientList = this.connectionManager.queryAllConnectionListBy(getId()); logger.debug("------> 准备发送广播,engineId = {}, 客户端数量:{}", this.getId(), clientList == null? 0: clientList.size()); if(!StringUtils.isEmptyList(clientList)){ for(Connection conn : clientList){ conn.write(response); } } } protected abstract void onStart() throws Exception; protected abstract void onShutdown() throws Exception; /** * 通过引擎下发指令时,回调该函数 * @param response */ protected abstract String onSendResponse(Response response); // /** // * 返回服务端定义的心跳响应,由子类实现,因为涉及到消息的分隔符。 // * @return // */ // protected abstract Response getServerHeartBeatResponse(); //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // 检测客户端连接状态的子线程 //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ long startTime = 0; boolean started = false; // 是否已经启动运行 private int currentFailedTime = 0; // 内部时钟频率,默认:5分钟 private long heartBeatTimeInterval = 1000 * 60 * 5; public long getHeartBeatTimeInterval() { return heartBeatTimeInterval; } /** * 设置心跳检测频率,默认:5分钟(5 * 60000毫秒) * @param timeInterval */ public void setHeartBeatTimeInterval(long timeInterval) { this.heartBeatTimeInterval = timeInterval; } private Thread kernelThread = null; private ExecutorService executorService = Executors.newFixedThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { kernelThread = new Thread(r); kernelThread.setDaemon(true); return kernelThread; } }); private InteralTimerRunner timerRunner = new InteralTimerRunner(); // private final Response testResponse = this.getServerHeartBeatResponse(); private class InteralTimerRunner implements Runnable{ private boolean stop = false; public InteralTimerRunner(){} // public void setStop() { // this.stop = true; // } @Override public void run() { while(!stop){ try{ // List list = connectionManager.queryAllConnectionList(); List list = connectionManager.queryAllConnectionListBy(AbstractTcpLongEngine.this.getId()); if(!StringUtils.isEmptyList(list)){ long currentTime = System.currentTimeMillis(); for(Connection conn : list){ // if(!conn.isConnected()){ // connectionManager.removeConnection(conn.getId()); // logger.info("连接已断开,服务端移除该对象:" + conn.getName()); // } else { // conn.write(emptyResponse); // } // 定时检测客户端是否物理连接,直接发空数据尝试 if(logger.isDebugEnabled()){ logger.debug(AbstractTcpLongEngine.this.getId() + " 发送心跳数据,name:" + conn.getName()); } // conn.write(getServerHeartBeatResponse()); if(!(conn instanceof LongConnectionMeta)){ // 2023-09-26 只有物理连接才能推送心跳 conn.write(conn.getProtocolResolver().getHeartBeatResponse()); } // 2023-08-24,检测连接通道是否空闲(心跳不算) // 目前没有实际使用。 if(connectionCallback != null && (currentTime - conn.getLastTime())/1000 >= 600){ connectionCallback.onIdle(conn); } } } } catch(Exception ex){ if(currentFailedTime >= Integer.MAX_VALUE){ currentFailedTime = 0; } currentFailedTime ++; logger.error("连接检测线程调用失败一次 " , ex); } finally { try { if(heartBeatTimeInterval > 0){ logger.debug("*************** sleep: " + heartBeatTimeInterval); TimeUnit.MILLISECONDS.sleep(heartBeatTimeInterval); } } catch (InterruptedException e) {} } } } } @Override public long getStartTime() { return startTime; } @Override public int getId() { return id; } public void setId(int id){ if(id <= 0 || id >= Integer.MAX_VALUE){ throw new IllegalArgumentException("engine id must 0~65536"); } if(this.id > 0){ throw new IllegalArgumentException("engine id already exist, id = " + this.id); } this.id = id; } public long getEngineStartDelaySeconds() { return engineStartDelaySeconds; } /** * 设置引擎,延时启动,多少秒,默认:120 * @param engineStartDelaySeconds * @date 2023-09-21 */ public void setEngineStartDelaySeconds(long engineStartDelaySeconds) { this.engineStartDelaySeconds = engineStartDelaySeconds; } private long engineStartDelaySeconds = 120; }