package com.iplatform.tcp.lb; import com.iplatform.base.Constants; import com.iplatform.base.PlatformRuntimeException; import com.iplatform.tcp.util.ws.LoginResponse; import com.iplatform.tcp.util.ws.WebDataResponse; import com.walker.infrastructure.utils.JsonUtils; import com.walker.push.rocketmq.tcp.MqResponse; import com.walker.tcp.Connection; import com.walker.tcp.ConnectionManager; import com.walker.tcp.Response; import com.walker.tcp.lb.LongConnectionMeta; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; /** * MQ消息监听接收处理。 *
 *     1) consumerGroup:消费者组,这个是针对同一个主题和Tag。
 *     资料介绍:
 *     A.一个消费者组,代表着一群topic相同,tag相同(即逻辑相同)的Consumer。通过一个消费者组,则可容易的进行负载均衡以及容错
 *     B.使用时,一个节点下,一个topic加一个tag可以对应一个consumer。一个消费者组就是横向上多个节点的相同consumer为一个消费组。
 * 
* @date 2023-10-07 修改消费者组,尝试解决多个队列消息无法接收问题。 * @date 2023-12-14 监听对象放在业务中处理,平台注释该对象,业务通过MQ接收其他节点发送的聊天消息。如果要测试,请打开注解。(标注废弃是为了引起注意!) */ //@RocketMQMessageListener(topic = "${iplatform.tcp.connection-host}", selectorExpression = "*", consumerGroup = "${iplatform.tcp.connection-host}-consumer-group") @Deprecated public class SimpleMqListener implements RocketMQListener { protected final transient Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public void onMessage(String s) { logger.debug("mq = {}", s); // MqResponse mqResponse = null; Map mqResponse = null; try { // mqResponse = JsonUtils.jsonStringToObject(s, MqResponse.class); mqResponse = JsonUtils.jsonStringToObject(s, Map.class); } catch (Exception e) { throw new PlatformRuntimeException("mq消息转换json对象失败:" + e.getMessage(), e); } MqResponse message = MqResponseUtils.acquireMqResponse(mqResponse); // 超过最大重试次数时调用子类方法处理 if (message.getRetryTimes() > getMaxRetryTimes()) { handleMaxRetriesExceeded(message); return; } Response response = null; try { response = this.translateResponse(mqResponse.get("response").toString()); } catch (Exception e) { throw new PlatformRuntimeException("转换成'Response'失败:" + e.getMessage(), e); } try { long now = System.currentTimeMillis(); // handleMessage(message); Connection conn = connectionManager.getConnectionByName(response.getName()); if(conn == null || !conn.isConnected()){ logger.debug("mq消息已接收,但长连接不存在无法推送,response = {}", response); return; } if(conn instanceof LongConnectionMeta){ throw new IllegalStateException("这个应该是本地物理连接,但找到的是:LongConnectionMeta,name=" + response.getName()); } conn.write(response); long costTime = System.currentTimeMillis() - now; logger.debug("消息{}消费成功,耗时[{}ms]", message.getKey(), costTime); } catch (Exception e) { logger.error("消息{}消费异常", message.getKey(),e); // 是捕获异常还是抛出,由子类决定 if (throwException()) { //抛出异常,由DefaultMessageListenerConcurrently类处理 throw new RuntimeException(e); } //此时如果不开启重试机制,则默认ACK了 // if (isRetry()) { // handleRetry(message); // } } } /** * 转换成实际发送的业务数据,业务可以继承该类,并重写方法。 * @param json * @return * @throws Exception */ protected Response translateResponse(String json) throws Exception{ Map responseMap = JsonUtils.jsonStringToObject(json, Map.class); if(!responseMap.containsKey(KEY_PROTOCOL_NUM)){ throw new IllegalArgumentException("responseMap中必须包含协议号字段:" + KEY_PROTOCOL_NUM); } String protocolNum = responseMap.get(KEY_PROTOCOL_NUM).toString(); if(protocolNum.equals("login")){ return JsonUtils.jsonStringToObject(json, LoginResponse.class); } else if(protocolNum.equals(Constants.PUSH_SCOPE_DATA)){ return JsonUtils.jsonStringToObject(json, WebDataResponse.class); } else { throw new UnsupportedOperationException("未实现的 websocket.response对象转换:" + json); } } protected void handleMaxRetriesExceeded(MqResponse message) { } protected boolean throwException() { return true; } /** * 最大重试次数 * * @return 最大重试次数,默认5次 */ protected int getMaxRetryTimes() { return MAX_RETRY_TIMES; } /** * 默认重试次数 */ private static final int MAX_RETRY_TIMES = 3; public void setConnectionManager(ConnectionManager connectionManager) { this.connectionManager = connectionManager; } private ConnectionManager connectionManager; private final String KEY_PROTOCOL_NUM = "protocolNum"; }