shikeying
2024-01-11 3b67e947e36133e2a40eb2737b15ea375e157ea0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package com.walker.push.rocketmq;
 
import com.walker.infrastructure.ApplicationRuntimeException;
import com.walker.infrastructure.core.ApplicationBeanInitialized;
import com.walker.queue.AbstractQueueManager;
import com.walker.queue.QueueException;
import com.walker.tcp.ActionCallable;
import com.walker.tcp.ActionCallablePostProcessor;
import com.walker.tcp.Connection;
import com.walker.tcp.ConnectionManager;
import com.walker.tcp.Request;
import com.walker.tcp.Response;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
/**
 * 基于MQ消息队列的队列管理器。
 * <pre>
 *     1) 目前其实没有用到MQ队列,因为当前仅用于聊天系统中,请求数据不需要添加到队列中。
 *     2) 只有业务调用<code>WebSocketEngine</code>推送时,才会走队列处理。
 * </pre>
 * @author 时克英
 * @date 2023-09-26
 */
public class RocketQueueManager extends AbstractQueueManager implements ApplicationBeanInitialized {
 
    @Override
    public void startup() {
 
    }
 
    @Override
    public void push(String key, Object data, Object option) throws QueueException {
        if(data == null){
            throw new QueueException("request is required!");
        }
        if(!Request.class.isAssignableFrom(data.getClass())){
            throw new QueueException("请求对象必须是: " + Request.class.getName());
        }
 
        Request<?> request = (Request<?>) data;
        this.executor.execute(new AsyncProcessRequestTask(request));
    }
//    @Override
//    public void push(String key, Object data, Object option) throws QueueException {
//        // key = request.getProtocolNum();
//        if(data == null){
//            throw new QueueException("队列数据data必须提供!");
//        }
//        if(!MqRequest.class.isAssignableFrom(data.getClass())){
//            throw new QueueException("请求对象必须是: " + MqRequest.class.getName());
//        }
//
//        MqRequest request = (MqRequest) data;
//
//        try{
//            // 2023-09-19 暂时同步发送,后续根据情况加入异步方式。
//            this.rocketMQEnhanceTemplate.send(request.getTopic(), StringUtils.EMPTY_STRING, request);
//            logger.debug("加入一个数据到(MQ)队列:{}", data);
//        } catch (Exception ex){
//            logger.error("{}", data);
//            logger.error("同步发送MQ消息异常:" + ex.getMessage(), ex);
//        }
//    }
 
    private class AsyncProcessRequestTask implements Runnable{
 
        private Request<?> request;
 
        public AsyncProcessRequestTask(Request<?> request){
            this.request = request;
        }
 
        @Override
        public void run() {
            ActionCallable action = ActionCallablePostProcessor.getAction(request.getProtocolNum());
            if(action == null){
                throw new ApplicationRuntimeException("action未定义,protocol = " + request.getProtocolNum());
            }
 
            try {
                if(!request.isRequireResponse()){
                    // 不需要响应,直接处理业务
                    action.action(request);
                    return;
                }
 
                // 需要响应处理
                // 2023-09-26,其实请求过程不需要MQ队列,仅仅在业务直接调用推送时需要使用。
                Response<?> response = action.action(request);
                // 2018-09-20 时克英修改,响应对象中自动加入用户唯一编号
                response.setName(request.getName());
 
                Connection conn = connectionManager.getConnectionByName(request.getName());
                if(conn == null || !conn.isConnected()){
                    processFailed(request);
                    return;
                }
 
                conn.write(response);
                processSuccess(request);
 
            } catch (Exception ex){
                // 这里也可以加入提醒机制,待续
                processFailed(request);
                logger.error("执行action(负载)错误:" + ex.getMessage(), ex);
            }
        }
    }
 
    /**
     * 消息处理失败,回调方法,子类可覆盖重写
     * @param request
     */
    protected void processFailed(Request<?> request){
        logger.error("(负载)tcp消息处理失败,可能连接不存在或者已关闭,记录日志:" + request.toString());
    }
 
    /**
     * 消息处理成功,回调方法,子类可覆盖重写
     * @param request
     */
    protected void processSuccess(Request<?> request){
        logger.debug("(负载)tcp消息处理成功:" + request.getProtocolNum());
    }
 
 
    public void setExecutor(ThreadPoolTaskExecutor executor) {
        this.executor = executor;
    }
 
    private ThreadPoolTaskExecutor executor = null;
 
    private ConnectionManager connectionManager;
 
    public void setConnectionManager(ConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
    }
//    public void setRocketMQEnhanceTemplate(RocketMQEnhanceTemplate rocketMQEnhanceTemplate) {
//        this.rocketMQEnhanceTemplate = rocketMQEnhanceTemplate;
//    }
//    private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;
}