package com.walker.tcp.websocket; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.walker.tcp.Context; import com.walker.tcp.Filter; import com.walker.tcp.Request; import com.walker.tcp.Response; import com.walker.tcp.support.AbstractActionCall; import com.walker.tcp.support.SimpleContext; /** * 该对象虽然能够通过内部消息队列阻塞处理,但因在action内部,所以只能处理一种类型请求。 * 所以存在较大局限性,需要设计新的对象来统一处理消息队列任务。 * @author 时克英 * @date 2018-09-15 * */ @Deprecated public abstract class DefaultProcessAction extends AbstractActionCall{ private final List filterList = new ArrayList<>(); private int filterCount = 0; private final BlockingQueue> dataQueue = new ArrayBlockingQueue<>(1024); private ExecutorService service = Executors.newFixedThreadPool(1); private Runnable consumer = new DefaultConsumer(); public void init(){ logger.info("初始化TcpAction消费者任务:" + consumer.getClass().getName()); service.execute(consumer); } public void addFilter(Filter filter){ for(Filter f : filterList){ if(filter.getName().equalsIgnoreCase(f.getName())){ throw new IllegalArgumentException("已经存在该过滤器,不能重复添加"); } } this.filterList.add(filter); this.filterCount++; } @Override public Response action(Request request) { if(request == null){ throw new IllegalArgumentException("request is required!"); } if(request.isRequireResponse()){ return doProcessSync(request); } /* 把数据存入队列,排队处理 */ try { dataQueue.put(request); logger.debug("******** 加入一个数据到队列:" + request); } catch (InterruptedException e) { e.printStackTrace(); } return null; } /** * 对于请求需要返回响应的数据,只能在同步方法action中处理 * @param request * @return */ protected abstract Response doProcessSync(Request request); /** * 对于不需要响应的请求,可以在异步线程中处理,通过消息队列来排队执行。 * @param context * @return */ protected abstract Response doProcessASync(Context context); private class DefaultConsumer implements Runnable{ @Override public void run() { logger.info("处理websocket线程启动:" + this.getClass().getName()); while(true){ try { Request request = dataQueue.take(); Context context = new SimpleContext(request, dataQueue.peek()); logger.debug("消费者拿到一个数据:" + request); if(filterCount > 0){ /* 存在过滤器,先过滤 */ boolean filterResult = true; for(Filter f : filterList){ filterResult = f.doFilter(context); if(!filterResult){ logger.debug("过滤器终止操作:" + f.getName()); break; } } if(!filterResult){ return; } } // 处理业务 doProcessASync(context); } catch (InterruptedException e) { logger.error(DefaultConsumer.class.getName() + "消费者任务中断", e); } catch (Exception e) { logger.error("执行任务出现错误", e); } } } } }