package com.walker.push.rocketmq; import com.walker.infrastructure.ApplicationRuntimeException; import com.walker.infrastructure.core.ApplicationBeanInitialized; import com.walker.queue.AbstractQueueManager; import com.walker.queue.QueueException; import com.walker.tcp.ActionCallable; import com.walker.tcp.ActionCallablePostProcessor; import com.walker.tcp.Connection; import com.walker.tcp.ConnectionManager; import com.walker.tcp.Request; import com.walker.tcp.Response; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * 基于MQ消息队列的队列管理器。 *
 *     1) 目前其实没有用到MQ队列,因为当前仅用于聊天系统中,请求数据不需要添加到队列中。
 *     2) 只有业务调用WebSocketEngine推送时,才会走队列处理。
 * 
* @author 时克英 * @date 2023-09-26 */ public class RocketQueueManager extends AbstractQueueManager implements ApplicationBeanInitialized { @Override public void startup() { } @Override public void push(String key, Object data, Object option) throws QueueException { if(data == null){ throw new QueueException("request is required!"); } if(!Request.class.isAssignableFrom(data.getClass())){ throw new QueueException("请求对象必须是: " + Request.class.getName()); } Request request = (Request) data; this.executor.execute(new AsyncProcessRequestTask(request)); } // @Override // public void push(String key, Object data, Object option) throws QueueException { // // key = request.getProtocolNum(); // if(data == null){ // throw new QueueException("队列数据data必须提供!"); // } // if(!MqRequest.class.isAssignableFrom(data.getClass())){ // throw new QueueException("请求对象必须是: " + MqRequest.class.getName()); // } // // MqRequest request = (MqRequest) data; // // try{ // // 2023-09-19 暂时同步发送,后续根据情况加入异步方式。 // this.rocketMQEnhanceTemplate.send(request.getTopic(), StringUtils.EMPTY_STRING, request); // logger.debug("加入一个数据到(MQ)队列:{}", data); // } catch (Exception ex){ // logger.error("{}", data); // logger.error("同步发送MQ消息异常:" + ex.getMessage(), ex); // } // } private class AsyncProcessRequestTask implements Runnable{ private Request request; public AsyncProcessRequestTask(Request request){ this.request = request; } @Override public void run() { ActionCallable action = ActionCallablePostProcessor.getAction(request.getProtocolNum()); if(action == null){ throw new ApplicationRuntimeException("action未定义,protocol = " + request.getProtocolNum()); } try { if(!request.isRequireResponse()){ // 不需要响应,直接处理业务 action.action(request); return; } // 需要响应处理 // 2023-09-26,其实请求过程不需要MQ队列,仅仅在业务直接调用推送时需要使用。 Response response = action.action(request); // 2018-09-20 时克英修改,响应对象中自动加入用户唯一编号 response.setName(request.getName()); Connection conn = connectionManager.getConnectionByName(request.getName()); if(conn == null || !conn.isConnected()){ processFailed(request); return; } conn.write(response); processSuccess(request); } catch (Exception ex){ // 这里也可以加入提醒机制,待续 processFailed(request); logger.error("执行action(负载)错误:" + ex.getMessage(), ex); } } } /** * 消息处理失败,回调方法,子类可覆盖重写 * @param request */ protected void processFailed(Request request){ logger.error("(负载)tcp消息处理失败,可能连接不存在或者已关闭,记录日志:" + request.toString()); } /** * 消息处理成功,回调方法,子类可覆盖重写 * @param request */ protected void processSuccess(Request request){ logger.debug("(负载)tcp消息处理成功:" + request.getProtocolNum()); } public void setExecutor(ThreadPoolTaskExecutor executor) { this.executor = executor; } private ThreadPoolTaskExecutor executor = null; private ConnectionManager connectionManager; public void setConnectionManager(ConnectionManager connectionManager) { this.connectionManager = connectionManager; } // public void setRocketMQEnhanceTemplate(RocketMQEnhanceTemplate rocketMQEnhanceTemplate) { // this.rocketMQEnhanceTemplate = rocketMQEnhanceTemplate; // } // private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; }