package com.iplatform.tcp.lb;
import com.iplatform.base.Constants;
import com.iplatform.tcp.util.ws.LoginResponse;
import com.iplatform.tcp.util.ws.WebDataResponse;
import com.walker.infrastructure.utils.JsonUtils;
import com.walker.push.rocketmq.EnhanceMessageHandler;
import com.walker.push.rocketmq.tcp.MqResponse;
import com.walker.tcp.Connection;
import com.walker.tcp.ConnectionManager;
import com.walker.tcp.Response;
import com.walker.tcp.lb.LongConnectionMeta;
import org.apache.rocketmq.spring.core.RocketMQListener;
import java.util.Map;
/**
* 平台默认的MQ队列监听器,业务需要根据情况继承该对象,实现自己的业务。
*
需要重写方法:{@linkplain DefaultMqListener#handleMessage(MqResponse)}
* @author 时克英
* @date 2023-09-26
* @date 2024-01-23 无需使用集群模式,单机即可,MQ不用开启监听。
*/
//@RocketMQMessageListener(topic = "${iplatform.tcp.connection-host}", consumerGroup = "consumer-group")
//public class DefaultMqListener extends EnhanceMessageHandler implements RocketMQListener {
public class DefaultMqListener extends EnhanceMessageHandler implements RocketMQListener {
// protected final transient Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void onMessage(MqResponse mqResponse) {
this.dispatchMessage(mqResponse);
}
@Override
protected void handleMessage(MqResponse message) throws Exception {
// 具体执行业务消息方法。
// 这里聊天业务中,需要推送到本机连接的客户端(或浏览器)
// Response> response = message.getResponse();
Response> response = this.translateResponse(message.getResponse());
Connection conn = connectionManager.getConnectionByName(response.getName());
if(conn == null || !conn.isConnected()){
logger.debug("mq消息已接收,但长连接不存在无法推送,response = {}", response);
return;
}
if(conn instanceof LongConnectionMeta){
throw new IllegalStateException("这个应该是本地物理连接,但找到的是:LongConnectionMeta,name=" + response.getName());
}
conn.write(response);
}
/**
* 转换成实际发送的业务数据,业务可以继承该类,并重写方法。
* @param json
* @return
* @throws Exception
*/
protected Response> translateResponse(String json) throws Exception{
Map responseMap = JsonUtils.jsonStringToObject(json, Map.class);
if(!responseMap.containsKey(KEY_PROTOCOL_NUM)){
throw new IllegalArgumentException("responseMap中必须包含协议号字段:" + KEY_PROTOCOL_NUM);
}
String protocolNum = responseMap.get(KEY_PROTOCOL_NUM).toString();
if(protocolNum.equals("login")){
return JsonUtils.jsonStringToObject(json, LoginResponse.class);
} else if(protocolNum.equals(Constants.PUSH_SCOPE_DATA)){
return JsonUtils.jsonStringToObject(json, WebDataResponse.class);
} else {
throw new UnsupportedOperationException("未实现的 websocket.response对象转换:" + json);
}
}
private final String KEY_PROTOCOL_NUM = "protocolNum";
@Override
protected void handleMaxRetriesExceeded(MqResponse message) {
}
@Override
protected boolean isRetry() {
return false;
}
@Override
protected boolean throwException() {
return true;
}
public void setConnectionManager(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}
private ConnectionManager connectionManager;
}