package com.walker.tcp.lb; import com.walker.infrastructure.utils.StringUtils; import com.walker.tcp.Connection; import com.walker.tcp.support.AbstractConnectionManager; import com.walker.tcp.support.MemoryConnectionCache; import java.util.List; public abstract class LoadBalanceConnectionManager extends AbstractConnectionManager { public LoadBalanceConnectionManager(){ // 默认提供一个内存连接缓存实现。2023-09-19 // 该缓存仅保存了当前节点的所有连接信息,不是全部。这里暂时这样处理。2023-09-25 this.setConnectionCache(new MemoryConnectionCache()); } @Override protected void onSaveConnection(Connection connection) throws Exception { if(StringUtils.isEmpty(this.connectionHost)){ throw new IllegalArgumentException("LoadBalanceConnectionManager未配置主机标识"); } String savedConnectionHost = connection.getConnectionHost(); if(StringUtils.isEmpty(savedConnectionHost)){ throw new IllegalArgumentException("connection未包含主机标识"); } if(!this.connectionHost.equals(savedConnectionHost)){ throw new IllegalArgumentException("savedConnectionHost与当前主机不匹配:" + savedConnectionHost + ", connectionHost" + connectionHost); } // 缓存连接元数据相关。 // LongConnectionMeta connectionMeta = this.acquireConnectionMeta(connection); // if(connectionMeta == null){ // throw new AbstractMethodError("请实现抽象方法:acquireConnectionMeta"); // } // // 添加连接元数据对象缓存 // this.connectionMetaCache.putConnectionMeta(connectionMeta); // // 添加连接名称与ID对应缓存 // this.connectionNameCache.putConnectionName(connection.getName(), connection.getId()); this.doSaveConnectionMeta(connection); } @Override protected void onUpdateConnection(Connection connection) throws Exception{ this.doSaveConnectionMeta(connection); } /** * 保存一个具体连接元数据对象,全局保存到redis缓存。 * @param connection */ private void doSaveConnectionMeta(Connection connection){ LongConnectionMeta connectionMeta = this.acquireConnectionMeta(connection); if(connectionMeta == null){ throw new AbstractMethodError("请实现抽象方法:acquireConnectionMeta"); } // 添加连接元数据对象缓存 this.connectionMetaCache.putConnectionMeta(connectionMeta); // 添加连接名称与ID对应缓存 this.connectionNameCache.putConnectionName(connection.getName(), connection.getId()); } @Override protected void onDeleteConnection(int engineId, String name) throws Exception { String id = this.connectionNameCache.getConnectionId(name); if(StringUtils.isEmpty(id)){ logger.warn("根据连接名称未找到id,name={}", name); return; } this.connectionMetaCache.removeConnectionMeta(id); this.connectionNameCache.removeConnectionName(name); } @Override protected void onUpdateLastTime(int engineId, String id, long lastTime) throws Exception { } @Override public Connection getConnection(String id){ Connection connection = super.getConnection(id); if(connection == null){ logger.debug("connection不存在,可能是其他节点中,继续查找。id={}", id); LongConnectionMeta connectionMeta = this.connectionMetaCache.getConnectionMeta(id); if(connectionMeta == null){ logger.debug("connectionMeta也未找到,id={}", id); return null; } // 2023-09-28 给反序列化生成的对象,设置写入器,否则无法发送消息。 connectionMeta.setResponseWriter(this.responseWriter); return connectionMeta; } else { return connection; } } @Override public Connection getConnectionByName(String name){ Connection connection = super.getConnectionByName(name); if(connection == null){ logger.debug("connection不存在,可能是其他节点中,继续查找。name={}", name); String id = this.connectionNameCache.getConnectionId(name); if(StringUtils.isEmpty(id)){ logger.debug("通过name未找到连接id, name = {}", name); return null; } LongConnectionMeta connectionMeta = this.connectionMetaCache.getConnectionMeta(id); if(connectionMeta == null){ logger.debug("connectionMeta也未找到,id={}", id); return null; } // 2023-09-28 给反序列化生成的对象,设置写入器,否则无法发送消息。 connectionMeta.setResponseWriter(this.responseWriter); return connectionMeta; } else { return connection; } } @Override public List queryAllConnectionListBy(int engineId){ throw new UnsupportedOperationException("负载环境不支持该方法:queryAllConnectionListBy(enginId)"); } /** * 子类获取一个具体的LongConnectionMeta对象。 * @param connection * @return * @date 2023-09-25 */ protected abstract LongConnectionMeta acquireConnectionMeta(Connection connection); /** * 返回连接管理器,在当前节点中对应的主机标识。 *

在集群环境中,每个主机中都会运行一个连接管理器,需要通过主机环境标识以区分。

* @return * @date 2023-09-25 */ public String getConnectionHost() { return connectionHost; } public void setConnectionHost(String connectionHost) { this.connectionHost = connectionHost; } /** * 长连接元数据缓存对象。 * @return */ public RedisConnectionMetaCache getConnectionMetaCache() { return connectionMetaCache; } public void setConnectionMetaCache(RedisConnectionMetaCache connectionMetaCache) { this.connectionMetaCache = connectionMetaCache; } /** * 长连接,连接名称与ID对应关系缓存。 * @return */ public RedisConnectionNameCache getConnectionNameCache() { return connectionNameCache; } public void setConnectionNameCache(RedisConnectionNameCache connectionNameCache) { this.connectionNameCache = connectionNameCache; } public void setResponseWriter(ResponseWriter responseWriter) { this.responseWriter = responseWriter; } public ResponseWriter getResponseWriter() { return responseWriter; } private ResponseWriter responseWriter; private RedisConnectionMetaCache connectionMetaCache; private RedisConnectionNameCache connectionNameCache; private String connectionHost; }