package com.iplatform.tcp.lb;
import com.iplatform.base.Constants;
import com.iplatform.base.PlatformRuntimeException;
import com.iplatform.tcp.util.ws.LoginResponse;
import com.iplatform.tcp.util.ws.WebDataResponse;
import com.walker.infrastructure.utils.JsonUtils;
import com.walker.push.rocketmq.tcp.MqResponse;
import com.walker.tcp.Connection;
import com.walker.tcp.ConnectionManager;
import com.walker.tcp.Response;
import com.walker.tcp.lb.LongConnectionMeta;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* MQ消息监听接收处理。
*
* 1) consumerGroup:消费者组,这个是针对同一个主题和Tag。
* 资料介绍:
* A.一个消费者组,代表着一群topic相同,tag相同(即逻辑相同)的Consumer。通过一个消费者组,则可容易的进行负载均衡以及容错
* B.使用时,一个节点下,一个topic加一个tag可以对应一个consumer。一个消费者组就是横向上多个节点的相同consumer为一个消费组。
*
* @date 2023-10-07 修改消费者组,尝试解决多个队列消息无法接收问题。
* @date 2023-12-14 监听对象放在业务中处理,平台注释该对象,业务通过MQ接收其他节点发送的聊天消息。如果要测试,请打开注解。(标注废弃是为了引起注意!)
*/
//@RocketMQMessageListener(topic = "${iplatform.tcp.connection-host}", selectorExpression = "*", consumerGroup = "${iplatform.tcp.connection-host}-consumer-group")
@Deprecated
public class SimpleMqListener implements RocketMQListener {
protected final transient Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void onMessage(String s) {
logger.debug("mq = {}", s);
// MqResponse mqResponse = null;
Map mqResponse = null;
try {
// mqResponse = JsonUtils.jsonStringToObject(s, MqResponse.class);
mqResponse = JsonUtils.jsonStringToObject(s, Map.class);
} catch (Exception e) {
throw new PlatformRuntimeException("mq消息转换json对象失败:" + e.getMessage(), e);
}
MqResponse message = MqResponseUtils.acquireMqResponse(mqResponse);
// 超过最大重试次数时调用子类方法处理
if (message.getRetryTimes() > getMaxRetryTimes()) {
handleMaxRetriesExceeded(message);
return;
}
Response> response = null;
try {
response = this.translateResponse(mqResponse.get("response").toString());
} catch (Exception e) {
throw new PlatformRuntimeException("转换成'Response'失败:" + e.getMessage(), e);
}
try {
long now = System.currentTimeMillis();
// handleMessage(message);
Connection conn = connectionManager.getConnectionByName(response.getName());
if(conn == null || !conn.isConnected()){
logger.debug("mq消息已接收,但长连接不存在无法推送,response = {}", response);
return;
}
if(conn instanceof LongConnectionMeta){
throw new IllegalStateException("这个应该是本地物理连接,但找到的是:LongConnectionMeta,name=" + response.getName());
}
conn.write(response);
long costTime = System.currentTimeMillis() - now;
logger.debug("消息{}消费成功,耗时[{}ms]", message.getKey(), costTime);
} catch (Exception e) {
logger.error("消息{}消费异常", message.getKey(),e);
// 是捕获异常还是抛出,由子类决定
if (throwException()) {
//抛出异常,由DefaultMessageListenerConcurrently类处理
throw new RuntimeException(e);
}
//此时如果不开启重试机制,则默认ACK了
// if (isRetry()) {
// handleRetry(message);
// }
}
}
/**
* 转换成实际发送的业务数据,业务可以继承该类,并重写方法。
* @param json
* @return
* @throws Exception
*/
protected Response> translateResponse(String json) throws Exception{
Map responseMap = JsonUtils.jsonStringToObject(json, Map.class);
if(!responseMap.containsKey(KEY_PROTOCOL_NUM)){
throw new IllegalArgumentException("responseMap中必须包含协议号字段:" + KEY_PROTOCOL_NUM);
}
String protocolNum = responseMap.get(KEY_PROTOCOL_NUM).toString();
if(protocolNum.equals("login")){
return JsonUtils.jsonStringToObject(json, LoginResponse.class);
} else if(protocolNum.equals(Constants.PUSH_SCOPE_DATA)){
return JsonUtils.jsonStringToObject(json, WebDataResponse.class);
} else {
throw new UnsupportedOperationException("未实现的 websocket.response对象转换:" + json);
}
}
protected void handleMaxRetriesExceeded(MqResponse message) {
}
protected boolean throwException() {
return true;
}
/**
* 最大重试次数
*
* @return 最大重试次数,默认5次
*/
protected int getMaxRetryTimes() {
return MAX_RETRY_TIMES;
}
/**
* 默认重试次数
*/
private static final int MAX_RETRY_TIMES = 3;
public void setConnectionManager(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}
private ConnectionManager connectionManager;
private final String KEY_PROTOCOL_NUM = "protocolNum";
}