package com.walker.tcp.lb;
|
|
import com.walker.infrastructure.utils.StringUtils;
|
import com.walker.tcp.Connection;
|
import com.walker.tcp.support.AbstractConnectionManager;
|
import com.walker.tcp.support.MemoryConnectionCache;
|
|
import java.util.List;
|
|
public abstract class LoadBalanceConnectionManager extends AbstractConnectionManager {
|
|
public LoadBalanceConnectionManager(){
|
// 默认提供一个内存连接缓存实现。2023-09-19
|
// 该缓存仅保存了当前节点的所有连接信息,不是全部。这里暂时这样处理。2023-09-25
|
this.setConnectionCache(new MemoryConnectionCache());
|
}
|
|
@Override
|
protected void onSaveConnection(Connection connection) throws Exception {
|
if(StringUtils.isEmpty(this.connectionHost)){
|
throw new IllegalArgumentException("LoadBalanceConnectionManager未配置主机标识");
|
}
|
String savedConnectionHost = connection.getConnectionHost();
|
if(StringUtils.isEmpty(savedConnectionHost)){
|
throw new IllegalArgumentException("connection未包含主机标识");
|
}
|
if(!this.connectionHost.equals(savedConnectionHost)){
|
throw new IllegalArgumentException("savedConnectionHost与当前主机不匹配:" + savedConnectionHost + ", connectionHost" + connectionHost);
|
}
|
|
// 缓存连接元数据相关。
|
// LongConnectionMeta connectionMeta = this.acquireConnectionMeta(connection);
|
// if(connectionMeta == null){
|
// throw new AbstractMethodError("请实现抽象方法:acquireConnectionMeta");
|
// }
|
// // 添加连接元数据对象缓存
|
// this.connectionMetaCache.putConnectionMeta(connectionMeta);
|
// // 添加连接名称与ID对应缓存
|
// this.connectionNameCache.putConnectionName(connection.getName(), connection.getId());
|
this.doSaveConnectionMeta(connection);
|
}
|
|
@Override
|
protected void onUpdateConnection(Connection connection) throws Exception{
|
this.doSaveConnectionMeta(connection);
|
}
|
|
/**
|
* 保存一个具体连接元数据对象,全局保存到redis缓存。
|
* @param connection
|
*/
|
private void doSaveConnectionMeta(Connection connection){
|
LongConnectionMeta connectionMeta = this.acquireConnectionMeta(connection);
|
if(connectionMeta == null){
|
throw new AbstractMethodError("请实现抽象方法:acquireConnectionMeta");
|
}
|
// 添加连接元数据对象缓存
|
this.connectionMetaCache.putConnectionMeta(connectionMeta);
|
// 添加连接名称与ID对应缓存
|
this.connectionNameCache.putConnectionName(connection.getName(), connection.getId());
|
}
|
|
@Override
|
protected void onDeleteConnection(int engineId, String name) throws Exception {
|
String id = this.connectionNameCache.getConnectionId(name);
|
if(StringUtils.isEmpty(id)){
|
logger.warn("根据连接名称未找到id,name={}", name);
|
return;
|
}
|
this.connectionMetaCache.removeConnectionMeta(id);
|
this.connectionNameCache.removeConnectionName(name);
|
}
|
|
@Override
|
protected void onUpdateLastTime(int engineId, String id, long lastTime) throws Exception {
|
|
}
|
|
@Override
|
public Connection getConnection(String id){
|
Connection connection = super.getConnection(id);
|
if(connection == null){
|
logger.debug("connection不存在,可能是其他节点中,继续查找。id={}", id);
|
LongConnectionMeta connectionMeta = this.connectionMetaCache.getConnectionMeta(id);
|
if(connectionMeta == null){
|
logger.debug("connectionMeta也未找到,id={}", id);
|
return null;
|
}
|
// 2023-09-28 给反序列化生成的对象,设置写入器,否则无法发送消息。
|
connectionMeta.setResponseWriter(this.responseWriter);
|
return connectionMeta;
|
} else {
|
return connection;
|
}
|
}
|
|
@Override
|
public Connection getConnectionByName(String name){
|
Connection connection = super.getConnectionByName(name);
|
if(connection == null){
|
logger.debug("connection不存在,可能是其他节点中,继续查找。name={}", name);
|
String id = this.connectionNameCache.getConnectionId(name);
|
if(StringUtils.isEmpty(id)){
|
logger.debug("通过name未找到连接id, name = {}", name);
|
return null;
|
}
|
LongConnectionMeta connectionMeta = this.connectionMetaCache.getConnectionMeta(id);
|
if(connectionMeta == null){
|
logger.debug("connectionMeta也未找到,id={}", id);
|
return null;
|
}
|
// 2023-09-28 给反序列化生成的对象,设置写入器,否则无法发送消息。
|
connectionMeta.setResponseWriter(this.responseWriter);
|
return connectionMeta;
|
} else {
|
return connection;
|
}
|
}
|
|
@Override
|
public List<Connection> queryAllConnectionListBy(int engineId){
|
throw new UnsupportedOperationException("负载环境不支持该方法:queryAllConnectionListBy(enginId)");
|
}
|
|
/**
|
* 子类获取一个具体的<code>LongConnectionMeta</code>对象。
|
* @param connection
|
* @return
|
* @date 2023-09-25
|
*/
|
protected abstract LongConnectionMeta acquireConnectionMeta(Connection connection);
|
|
/**
|
* 返回连接管理器,在当前节点中对应的主机标识。
|
* <p>在集群环境中,每个主机中都会运行一个连接管理器,需要通过主机环境标识以区分。</p>
|
* @return
|
* @date 2023-09-25
|
*/
|
public String getConnectionHost() {
|
return connectionHost;
|
}
|
|
public void setConnectionHost(String connectionHost) {
|
this.connectionHost = connectionHost;
|
}
|
|
/**
|
* 长连接元数据缓存对象。
|
* @return
|
*/
|
public RedisConnectionMetaCache getConnectionMetaCache() {
|
return connectionMetaCache;
|
}
|
|
public void setConnectionMetaCache(RedisConnectionMetaCache connectionMetaCache) {
|
this.connectionMetaCache = connectionMetaCache;
|
}
|
|
/**
|
* 长连接,连接名称与ID对应关系缓存。
|
* @return
|
*/
|
public RedisConnectionNameCache getConnectionNameCache() {
|
return connectionNameCache;
|
}
|
|
public void setConnectionNameCache(RedisConnectionNameCache connectionNameCache) {
|
this.connectionNameCache = connectionNameCache;
|
}
|
|
public void setResponseWriter(ResponseWriter responseWriter) {
|
this.responseWriter = responseWriter;
|
}
|
|
public ResponseWriter getResponseWriter() {
|
return responseWriter;
|
}
|
|
private ResponseWriter responseWriter;
|
private RedisConnectionMetaCache connectionMetaCache;
|
private RedisConnectionNameCache connectionNameCache;
|
private String connectionHost;
|
}
|