WangHan
2025-04-02 a8ba678a3fe5a39da2c732014cebbb66e408e97c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package com.iplatform.tcp.lb;
 
import com.iplatform.base.Constants;
import com.iplatform.base.PlatformRuntimeException;
import com.iplatform.tcp.util.ws.LoginResponse;
import com.iplatform.tcp.util.ws.WebDataResponse;
import com.walker.infrastructure.utils.JsonUtils;
import com.walker.push.rocketmq.tcp.MqResponse;
import com.walker.tcp.Connection;
import com.walker.tcp.ConnectionManager;
import com.walker.tcp.Response;
import com.walker.tcp.lb.LongConnectionMeta;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.Map;
 
/**
 * MQ消息监听接收处理。
 * <pre>
 *     1) consumerGroup:消费者组,这个是针对同一个主题和Tag。
 *     资料介绍:
 *     A.一个消费者组,代表着一群topic相同,tag相同(即逻辑相同)的Consumer。通过一个消费者组,则可容易的进行负载均衡以及容错
 *     B.使用时,一个节点下,一个topic加一个tag可以对应一个consumer。一个消费者组就是横向上多个节点的相同consumer为一个消费组。
 * </pre>
 * @date 2023-10-07 修改消费者组,尝试解决多个队列消息无法接收问题。
 * @date 2023-12-14 监听对象放在业务中处理,平台注释该对象,业务通过MQ接收其他节点发送的聊天消息。如果要测试,请打开注解。(标注废弃是为了引起注意!)
 */
//@RocketMQMessageListener(topic = "${iplatform.tcp.connection-host}", selectorExpression = "*", consumerGroup = "${iplatform.tcp.connection-host}-consumer-group")
@Deprecated
public class SimpleMqListener implements RocketMQListener<String> {
 
    protected final transient Logger logger = LoggerFactory.getLogger(this.getClass());
 
    @Override
    public void onMessage(String s) {
        logger.debug("mq = {}", s);
//        MqResponse mqResponse = null;
        Map<String, Object> mqResponse = null;
        try {
//            mqResponse = JsonUtils.jsonStringToObject(s, MqResponse.class);
            mqResponse = JsonUtils.jsonStringToObject(s, Map.class);
        } catch (Exception e) {
            throw new PlatformRuntimeException("mq消息转换json对象失败:" + e.getMessage(), e);
        }
 
        MqResponse message = MqResponseUtils.acquireMqResponse(mqResponse);
 
        // 超过最大重试次数时调用子类方法处理
        if (message.getRetryTimes() > getMaxRetryTimes()) {
            handleMaxRetriesExceeded(message);
            return;
        }
 
        Response<?> response = null;
        try {
            response = this.translateResponse(mqResponse.get("response").toString());
        } catch (Exception e) {
            throw new PlatformRuntimeException("转换成'Response'失败:" + e.getMessage(), e);
        }
 
        try {
            long now = System.currentTimeMillis();
//            handleMessage(message);
            Connection conn = connectionManager.getConnectionByName(response.getName());
            if(conn == null || !conn.isConnected()){
                logger.debug("mq消息已接收,但长连接不存在无法推送,response = {}", response);
                return;
            }
 
            if(conn instanceof LongConnectionMeta){
                throw new IllegalStateException("这个应该是本地物理连接,但找到的是:LongConnectionMeta,name=" + response.getName());
            }
            conn.write(response);
 
            long costTime = System.currentTimeMillis() - now;
            logger.debug("消息{}消费成功,耗时[{}ms]", message.getKey(), costTime);
 
        } catch (Exception e) {
            logger.error("消息{}消费异常", message.getKey(),e);
            // 是捕获异常还是抛出,由子类决定
            if (throwException()) {
                //抛出异常,由DefaultMessageListenerConcurrently类处理
                throw new RuntimeException(e);
            }
            //此时如果不开启重试机制,则默认ACK了
//            if (isRetry()) {
//                handleRetry(message);
//            }
        }
    }
 
    /**
     * 转换成实际发送的业务数据,业务可以继承该类,并重写方法。
     * @param json
     * @return
     * @throws Exception
     */
    protected Response<?> translateResponse(String json) throws Exception{
        Map<String, Object> responseMap = JsonUtils.jsonStringToObject(json, Map.class);
        if(!responseMap.containsKey(KEY_PROTOCOL_NUM)){
            throw new IllegalArgumentException("responseMap中必须包含协议号字段:" + KEY_PROTOCOL_NUM);
        }
        String protocolNum = responseMap.get(KEY_PROTOCOL_NUM).toString();
        if(protocolNum.equals("login")){
            return JsonUtils.jsonStringToObject(json, LoginResponse.class);
        } else if(protocolNum.equals(Constants.PUSH_SCOPE_DATA)){
            return JsonUtils.jsonStringToObject(json, WebDataResponse.class);
        } else {
            throw new UnsupportedOperationException("未实现的 websocket.response对象转换:" + json);
        }
    }
 
    protected void handleMaxRetriesExceeded(MqResponse message) {
 
    }
 
    protected boolean throwException() {
        return true;
    }
 
    /**
     * 最大重试次数
     *
     * @return 最大重试次数,默认5次
     */
    protected int getMaxRetryTimes() {
        return MAX_RETRY_TIMES;
    }
 
    /**
     * 默认重试次数
     */
    private static final int MAX_RETRY_TIMES = 3;
 
    public void setConnectionManager(ConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
    }
 
    private ConnectionManager connectionManager;
    private final String KEY_PROTOCOL_NUM = "protocolNum";
}