package com.iplatform.tcp.lb; import com.iplatform.base.Constants; import com.iplatform.tcp.util.ws.LoginResponse; import com.iplatform.tcp.util.ws.WebDataResponse; import com.walker.infrastructure.utils.JsonUtils; import com.walker.push.rocketmq.EnhanceMessageHandler; import com.walker.push.rocketmq.tcp.MqResponse; import com.walker.tcp.Connection; import com.walker.tcp.ConnectionManager; import com.walker.tcp.Response; import com.walker.tcp.lb.LongConnectionMeta; import org.apache.rocketmq.spring.core.RocketMQListener; import java.util.Map; /** * 平台默认的MQ队列监听器,业务需要根据情况继承该对象,实现自己的业务。 *

需要重写方法:{@linkplain DefaultMqListener#handleMessage(MqResponse)}

* @author 时克英 * @date 2023-09-26 * @date 2024-01-23 无需使用集群模式,单机即可,MQ不用开启监听。 */ //@RocketMQMessageListener(topic = "${iplatform.tcp.connection-host}", consumerGroup = "consumer-group") //public class DefaultMqListener extends EnhanceMessageHandler implements RocketMQListener { public class DefaultMqListener extends EnhanceMessageHandler implements RocketMQListener { // protected final transient Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public void onMessage(MqResponse mqResponse) { this.dispatchMessage(mqResponse); } @Override protected void handleMessage(MqResponse message) throws Exception { // 具体执行业务消息方法。 // 这里聊天业务中,需要推送到本机连接的客户端(或浏览器) // Response response = message.getResponse(); Response response = this.translateResponse(message.getResponse()); Connection conn = connectionManager.getConnectionByName(response.getName()); if(conn == null || !conn.isConnected()){ logger.debug("mq消息已接收,但长连接不存在无法推送,response = {}", response); return; } if(conn instanceof LongConnectionMeta){ throw new IllegalStateException("这个应该是本地物理连接,但找到的是:LongConnectionMeta,name=" + response.getName()); } conn.write(response); } /** * 转换成实际发送的业务数据,业务可以继承该类,并重写方法。 * @param json * @return * @throws Exception */ protected Response translateResponse(String json) throws Exception{ Map responseMap = JsonUtils.jsonStringToObject(json, Map.class); if(!responseMap.containsKey(KEY_PROTOCOL_NUM)){ throw new IllegalArgumentException("responseMap中必须包含协议号字段:" + KEY_PROTOCOL_NUM); } String protocolNum = responseMap.get(KEY_PROTOCOL_NUM).toString(); if(protocolNum.equals("login")){ return JsonUtils.jsonStringToObject(json, LoginResponse.class); } else if(protocolNum.equals(Constants.PUSH_SCOPE_DATA)){ return JsonUtils.jsonStringToObject(json, WebDataResponse.class); } else { throw new UnsupportedOperationException("未实现的 websocket.response对象转换:" + json); } } private final String KEY_PROTOCOL_NUM = "protocolNum"; @Override protected void handleMaxRetriesExceeded(MqResponse message) { } @Override protected boolean isRetry() { return false; } @Override protected boolean throwException() { return true; } public void setConnectionManager(ConnectionManager connectionManager) { this.connectionManager = connectionManager; } private ConnectionManager connectionManager; }