shikeying
2024-01-11 3b67e947e36133e2a40eb2737b15ea375e157ea0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package com.walker.tcp.websocket;
 
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
import com.walker.tcp.Context;
import com.walker.tcp.Filter;
import com.walker.tcp.Request;
import com.walker.tcp.Response;
import com.walker.tcp.support.AbstractActionCall;
import com.walker.tcp.support.SimpleContext;
 
/**
 * 该对象虽然能够通过内部消息队列阻塞处理,但因在action内部,所以只能处理一种类型请求。
 * 所以存在较大局限性,需要设计新的对象来统一处理消息队列任务。
 * @author 时克英
 * @date 2018-09-15
 *
 */
@Deprecated
public abstract class DefaultProcessAction extends AbstractActionCall{
 
    private final List<Filter> filterList = new ArrayList<>();
    private int filterCount = 0;
    
    private final BlockingQueue<Request<?>> dataQueue = new ArrayBlockingQueue<>(1024);
    
    private ExecutorService service = Executors.newFixedThreadPool(1);
    private Runnable consumer = new DefaultConsumer();
    
    public void init(){
        logger.info("初始化TcpAction消费者任务:" + consumer.getClass().getName());
        service.execute(consumer);
    }
    
    public void addFilter(Filter filter){
        for(Filter f : filterList){
            if(filter.getName().equalsIgnoreCase(f.getName())){
                throw new IllegalArgumentException("已经存在该过滤器,不能重复添加");
            }
        }
        this.filterList.add(filter);
        this.filterCount++;
    }
    
    @Override
    public Response<?> action(Request<?> request) {
        if(request == null){
            throw new IllegalArgumentException("request is required!");
        }
        
        if(request.isRequireResponse()){
            return doProcessSync(request);
        }
        
        /* 把数据存入队列,排队处理 */
        try {
            dataQueue.put(request);
            logger.debug("******** 加入一个数据到队列:" + request);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
    
    /**
     * 对于请求需要返回响应的数据,只能在同步方法<code>action</code>中处理
     * @param request
     * @return
     */
    protected abstract Response<?> doProcessSync(Request<?> request); 
    
    /**
     * 对于不需要响应的请求,可以在异步线程中处理,通过消息队列来排队执行。
     * @param context
     * @return
     */
    protected abstract Response<?> doProcessASync(Context context); 
    
    private class DefaultConsumer implements Runnable{
        @Override
        public void run() {
            logger.info("处理websocket线程启动:" + this.getClass().getName());
            while(true){
                try {
                    Request<?> request = dataQueue.take();
                    Context context = new SimpleContext(request, dataQueue.peek());
                    logger.debug("消费者拿到一个数据:" + request);
                    
                    if(filterCount > 0){
                        /* 存在过滤器,先过滤 */
                        boolean filterResult = true;
                        for(Filter f : filterList){
                            filterResult = f.doFilter(context);
                            if(!filterResult){
                                logger.debug("过滤器终止操作:" + f.getName());
                                break;
                            }
                        }
                        if(!filterResult){
                            return;
                        }
                    }
                    
                    // 处理业务
                    doProcessASync(context);
                    
                } catch (InterruptedException e) {
                    logger.error(DefaultConsumer.class.getName() + "消费者任务中断", e);
                } catch (Exception e) {
                    logger.error("执行任务出现错误", e);
                }
            }
            
        }
    }
 
}