package com.walker.push.rocketmq; import com.walker.infrastructure.ApplicationRuntimeException; import com.walker.infrastructure.core.ApplicationBeanInitialized; import com.walker.queue.AbstractQueueManager; import com.walker.queue.QueueException; import com.walker.tcp.ActionCallable; import com.walker.tcp.ActionCallablePostProcessor; import com.walker.tcp.Connection; import com.walker.tcp.ConnectionManager; import com.walker.tcp.Request; import com.walker.tcp.Response; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * 基于MQ消息队列的队列管理器。 *
* 1) 目前其实没有用到MQ队列,因为当前仅用于聊天系统中,请求数据不需要添加到队列中。
* 2) 只有业务调用WebSocketEngine
推送时,才会走队列处理。
*
* @author 时克英
* @date 2023-09-26
*/
public class RocketQueueManager extends AbstractQueueManager implements ApplicationBeanInitialized {
@Override
public void startup() {
}
@Override
public void push(String key, Object data, Object option) throws QueueException {
if(data == null){
throw new QueueException("request is required!");
}
if(!Request.class.isAssignableFrom(data.getClass())){
throw new QueueException("请求对象必须是: " + Request.class.getName());
}
Request> request = (Request>) data;
this.executor.execute(new AsyncProcessRequestTask(request));
}
// @Override
// public void push(String key, Object data, Object option) throws QueueException {
// // key = request.getProtocolNum();
// if(data == null){
// throw new QueueException("队列数据data必须提供!");
// }
// if(!MqRequest.class.isAssignableFrom(data.getClass())){
// throw new QueueException("请求对象必须是: " + MqRequest.class.getName());
// }
//
// MqRequest request = (MqRequest) data;
//
// try{
// // 2023-09-19 暂时同步发送,后续根据情况加入异步方式。
// this.rocketMQEnhanceTemplate.send(request.getTopic(), StringUtils.EMPTY_STRING, request);
// logger.debug("加入一个数据到(MQ)队列:{}", data);
// } catch (Exception ex){
// logger.error("{}", data);
// logger.error("同步发送MQ消息异常:" + ex.getMessage(), ex);
// }
// }
private class AsyncProcessRequestTask implements Runnable{
private Request> request;
public AsyncProcessRequestTask(Request> request){
this.request = request;
}
@Override
public void run() {
ActionCallable action = ActionCallablePostProcessor.getAction(request.getProtocolNum());
if(action == null){
throw new ApplicationRuntimeException("action未定义,protocol = " + request.getProtocolNum());
}
try {
if(!request.isRequireResponse()){
// 不需要响应,直接处理业务
action.action(request);
return;
}
// 需要响应处理
// 2023-09-26,其实请求过程不需要MQ队列,仅仅在业务直接调用推送时需要使用。
Response> response = action.action(request);
// 2018-09-20 时克英修改,响应对象中自动加入用户唯一编号
response.setName(request.getName());
Connection conn = connectionManager.getConnectionByName(request.getName());
if(conn == null || !conn.isConnected()){
processFailed(request);
return;
}
conn.write(response);
processSuccess(request);
} catch (Exception ex){
// 这里也可以加入提醒机制,待续
processFailed(request);
logger.error("执行action(负载)错误:" + ex.getMessage(), ex);
}
}
}
/**
* 消息处理失败,回调方法,子类可覆盖重写
* @param request
*/
protected void processFailed(Request> request){
logger.error("(负载)tcp消息处理失败,可能连接不存在或者已关闭,记录日志:" + request.toString());
}
/**
* 消息处理成功,回调方法,子类可覆盖重写
* @param request
*/
protected void processSuccess(Request> request){
logger.debug("(负载)tcp消息处理成功:" + request.getProtocolNum());
}
public void setExecutor(ThreadPoolTaskExecutor executor) {
this.executor = executor;
}
private ThreadPoolTaskExecutor executor = null;
private ConnectionManager connectionManager;
public void setConnectionManager(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}
// public void setRocketMQEnhanceTemplate(RocketMQEnhanceTemplate rocketMQEnhanceTemplate) {
// this.rocketMQEnhanceTemplate = rocketMQEnhanceTemplate;
// }
// private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;
}