shikeying
2024-01-11 3b67e947e36133e2a40eb2737b15ea375e157ea0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package com.walker.tcp.support;
 
import com.walker.infrastructure.ApplicationRuntimeException;
import com.walker.infrastructure.core.ApplicationBeanInitialized;
import com.walker.queue.AbstractQueueManager;
import com.walker.queue.QueueException;
import com.walker.tcp.ActionCallException;
import com.walker.tcp.ActionCallable;
import com.walker.tcp.ActionCallablePostProcessor;
import com.walker.tcp.Connection;
import com.walker.tcp.ConnectionManager;
import com.walker.tcp.Context;
import com.walker.tcp.Filter;
import com.walker.tcp.Request;
import com.walker.tcp.Response;
 
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
/**
 * 使用内存队列实现的队列管理器。</p>
 * 在数据量不大的情况下可以这样使用,例如:接入设备只有几十台,并发量只有上百的情况下。<br>
 * 如果数据量较大,可以采用redis或者mq队列实现方式。{ RedisQueueManager}
 * @author 时克英
 * @date 2018-11-28
 *
 */
public class MemoryQueueManager extends AbstractQueueManager implements ApplicationBeanInitialized {
 
    private final BlockingQueue<Request<?>> dataQueue = new ArrayBlockingQueue<>(2048);
 
    private final List<Filter> filterList = new ArrayList<>();
    private int filterCount = 0;
 
    private ExecutorService service = Executors.newFixedThreadPool(maxWorkerThread);
    private Runnable consumer = new DefaultConsumer();
 
    public void addFilter(Filter filter){
        for(Filter f : filterList){
            if(filter.getName().equalsIgnoreCase(f.getName())){
                throw new IllegalArgumentException("已经存在该过滤器,不能重复添加");
            }
        }
        this.filterList.add(filter);
        this.filterCount++;
    }
 
    private class DefaultConsumer implements Runnable{
        @Override
        public void run() {
            logger.info("处理websocket线程启动:" + this.getClass().getName());
            while(true){
                try {
                    Request<?> request = dataQueue.take();
                    Context context = new SimpleContext(request, dataQueue.peek());
                    logger.debug("消费者拿到一个数据:" + request);
 
                    if(filterCount > 0){
                        /* 存在过滤器,先过滤 */
                        boolean filterResult = true;
                        for(Filter f : filterList){
                            filterResult = f.doFilter(context);
                            if(!filterResult){
                                logger.debug("过滤器终止操作:" + f.getName());
                                break;
                            }
                        }
                        if(!filterResult){
                            return;
                        }
                    }
 
                    /**
                    // 处理业务(这样执行是单线程的)
//                    processAsync(context);
 * */
 
                    // 使用ExecutorService的线程池来执行,而不是在这一个单线程中排队处理
                    service.execute(new ActionInvokeTask(context));
 
                } catch (InterruptedException e) {
                    logger.error(DefaultConsumer.class.getName() + "消费者任务中断", e);
                } catch (Exception e) {
                    logger.error("执行任务出现错误", e);
                }
            }
 
        }
    }
 
    private class ActionInvokeTask implements Runnable{
 
        private Context context;
 
        public ActionInvokeTask(Context context){
            this.context = context;
        }
 
        @Override
        public void run() {
            Request<?> request = context.getCurrentData();
            ActionCallable action = ActionCallablePostProcessor.getAction(request.getProtocolNum());
            if(action == null){
                throw new ApplicationRuntimeException("action未定义,protocol = " + request.getProtocolNum());
            }
 
            try{
                if(request.isRequireResponse()){
                    Response<?> response = action.action(request);
 
                    // 2018-09-20 时克英修改,响应对象中自动加入用户唯一编号
                    response.setName(request.getName());
 
                    Connection conn = connectionManager.getConnectionByName(request.getName());
                    if(conn == null || !conn.isConnected()){
                        processFailed(request);
                        return;
                    }
 
                    conn.write(response);
                    processSuccess(request);
 
                } else {
                    action.action(request);
                    logger.debug("该请求不需要响应:" + request);
                }
 
            } catch(ActionCallException ex){
                // 这里也可以加入提醒机制,待续
                processFailed(request);
                logger.error("执行action错误:" + ex.getMessage(), ex);
            }
        }
    }
 
//    /**
//     * 业务Action执行方法。
//     * @param context
//     */
//    protected void processAsync(Context context) throws Exception{}
 
    @Override
    public void push(String key, Object data, Object option) throws QueueException {
        if(data == null){
            throw new QueueException("request is required!");
        }
        if(!Request.class.isAssignableFrom(data.getClass())){
            throw new QueueException("请求对象必须是: " + Request.class.getName());
        }
 
        try {
            dataQueue.put((Request<?>)data);
            logger.debug("加入一个数据到队列:" + data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 消息处理失败,回调方法,子类可覆盖重写
     * @param request
     */
    protected void processFailed(Request<?> request){
        logger.error("tcp消息处理失败,可能连接不存在或者已关闭,记录日志:" + request.toString());
    }
 
    /**
     * 消息处理成功,回调方法,子类可覆盖重写
     * @param request
     */
    protected void processSuccess(Request<?> request){
        logger.debug("tcp消息处理成功:" + request.getProtocolNum());
    }
 
    @Override
    public void startup() {
        if(this.connectionManager == null){
            throw new IllegalArgumentException("connectionManager未配置");
        }
        logger.info("初始化TcpAction消费者任务:" + consumer.getClass().getName());
        service.execute(consumer);
    }
 
    private ConnectionManager connectionManager;
 
    public void setConnectionManager(ConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
    }
 
}