package com.walker.tcp.support;
|
|
import com.walker.db.page.GenericPager;
|
import com.walker.infrastructure.utils.StringUtils;
|
import com.walker.tcp.Connection;
|
import com.walker.tcp.ConnectionCache;
|
import com.walker.tcp.ConnectionManager;
|
import com.walker.tcp.connect.AbstractConnection;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
|
public abstract class AbstractConnectionManager implements ConnectionManager {
|
|
protected final transient Logger logger = LoggerFactory.getLogger(getClass());
|
|
public static final String TIP_NULL_CONNECTION = "不能放入空connection";
|
// public static final String TIP_WARN_IDINDEX = "缓存中该连接id不存在:idList.get(i)。i = ";
|
|
private boolean multipleEngine = false;
|
|
// 连接缓存,key = sessionId, value = Connection
|
// private final Map<String, Connection> cached = new ConcurrentHashMap<>(128);
|
|
// 通道ID与绑定用户名称的对应关系,key = 绑定业务名称,value = 通道ID
|
// private Map<String, String> idNameCached = new HashMap<>(128);
|
|
// 存放连接id集合,用于分页显示
|
// private LinkedList<String> idList = new LinkedList<>();
|
// private Map<String, Integer> idIndex = new HashMap<>(128);
|
|
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
// 连接类型,时克英修改,2018-09-20
|
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
private Map<Integer, String> connectionTypeCache = new ConcurrentHashMap<Integer, String>();
|
|
public void setConnectionTypeMap(Map<Integer, String> connectionTypes){
|
if(connectionTypes != null){
|
this.connectionTypeCache = connectionTypes;
|
}
|
}
|
|
public void setMultipleEngine(boolean multipleEngine) {
|
this.multipleEngine = multipleEngine;
|
}
|
|
@Override
|
public boolean isSupportMultipleEngine(){
|
return multipleEngine;
|
}
|
|
@Override
|
public void putConnection(Connection connection){
|
if(connection == null){
|
throw new IllegalArgumentException(TIP_NULL_CONNECTION);
|
}
|
|
// String id = connection.getId();
|
String name = connection.getName();
|
// 必须在认证时候才能创建缓存连接对象!
|
if(StringUtils.isEmpty(name)){
|
throw new IllegalArgumentException("connection对象未设置name属性,业务将无法使用");
|
// logger.warn("连接已设置,但还未绑定业务id,后续会有绑定操作,可忽略。");
|
}
|
|
// 2023-09-19,调整使用连接缓存对象。
|
/* if(idNameCached.get(name) != null){
|
logger.warn("终端设备在未断开连接的情况下,重新登录注册:" + name);
|
// throw new IllegalArgumentException("现有连接中已经存在该名称,无法重复缓存同名连接");
|
|
*//** 注意:这里的ID是原来老的connection的ID,不是新ID *//*
|
doRemoveCacheInfo(idNameCached.get(name), connection);
|
}
|
cached.put(id, connection);
|
idNameCached.put(name, id);*/
|
String oldId = this.connectionCache.getIdByName(name);
|
if(StringUtils.isNotEmpty(oldId)){
|
logger.warn("终端设备在未断开连接的情况下,重新登录注册:" + name + ", 删除老连接:" + oldId);
|
this.connectionCache.removeConnection(oldId);
|
}
|
this.connectionCache.putConnection(connection);
|
|
// 维护分页信息
|
// idIndex.put(id, idList.size());
|
// idList.add(id);
|
|
try {
|
this.onSaveConnection(connection);
|
} catch (Exception e) {
|
logger.error("业务调用持久化连接错误:" + e.getMessage(), e);
|
}
|
}
|
|
// /**
|
// * 从缓存中删除连接信息,包括:索引和分页信息
|
// * @param id
|
// * @param conn
|
// */
|
// private void doRemoveCacheInfo(String id, Connection conn){
|
// if(conn != null){
|
// idNameCached.remove(conn.getName());
|
// }
|
// cached.remove(id);
|
// }
|
|
@Override
|
public void updateConnection(String id, Connection connection){
|
if(connection == null){
|
throw new IllegalArgumentException(TIP_NULL_CONNECTION);
|
}
|
((AbstractConnection)connection).setLastTime(System.currentTimeMillis());
|
// 2023-09-19,调整使用连接缓存对象。
|
// cached.put(id, connection);
|
this.connectionCache.updateConnection(connection);
|
|
try {
|
this.onUpdateConnection(connection); // 2023-09-25
|
this.onUpdateLastTime(connection.getEngineId(), connection.getName(), connection.getLastTime());
|
} catch (Exception e) {
|
logger.error("业务更新持久化连接错误:" + e.getMessage(), e);
|
}
|
}
|
|
@Override
|
public void removeConnection(String id){
|
// Connection conn = cached.get(id);
|
Connection conn = this.connectionCache.getConnection(id);
|
// doRemoveCacheInfo(id, conn);
|
if(conn != null){
|
try {
|
this.onDeleteConnection(conn.getEngineId(), conn.getName());
|
} catch (Exception e) {
|
logger.error("业务删除持久化连接错误:" + e.getMessage(), e);
|
}
|
}
|
this.connectionCache.removeConnection(id);
|
}
|
|
@Override
|
public void removeConnectionByName(String name){
|
if(StringUtils.isEmpty(name)){
|
throw new IllegalArgumentException("name is required!");
|
}
|
|
// String id = idNameCached.get(name) ;
|
String id = this.connectionCache.getIdByName(name);
|
if(id == null){
|
logger.warn("根据终端名称查找通道ID错误:未找到name=" + name);
|
return;
|
}
|
|
// Connection conn = cached.get(id);
|
// doRemoveCacheInfo(id, conn);
|
this.connectionCache.removeConnection(id);
|
|
// 2023-09-25,为负载修改
|
try {
|
this.onDeleteConnection(0, name);
|
} catch (Exception e) {
|
logger.error("业务删除持久化连接错误:" + e.getMessage() + ", name=" + name, e);
|
}
|
}
|
|
@Override
|
public void updateLastTime(String id){
|
// Connection conn = cached.get(id);
|
Connection conn = this.connectionCache.getConnection(id);
|
if(conn != null){
|
((AbstractConnection)conn).setLastTime(System.currentTimeMillis());
|
}
|
}
|
|
@Override
|
public Connection getConnection(String id){
|
// return cached.get(id);
|
return this.connectionCache.getConnection(id);
|
}
|
|
@Override
|
public void putConnection(int engineId, Connection connection){
|
throw new UnsupportedOperationException();
|
}
|
|
@Override
|
public void updateConnection(int engineId, String id, Connection connection){
|
throw new UnsupportedOperationException();
|
}
|
|
@Override
|
public void removeConnection(int engineId, String id){
|
throw new UnsupportedOperationException();
|
}
|
|
@Override
|
public void updateLastTime(int engineId, String id){
|
throw new UnsupportedOperationException();
|
}
|
|
@Override
|
public Connection getConnection(int engineId, String id){
|
throw new UnsupportedOperationException();
|
}
|
|
@Deprecated
|
@Override
|
public GenericPager<Connection> queryPageConnectionList(int pageIndex, int pageSize){
|
// GenericPager<Connection> pager = ListPageContext.createGenericPager(pageIndex, pageSize, cached.size());
|
// List<Connection> datas = new ArrayList<>();
|
// long total = pager.getTotalRows();
|
//
|
// if(total > 0){
|
// long firstDataIndex = pager.getFirstRowIndexInPage();
|
// long endDataIndex = pager.getEndRowIndexPage();
|
// String id = null;
|
// for(long i=firstDataIndex; i<endDataIndex+1; i++){
|
// id = idList.get((int)i);
|
// if(StringUtils.isEmpty(id)){
|
// logger.warn(TIP_WARN_IDINDEX + i);
|
// continue;
|
// }
|
// datas.add(cached.get(id));
|
// }
|
// }
|
//
|
// pager.setDatas(datas);
|
// return pager;
|
throw new UnsupportedOperationException();
|
}
|
|
/**
|
* 负载环境中,仍然可以从服务端发送心跳,只是每个节点只发送自己连接的客户端而已。
|
* @return
|
* @date 2023-09-25
|
*/
|
@Override
|
public List<Connection> queryAllConnectionList(){
|
// List<Connection> result = new ArrayList<>();
|
// for(Connection conn : cached.values()){
|
// result.add(conn);
|
// }
|
// return result;
|
return this.connectionCache.getAllConnectionList();
|
}
|
|
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
// 根据engineId,返回某一类连接集合
|
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
@Override
|
public Map<Integer, String> getConnectionTypeMap(){
|
return this.connectionTypeCache;
|
}
|
|
@Override
|
public String getConnectionTypeName(int engineId){
|
return this.connectionTypeCache.get(engineId);
|
}
|
|
@Override
|
public List<Connection> queryAllConnectionListBy(int engineId){
|
List<Connection> result = new ArrayList<>();
|
// for(Connection conn : cached.values()){
|
for(Connection conn : this.connectionCache.getAllConnectionList()){
|
if(conn.getEngineId() == engineId){
|
result.add(conn);
|
}
|
}
|
return result;
|
}
|
|
/**
|
* 根据业务名称,返回连接对象。如:设备编号ID
|
* @param name
|
* @return
|
*/
|
@Override
|
public Connection getConnectionByName(String name){
|
// String id = idNameCached.get(name);
|
// if(StringUtils.isNotEmpty(id)){
|
// return cached.get(id);
|
// }
|
// return null;
|
return this.connectionCache.getConnectionByName(name);
|
}
|
|
/**
|
* 在系统创建完连接时,提供业务回调,例如:把连接数据保存到数据库中。
|
* @param connection
|
*/
|
protected abstract void onSaveConnection(Connection connection) throws Exception;
|
|
/**
|
* 在系统创建完连接时,提供业务回调
|
* @param engineId
|
* @param id
|
*/
|
protected abstract void onDeleteConnection(int engineId, String id) throws Exception;
|
|
/**
|
* 在系统创建完连接时,提供业务回调
|
* @param engineId 引擎id
|
* @param id 连接名称
|
* @param lastTime 最后一次更新时间
|
*/
|
protected abstract void onUpdateLastTime(int engineId, String id, long lastTime) throws Exception;
|
|
/**
|
* 更新底层链路连接时,触发方法,子类实现。
|
* @param connection
|
* @throws Exception
|
* @date 2023-09-25
|
*/
|
protected abstract void onUpdateConnection(Connection connection) throws Exception;
|
|
public ConnectionCache getConnectionCache() {
|
return connectionCache;
|
}
|
|
public void setConnectionCache(ConnectionCache connectionCache) {
|
this.connectionCache = connectionCache;
|
}
|
|
private ConnectionCache connectionCache;
|
}
|