package com.walker.push.rocketmq;
|
|
import com.walker.infrastructure.ApplicationRuntimeException;
|
import com.walker.infrastructure.core.ApplicationBeanInitialized;
|
import com.walker.queue.AbstractQueueManager;
|
import com.walker.queue.QueueException;
|
import com.walker.tcp.ActionCallable;
|
import com.walker.tcp.ActionCallablePostProcessor;
|
import com.walker.tcp.Connection;
|
import com.walker.tcp.ConnectionManager;
|
import com.walker.tcp.Request;
|
import com.walker.tcp.Response;
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
/**
|
* 基于MQ消息队列的队列管理器。
|
* <pre>
|
* 1) 目前其实没有用到MQ队列,因为当前仅用于聊天系统中,请求数据不需要添加到队列中。
|
* 2) 只有业务调用<code>WebSocketEngine</code>推送时,才会走队列处理。
|
* </pre>
|
* @author 时克英
|
* @date 2023-09-26
|
*/
|
public class RocketQueueManager extends AbstractQueueManager implements ApplicationBeanInitialized {
|
|
@Override
|
public void startup() {
|
|
}
|
|
@Override
|
public void push(String key, Object data, Object option) throws QueueException {
|
if(data == null){
|
throw new QueueException("request is required!");
|
}
|
if(!Request.class.isAssignableFrom(data.getClass())){
|
throw new QueueException("请求对象必须是: " + Request.class.getName());
|
}
|
|
Request<?> request = (Request<?>) data;
|
this.executor.execute(new AsyncProcessRequestTask(request));
|
}
|
// @Override
|
// public void push(String key, Object data, Object option) throws QueueException {
|
// // key = request.getProtocolNum();
|
// if(data == null){
|
// throw new QueueException("队列数据data必须提供!");
|
// }
|
// if(!MqRequest.class.isAssignableFrom(data.getClass())){
|
// throw new QueueException("请求对象必须是: " + MqRequest.class.getName());
|
// }
|
//
|
// MqRequest request = (MqRequest) data;
|
//
|
// try{
|
// // 2023-09-19 暂时同步发送,后续根据情况加入异步方式。
|
// this.rocketMQEnhanceTemplate.send(request.getTopic(), StringUtils.EMPTY_STRING, request);
|
// logger.debug("加入一个数据到(MQ)队列:{}", data);
|
// } catch (Exception ex){
|
// logger.error("{}", data);
|
// logger.error("同步发送MQ消息异常:" + ex.getMessage(), ex);
|
// }
|
// }
|
|
private class AsyncProcessRequestTask implements Runnable{
|
|
private Request<?> request;
|
|
public AsyncProcessRequestTask(Request<?> request){
|
this.request = request;
|
}
|
|
@Override
|
public void run() {
|
ActionCallable action = ActionCallablePostProcessor.getAction(request.getProtocolNum());
|
if(action == null){
|
throw new ApplicationRuntimeException("action未定义,protocol = " + request.getProtocolNum());
|
}
|
|
try {
|
if(!request.isRequireResponse()){
|
// 不需要响应,直接处理业务
|
action.action(request);
|
return;
|
}
|
|
// 需要响应处理
|
// 2023-09-26,其实请求过程不需要MQ队列,仅仅在业务直接调用推送时需要使用。
|
Response<?> response = action.action(request);
|
// 2018-09-20 时克英修改,响应对象中自动加入用户唯一编号
|
response.setName(request.getName());
|
|
Connection conn = connectionManager.getConnectionByName(request.getName());
|
if(conn == null || !conn.isConnected()){
|
processFailed(request);
|
return;
|
}
|
|
conn.write(response);
|
processSuccess(request);
|
|
} catch (Exception ex){
|
// 这里也可以加入提醒机制,待续
|
processFailed(request);
|
logger.error("执行action(负载)错误:" + ex.getMessage(), ex);
|
}
|
}
|
}
|
|
/**
|
* 消息处理失败,回调方法,子类可覆盖重写
|
* @param request
|
*/
|
protected void processFailed(Request<?> request){
|
logger.error("(负载)tcp消息处理失败,可能连接不存在或者已关闭,记录日志:" + request.toString());
|
}
|
|
/**
|
* 消息处理成功,回调方法,子类可覆盖重写
|
* @param request
|
*/
|
protected void processSuccess(Request<?> request){
|
logger.debug("(负载)tcp消息处理成功:" + request.getProtocolNum());
|
}
|
|
|
public void setExecutor(ThreadPoolTaskExecutor executor) {
|
this.executor = executor;
|
}
|
|
private ThreadPoolTaskExecutor executor = null;
|
|
private ConnectionManager connectionManager;
|
|
public void setConnectionManager(ConnectionManager connectionManager) {
|
this.connectionManager = connectionManager;
|
}
|
// public void setRocketMQEnhanceTemplate(RocketMQEnhanceTemplate rocketMQEnhanceTemplate) {
|
// this.rocketMQEnhanceTemplate = rocketMQEnhanceTemplate;
|
// }
|
// private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;
|
}
|