shikeyin
2024-01-11 65da8373531677b1c37a98f53eaa30c892f35e5a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package com.iplatform.tcp.support;
 
import com.iplatform.model.po.TcpEquip;
import com.iplatform.model.po.TcpEquipStatus;
import com.iplatform.tcp.EngineType;
import com.iplatform.tcp.EquipmentCacheProvider;
import com.iplatform.tcp.EquipmentStatusCacheProvider;
import com.iplatform.tcp.LiveStatus;
import com.iplatform.tcp.service.TcpEquipStatusServiceImpl;
import com.walker.infrastructure.utils.DateUtils;
import com.walker.infrastructure.utils.NumberGenerator;
import com.walker.tcp.Connection;
import com.walker.tcp.support.SimpleEngineConnectionManager;
 
/**
 * 可持久化存储连接状态的连接管理器实现。
 * @author 时克英
 * @date 2018-11-29
 *
 */
public class PersistentConnectionManager extends SimpleEngineConnectionManager {
 
    public void setWebsocketPersistent(boolean websocketPersistent) {
        this.websocketPersistent = websocketPersistent;
    }
 
    private boolean websocketPersistent = false;
 
    @Override
    protected void onSaveConnection(Connection connection) throws Exception {
        if(connection.getEngineId() == EngineType.INDEX_TCP_WEBSOCKET && !this.websocketPersistent){
            // websocket连接不需要持久化
            return;
        }
        logger.debug("保存了一个连接:" + connection.getName());
        String num = connection.getName();
        TcpEquipStatus cacheStatus = statusCache.getEquipmentStatus(num);
        if(cacheStatus == null){
            // 数据库还没有状态记录
            TcpEquipStatus newStatus = this.createStatus(connection, num);
            this.tcpEquipStatusService.insert(newStatus);
            statusCache.putEquipmentStatus(num, newStatus);
 
        } else {
            // 已经存在记录(更新缓存数据,并也更新status表)
            cacheStatus.setStartTime(Long.parseLong(DateUtils.getDateTimeSecondForShow(System.currentTimeMillis())));
            cacheStatus.setLiveStatus(LiveStatus.CONST_CONNECTED);
            this.tcpEquipStatusService.save(cacheStatus);
//            statusCache.updateCacheData(num, cacheStatus);
        }
 
        this.afterSaveConnection(cacheStatus);
    }
 
    private TcpEquipStatus createStatus(Connection connection, String num){
        TcpEquip equip = this.getEquipmentFromCache(num);
        TcpEquipStatus entity = new TcpEquipStatus();
        entity.setCreateTime(NumberGenerator.getSequenceNumber());
        entity.setEquipNum(num);
        entity.setId(NumberGenerator.getLongSequenceNumber());
        entity.setLiveStatus(LiveStatus.CONST_CONNECTED);
        entity.setStartTime(Long.parseLong(DateUtils.getDateTimeSecondForShow(connection.getCreateTimeMills())));
        if(equip == null){
            // 没有注册设备,但仍允许上报数据
            entity.setEquipId(0L);
            entity.setDept(0L);
        } else {
            entity.setEquipId(equip.getId());
            entity.setDept(equip.getDept());
        }
        return entity;
    }
 
    private TcpEquip getEquipmentFromCache(String num){
        return equipCache.getEquipment(num);
    }
 
    @Override
    protected void onDeleteConnection(int engineId, String name) throws Exception {
        if(engineId == EngineType.INDEX_TCP_WEBSOCKET && !this.websocketPersistent){
            // websocket连接不需要持久化
            return;
        }
        logger.debug("删除了一个连接:" + name);
        TcpEquipStatus cacheStatus = statusCache.getEquipmentStatus(name);
        cacheStatus.setEndTime(Long.parseLong(DateUtils.getDateTimeSecondForShow(System.currentTimeMillis())));
        cacheStatus.setLiveStatus(LiveStatus.CONST_NOT_CONNECT);
        this.tcpEquipStatusService.save(cacheStatus);
 
        this.afterDeleteConnection(cacheStatus);
    }
 
    @Override
    protected void onUpdateLastTime(int engineId, String name, long lastTime) throws Exception {
        logger.debug("更新了一个连接:" + name);
    }
 
    /**
     * 保存连接之后动作,由子类实现。目前为websocket推送做准备
     * @param cacheStatus
     */
    protected void afterSaveConnection(TcpEquipStatus cacheStatus){
//        OnlineResponse response = new OnlineResponse();
//        response.setName(cacheStatus.getEquipNum());
//        this.webSocketEngine.sendBroadcast(response);
    }
 
    /**
     * 设备断开连接之后动作,由子类实现。目前为websocket推送做准备
     * @param cacheStatus
     */
    protected void afterDeleteConnection(TcpEquipStatus cacheStatus){
//        OfflineResponse response = new OfflineResponse();
//        response.setName(cacheStatus.getEquipNum());
//        this.webSocketEngine.sendBroadcast(response);
    }
 
    public void setStatusCache(EquipmentStatusCacheProvider statusCache) {
        this.statusCache = statusCache;
    }
 
    public void setEquipCache(EquipmentCacheProvider equipCache) {
        this.equipCache = equipCache;
    }
 
    public void setTcpEquipStatusService(TcpEquipStatusServiceImpl tcpEquipStatusService) {
        this.tcpEquipStatusService = tcpEquipStatusService;
    }
 
    private EquipmentStatusCacheProvider statusCache;
    private EquipmentCacheProvider equipCache;
    private TcpEquipStatusServiceImpl tcpEquipStatusService;
}