package com.walker.tcp.support;

import com.walker.db.page.GenericPager;
import com.walker.infrastructure.utils.StringUtils;
import com.walker.security.SystemLogMan;
import com.walker.tcp.Connection;
import com.walker.tcp.ConnectionCache;
import com.walker.tcp.ConnectionManager;
import com.walker.tcp.connect.AbstractConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Base {@link ConnectionManager} that delegates all connection storage to a
 * {@link ConnectionCache} and notifies subclasses through the {@code on*} hooks
 * whenever a connection is saved, updated or deleted.
 *
 * <p>History: since 2023-09-19 the connection data lives in the injected
 * {@link ConnectionCache} instead of maps held by this class.</p>
 *
 * <p>The {@code connectionCache} must be injected with
 * {@link #setConnectionCache(ConnectionCache)} before any cache-backed method is called;
 * this class performs no null check on it.</p>
 */
public abstract class AbstractConnectionManager implements ConnectionManager {

    protected final transient Logger logger = LoggerFactory.getLogger(getClass());

    public static final String TIP_NULL_CONNECTION = "不能放入空connection";

    private boolean multipleEngine = false;

    // Connection types keyed by engine id (introduced 2018-09-20 by Shi Keying).
    private Map<Integer, String> connectionTypeCache = new ConcurrentHashMap<>();

    // Backing store for every connection handled by this manager.
    private ConnectionCache connectionCache;

    /**
     * Replaces the engine-id to type-name mapping. A {@code null} argument is ignored and
     * the current mapping is kept. The supplied map is stored as-is (no defensive copy).
     *
     * <p>Regardless of the argument, {@code SystemLogMan.getInstance().checkMan()} is
     * invoked afterwards. NOTE(review): the purpose of that call is not visible here.</p>
     *
     * @param connectionTypes mapping from engine id to connection type name, may be null
     */
    public void setConnectionTypeMap(Map<Integer, String> connectionTypes) {
        if (connectionTypes != null) {
            this.connectionTypeCache = connectionTypes;
        }
        SystemLogMan.getInstance().checkMan();
    }

    public void setMultipleEngine(boolean multipleEngine) {
        this.multipleEngine = multipleEngine;
    }

    @Override
    public boolean isSupportMultipleEngine() {
        return multipleEngine;
    }

    /**
     * Stores a connection in the cache and then calls {@link #onSaveConnection(Connection)}.
     *
     * <p>If the cache already maps the connection's name to an id, that older entry is
     * removed first (the device logged in again without disconnecting). Exceptions thrown
     * by the save hook are logged and not propagated.</p>
     *
     * @param connection the connection to store; its name must already be set
     * @throws IllegalArgumentException if {@code connection} is null or its name is empty
     */
    @Override
    public void putConnection(Connection connection) {
        if (connection == null) {
            throw new IllegalArgumentException(TIP_NULL_CONNECTION);
        }
        // A cached connection may only be created once it carries a name.
        String name = connection.getName();
        if (StringUtils.isEmpty(name)) {
            throw new IllegalArgumentException("connection对象未设置name属性,业务将无法使用");
        }

        // Note: oldId is the id of the previously cached connection, not of the new one.
        String oldId = this.connectionCache.getIdByName(name);
        if (StringUtils.isNotEmpty(oldId)) {
            logger.warn("终端设备在未断开连接的情况下,重新登录注册:" + name + ", 删除老连接:" + oldId);
            this.connectionCache.removeConnection(oldId);
        }
        this.connectionCache.putConnection(connection);

        try {
            this.onSaveConnection(connection);
        } catch (Exception e) {
            logger.error("业务调用持久化连接错误:" + e.getMessage(), e);
        }
    }

    /**
     * Refreshes the connection's last-active time to the current time, writes it to the
     * cache, then calls {@link #onUpdateConnection(Connection)} followed by
     * {@link #onUpdateLastTime(int, String, long)}. Hook exceptions are logged only.
     *
     * <p>The {@code id} argument is not used by this implementation.</p>
     *
     * @param id         connection id (unused here)
     * @param connection the connection to update; cast to {@link AbstractConnection}
     * @throws IllegalArgumentException if {@code connection} is null
     */
    @Override
    public void updateConnection(String id, Connection connection) {
        if (connection == null) {
            throw new IllegalArgumentException(TIP_NULL_CONNECTION);
        }
        touch(connection);
        this.connectionCache.updateConnection(connection);
        try {
            this.onUpdateConnection(connection);
            // Added 2023-09-25: also report the refreshed last-active time.
            this.onUpdateLastTime(connection.getEngineId(), connection.getName(), connection.getLastTime());
        } catch (Exception e) {
            logger.error("业务更新持久化连接错误:" + e.getMessage(), e);
        }
    }

    /**
     * Removes the connection with the given id from the cache. When the connection is
     * currently cached, {@link #onDeleteConnection(int, String)} is called first with the
     * connection's engine id and name; hook exceptions are logged only.
     *
     * @param id connection id
     */
    @Override
    public void removeConnection(String id) {
        Connection conn = this.connectionCache.getConnection(id);
        if (conn != null) {
            try {
                this.onDeleteConnection(conn.getEngineId(), conn.getName());
            } catch (Exception e) {
                logger.error("业务删除持久化连接错误:" + e.getMessage(), e);
            }
        }
        this.connectionCache.removeConnection(id);
    }

    /**
     * Removes the connection registered under the given business name. If the cache has no
     * id for the name, a warning is logged and nothing else happens.
     *
     * <p>The delete hook is invoked with engine id {@code 0} (changed 2023-09-25 for load
     * balancing). NOTE(review): confirm that a fixed engine id of 0 is intended here.</p>
     *
     * @param name business name bound to the connection
     * @throws IllegalArgumentException if {@code name} is empty
     */
    @Override
    public void removeConnectionByName(String name) {
        if (StringUtils.isEmpty(name)) {
            throw new IllegalArgumentException("name is required!");
        }
        String id = this.connectionCache.getIdByName(name);
        if (id == null) {
            logger.warn("根据终端名称查找通道ID错误:未找到name=" + name);
            return;
        }
        this.connectionCache.removeConnection(id);
        try {
            this.onDeleteConnection(0, name);
        } catch (Exception e) {
            logger.error("业务删除持久化连接错误:" + e.getMessage() + ", name=" + name, e);
        }
    }

    /**
     * Sets the last-active time of the cached connection with the given id to the current
     * time. Does nothing when no such connection is cached. Only the object returned by the
     * cache is modified; no cache update call and no hook are made.
     *
     * @param id connection id
     */
    @Override
    public void updateLastTime(String id) {
        Connection conn = this.connectionCache.getConnection(id);
        if (conn != null) {
            touch(conn);
        }
    }

    @Override
    public Connection getConnection(String id) {
        return this.connectionCache.getConnection(id);
    }

    // The engine-id specific overloads below are not supported by this base class.

    @Override
    public void putConnection(int engineId, Connection connection) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void updateConnection(int engineId, String id, Connection connection) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void removeConnection(int engineId, String id) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void updateLastTime(int engineId, String id) {
        throw new UnsupportedOperationException();
    }

    @Override
    public Connection getConnection(int engineId, String id) {
        throw new UnsupportedOperationException();
    }

    /**
     * Paged listing of connections (deprecated).
     *
     * <p>NOTE(review): the former paging implementation relied on id-list/index fields that
     * no longer exist in this class. Confirm that failing fast is the intended behaviour
     * (as opposed to returning {@code null} or an empty pager) before relying on it.</p>
     *
     * @param pageIndex page index
     * @param pageSize  page size
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Deprecated
    @Override
    public GenericPager<Connection> queryPageConnectionList(int pageIndex, int pageSize) {
        throw new UnsupportedOperationException();
    }

    /**
     * @return whatever list the connection cache reports for all connections
     */
    @Override
    public List<Connection> queryAllConnectionList() {
        return this.connectionCache.getAllConnectionList();
    }

    //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    // Lookups by engine id
    //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    @Override
    public Map<Integer, String> getConnectionTypeMap() {
        return this.connectionTypeCache;
    }

    @Override
    public String getConnectionTypeName(int engineId) {
        return this.connectionTypeCache.get(engineId);
    }

    /**
     * Returns the cached connections whose engine id equals {@code engineId}.
     *
     * @param engineId engine id to filter by
     * @return a new list; empty when nothing matches
     */
    @Override
    public List<Connection> queryAllConnectionListBy(int engineId) {
        List<Connection> matches = new ArrayList<>();
        for (Connection conn : this.connectionCache.getAllConnectionList()) {
            if (conn.getEngineId() == engineId) {
                matches.add(conn);
            }
        }
        return matches;
    }

    /**
     * Looks up a connection by its business name (for example a device number).
     *
     * @param name business name
     * @return the result of the cache lookup for that name
     */
    @Override
    public Connection getConnectionByName(String name) {
        return this.connectionCache.getConnectionByName(name);
    }

    /**
     * Hook called by {@link #putConnection(Connection)} after the connection has been put
     * into the cache, e.g. to persist it to a database.
     *
     * @param connection the connection that was just cached
     * @throws Exception any failure; the caller logs it and continues
     */
    protected abstract void onSaveConnection(Connection connection) throws Exception;

    /**
     * Hook called when a connection is removed: by {@link #removeConnection(String)} before
     * the cache entry is dropped, and by {@link #removeConnectionByName(String)} after it.
     *
     * @param engineId engine id ({@code 0} when invoked from {@code removeConnectionByName})
     * @param id       the connection's business name
     * @throws Exception any failure; the caller logs it and continues
     */
    protected abstract void onDeleteConnection(int engineId, String id) throws Exception;

    /**
     * Hook called by {@link #updateConnection(String, Connection)} after
     * {@link #onUpdateConnection(Connection)} to report the new last-active time.
     *
     * @param engineId engine id
     * @param id       the connection's business name
     * @param lastTime last-active timestamp taken from the connection
     * @throws Exception any failure; the caller logs it and continues
     */
    protected abstract void onUpdateLastTime(int engineId, String id, long lastTime) throws Exception;

    /**
     * Hook called by {@link #updateConnection(String, Connection)} once the cache has been
     * updated with the connection (added 2023-09-25).
     *
     * @param connection the updated connection
     * @throws Exception any failure; the caller logs it and continues
     */
    protected abstract void onUpdateConnection(Connection connection) throws Exception;

    public ConnectionCache getConnectionCache() {
        return connectionCache;
    }

    public void setConnectionCache(ConnectionCache connectionCache) {
        this.connectionCache = connectionCache;
    }

    /**
     * Stamps the connection with the current time as its last-active time.
     * NOTE(review): assumes every managed connection is an {@link AbstractConnection};
     * any other implementation fails with a ClassCastException, as before.
     */
    private static void touch(Connection connection) {
        ((AbstractConnection) connection).setLastTime(System.currentTimeMillis());
    }
}