package com.iplatform.tcp.support; import com.iplatform.model.po.TcpEquip; import com.iplatform.model.po.TcpEquipStatus; import com.iplatform.tcp.EngineType; import com.iplatform.tcp.EquipmentCacheProvider; import com.iplatform.tcp.EquipmentStatusCacheProvider; import com.iplatform.tcp.LiveStatus; import com.iplatform.tcp.service.TcpEquipStatusServiceImpl; import com.walker.infrastructure.utils.DateUtils; import com.walker.infrastructure.utils.NumberGenerator; import com.walker.tcp.Connection; import com.walker.tcp.support.SimpleEngineConnectionManager; /** * 可持久化存储连接状态的连接管理器实现。 * @author 时克英 * @date 2018-11-29 * */ public class PersistentConnectionManager extends SimpleEngineConnectionManager { public void setWebsocketPersistent(boolean websocketPersistent) { this.websocketPersistent = websocketPersistent; } private boolean websocketPersistent = false; @Override protected void onSaveConnection(Connection connection) throws Exception { if(connection.getEngineId() == EngineType.INDEX_TCP_WEBSOCKET && !this.websocketPersistent){ // websocket连接不需要持久化 return; } logger.debug("保存了一个连接:" + connection.getName()); String num = connection.getName(); TcpEquipStatus cacheStatus = statusCache.getEquipmentStatus(num); if(cacheStatus == null){ // 数据库还没有状态记录 TcpEquipStatus newStatus = this.createStatus(connection, num); this.tcpEquipStatusService.insert(newStatus); statusCache.putEquipmentStatus(num, newStatus); } else { // 已经存在记录(更新缓存数据,并也更新status表) cacheStatus.setStartTime(Long.parseLong(DateUtils.getDateTimeSecondForShow(System.currentTimeMillis()))); cacheStatus.setLiveStatus(LiveStatus.CONST_CONNECTED); this.tcpEquipStatusService.save(cacheStatus); // statusCache.updateCacheData(num, cacheStatus); } this.afterSaveConnection(cacheStatus); } private TcpEquipStatus createStatus(Connection connection, String num){ TcpEquip equip = this.getEquipmentFromCache(num); TcpEquipStatus entity = new TcpEquipStatus(); entity.setCreateTime(NumberGenerator.getSequenceNumber()); entity.setEquipNum(num); entity.setId(NumberGenerator.getLongSequenceNumber()); entity.setLiveStatus(LiveStatus.CONST_CONNECTED); entity.setStartTime(Long.parseLong(DateUtils.getDateTimeSecondForShow(connection.getCreateTimeMills()))); if(equip == null){ // 没有注册设备,但仍允许上报数据 entity.setEquipId(0L); entity.setDept(0L); } else { entity.setEquipId(equip.getId()); entity.setDept(equip.getDept()); } return entity; } private TcpEquip getEquipmentFromCache(String num){ return equipCache.getEquipment(num); } @Override protected void onDeleteConnection(int engineId, String name) throws Exception { if(engineId == EngineType.INDEX_TCP_WEBSOCKET && !this.websocketPersistent){ // websocket连接不需要持久化 return; } logger.debug("删除了一个连接:" + name); TcpEquipStatus cacheStatus = statusCache.getEquipmentStatus(name); cacheStatus.setEndTime(Long.parseLong(DateUtils.getDateTimeSecondForShow(System.currentTimeMillis()))); cacheStatus.setLiveStatus(LiveStatus.CONST_NOT_CONNECT); this.tcpEquipStatusService.save(cacheStatus); this.afterDeleteConnection(cacheStatus); } @Override protected void onUpdateLastTime(int engineId, String name, long lastTime) throws Exception { logger.debug("更新了一个连接:" + name); } /** * 保存连接之后动作,由子类实现。目前为websocket推送做准备 * @param cacheStatus */ protected void afterSaveConnection(TcpEquipStatus cacheStatus){ // OnlineResponse response = new OnlineResponse(); // response.setName(cacheStatus.getEquipNum()); // this.webSocketEngine.sendBroadcast(response); } /** * 设备断开连接之后动作,由子类实现。目前为websocket推送做准备 * @param cacheStatus */ protected void afterDeleteConnection(TcpEquipStatus cacheStatus){ // OfflineResponse response = new OfflineResponse(); // response.setName(cacheStatus.getEquipNum()); // this.webSocketEngine.sendBroadcast(response); } public void setStatusCache(EquipmentStatusCacheProvider statusCache) { this.statusCache = statusCache; } public void setEquipCache(EquipmentCacheProvider equipCache) { this.equipCache = equipCache; } public void setTcpEquipStatusService(TcpEquipStatusServiceImpl tcpEquipStatusService) { this.tcpEquipStatusService = tcpEquipStatusService; } private EquipmentStatusCacheProvider statusCache; private EquipmentCacheProvider equipCache; private TcpEquipStatusServiceImpl tcpEquipStatusService; }