package com.walker.tcp.websocket;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
|
import com.walker.tcp.Context;
|
import com.walker.tcp.Filter;
|
import com.walker.tcp.Request;
|
import com.walker.tcp.Response;
|
import com.walker.tcp.support.AbstractActionCall;
|
import com.walker.tcp.support.SimpleContext;
|
|
/**
|
* 该对象虽然能够通过内部消息队列阻塞处理,但因在action内部,所以只能处理一种类型请求。
|
* 所以存在较大局限性,需要设计新的对象来统一处理消息队列任务。
|
* @author 时克英
|
* @date 2018-09-15
|
*
|
*/
|
@Deprecated
|
public abstract class DefaultProcessAction extends AbstractActionCall{
|
|
private final List<Filter> filterList = new ArrayList<>();
|
private int filterCount = 0;
|
|
private final BlockingQueue<Request<?>> dataQueue = new ArrayBlockingQueue<>(1024);
|
|
private ExecutorService service = Executors.newFixedThreadPool(1);
|
private Runnable consumer = new DefaultConsumer();
|
|
public void init(){
|
logger.info("初始化TcpAction消费者任务:" + consumer.getClass().getName());
|
service.execute(consumer);
|
}
|
|
public void addFilter(Filter filter){
|
for(Filter f : filterList){
|
if(filter.getName().equalsIgnoreCase(f.getName())){
|
throw new IllegalArgumentException("已经存在该过滤器,不能重复添加");
|
}
|
}
|
this.filterList.add(filter);
|
this.filterCount++;
|
}
|
|
@Override
|
public Response<?> action(Request<?> request) {
|
if(request == null){
|
throw new IllegalArgumentException("request is required!");
|
}
|
|
if(request.isRequireResponse()){
|
return doProcessSync(request);
|
}
|
|
/* 把数据存入队列,排队处理 */
|
try {
|
dataQueue.put(request);
|
logger.debug("******** 加入一个数据到队列:" + request);
|
} catch (InterruptedException e) {
|
e.printStackTrace();
|
}
|
return null;
|
}
|
|
/**
|
* 对于请求需要返回响应的数据,只能在同步方法<code>action</code>中处理
|
* @param request
|
* @return
|
*/
|
protected abstract Response<?> doProcessSync(Request<?> request);
|
|
/**
|
* 对于不需要响应的请求,可以在异步线程中处理,通过消息队列来排队执行。
|
* @param context
|
* @return
|
*/
|
protected abstract Response<?> doProcessASync(Context context);
|
|
private class DefaultConsumer implements Runnable{
|
@Override
|
public void run() {
|
logger.info("处理websocket线程启动:" + this.getClass().getName());
|
while(true){
|
try {
|
Request<?> request = dataQueue.take();
|
Context context = new SimpleContext(request, dataQueue.peek());
|
logger.debug("消费者拿到一个数据:" + request);
|
|
if(filterCount > 0){
|
/* 存在过滤器,先过滤 */
|
boolean filterResult = true;
|
for(Filter f : filterList){
|
filterResult = f.doFilter(context);
|
if(!filterResult){
|
logger.debug("过滤器终止操作:" + f.getName());
|
break;
|
}
|
}
|
if(!filterResult){
|
return;
|
}
|
}
|
|
// 处理业务
|
doProcessASync(context);
|
|
} catch (InterruptedException e) {
|
logger.error(DefaultConsumer.class.getName() + "消费者任务中断", e);
|
} catch (Exception e) {
|
logger.error("执行任务出现错误", e);
|
}
|
}
|
|
}
|
}
|
|
}
|