package com.iplatform.tcp.support;
|
|
import com.iplatform.model.po.TcpEquip;
|
import com.iplatform.model.po.TcpEquipStatus;
|
import com.iplatform.tcp.EngineType;
|
import com.iplatform.tcp.EquipmentCacheProvider;
|
import com.iplatform.tcp.EquipmentStatusCacheProvider;
|
import com.iplatform.tcp.LiveStatus;
|
import com.iplatform.tcp.service.TcpEquipStatusServiceImpl;
|
import com.walker.infrastructure.utils.DateUtils;
|
import com.walker.infrastructure.utils.NumberGenerator;
|
import com.walker.tcp.Connection;
|
import com.walker.tcp.support.SimpleEngineConnectionManager;
|
|
/**
|
* 可持久化存储连接状态的连接管理器实现。
|
* @author 时克英
|
* @date 2018-11-29
|
*
|
*/
|
public class PersistentConnectionManager extends SimpleEngineConnectionManager {
|
|
public void setWebsocketPersistent(boolean websocketPersistent) {
|
this.websocketPersistent = websocketPersistent;
|
}
|
|
private boolean websocketPersistent = false;
|
|
@Override
|
protected void onSaveConnection(Connection connection) throws Exception {
|
if(connection.getEngineId() == EngineType.INDEX_TCP_WEBSOCKET && !this.websocketPersistent){
|
// websocket连接不需要持久化
|
return;
|
}
|
logger.debug("保存了一个连接:" + connection.getName());
|
String num = connection.getName();
|
TcpEquipStatus cacheStatus = statusCache.getEquipmentStatus(num);
|
if(cacheStatus == null){
|
// 数据库还没有状态记录
|
TcpEquipStatus newStatus = this.createStatus(connection, num);
|
this.tcpEquipStatusService.insert(newStatus);
|
statusCache.putEquipmentStatus(num, newStatus);
|
|
} else {
|
// 已经存在记录(更新缓存数据,并也更新status表)
|
cacheStatus.setStartTime(Long.parseLong(DateUtils.getDateTimeSecondForShow(System.currentTimeMillis())));
|
cacheStatus.setLiveStatus(LiveStatus.CONST_CONNECTED);
|
this.tcpEquipStatusService.save(cacheStatus);
|
// statusCache.updateCacheData(num, cacheStatus);
|
}
|
|
this.afterSaveConnection(cacheStatus);
|
}
|
|
private TcpEquipStatus createStatus(Connection connection, String num){
|
TcpEquip equip = this.getEquipmentFromCache(num);
|
TcpEquipStatus entity = new TcpEquipStatus();
|
entity.setCreateTime(NumberGenerator.getSequenceNumber());
|
entity.setEquipNum(num);
|
entity.setId(NumberGenerator.getLongSequenceNumber());
|
entity.setLiveStatus(LiveStatus.CONST_CONNECTED);
|
entity.setStartTime(Long.parseLong(DateUtils.getDateTimeSecondForShow(connection.getCreateTimeMills())));
|
if(equip == null){
|
// 没有注册设备,但仍允许上报数据
|
entity.setEquipId(0L);
|
entity.setDept(0L);
|
} else {
|
entity.setEquipId(equip.getId());
|
entity.setDept(equip.getDept());
|
}
|
return entity;
|
}
|
|
private TcpEquip getEquipmentFromCache(String num){
|
return equipCache.getEquipment(num);
|
}
|
|
@Override
|
protected void onDeleteConnection(int engineId, String name) throws Exception {
|
if(engineId == EngineType.INDEX_TCP_WEBSOCKET && !this.websocketPersistent){
|
// websocket连接不需要持久化
|
return;
|
}
|
logger.debug("删除了一个连接:" + name);
|
TcpEquipStatus cacheStatus = statusCache.getEquipmentStatus(name);
|
cacheStatus.setEndTime(Long.parseLong(DateUtils.getDateTimeSecondForShow(System.currentTimeMillis())));
|
cacheStatus.setLiveStatus(LiveStatus.CONST_NOT_CONNECT);
|
this.tcpEquipStatusService.save(cacheStatus);
|
|
this.afterDeleteConnection(cacheStatus);
|
}
|
|
@Override
|
protected void onUpdateLastTime(int engineId, String name, long lastTime) throws Exception {
|
logger.debug("更新了一个连接:" + name);
|
}
|
|
/**
|
* 保存连接之后动作,由子类实现。目前为websocket推送做准备
|
* @param cacheStatus
|
*/
|
protected void afterSaveConnection(TcpEquipStatus cacheStatus){
|
// OnlineResponse response = new OnlineResponse();
|
// response.setName(cacheStatus.getEquipNum());
|
// this.webSocketEngine.sendBroadcast(response);
|
}
|
|
/**
|
* 设备断开连接之后动作,由子类实现。目前为websocket推送做准备
|
* @param cacheStatus
|
*/
|
protected void afterDeleteConnection(TcpEquipStatus cacheStatus){
|
// OfflineResponse response = new OfflineResponse();
|
// response.setName(cacheStatus.getEquipNum());
|
// this.webSocketEngine.sendBroadcast(response);
|
}
|
|
public void setStatusCache(EquipmentStatusCacheProvider statusCache) {
|
this.statusCache = statusCache;
|
}
|
|
public void setEquipCache(EquipmentCacheProvider equipCache) {
|
this.equipCache = equipCache;
|
}
|
|
public void setTcpEquipStatusService(TcpEquipStatusServiceImpl tcpEquipStatusService) {
|
this.tcpEquipStatusService = tcpEquipStatusService;
|
}
|
|
private EquipmentStatusCacheProvider statusCache;
|
private EquipmentCacheProvider equipCache;
|
private TcpEquipStatusServiceImpl tcpEquipStatusService;
|
}
|