package com.walker.tcp.support; import com.walker.infrastructure.ApplicationRuntimeException; import com.walker.infrastructure.core.ApplicationBeanInitialized; import com.walker.queue.AbstractQueueManager; import com.walker.queue.QueueException; import com.walker.tcp.ActionCallException; import com.walker.tcp.ActionCallable; import com.walker.tcp.ActionCallablePostProcessor; import com.walker.tcp.Connection; import com.walker.tcp.ConnectionManager; import com.walker.tcp.Context; import com.walker.tcp.Filter; import com.walker.tcp.Request; import com.walker.tcp.Response; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 使用内存队列实现的队列管理器。

* 在数据量不大的情况下可以这样使用,例如:接入设备只有几十台,并发量只有上百的情况下。
* For larger data volumes a Redis- or MQ-backed queue implementation can be used
* instead (see {@code RedisQueueManager}).
*
* @author 时克英
* @date 2018-11-28
*/
public class MemoryQueueManager extends AbstractQueueManager implements ApplicationBeanInitialized {

    /** Capacity of the in-memory request queue; {@link #push} blocks while the queue is full. */
    private static final int QUEUE_CAPACITY = 2048;

    /** Pending requests, filled by {@link #push} and drained by the consumer task. */
    private final BlockingQueue<Request<?>> dataQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    /**
     * Filters applied, in registration order, to every request before it is dispatched.
     * NOTE(review): this is a plain ArrayList read by the consumer thread; filters are
     * presumably registered before {@link #startup()} is called -- confirm with callers.
     */
    private final List<Filter> filterList = new ArrayList<>();

    /**
     * Pool that runs both the long-lived consumer task and the per-request action tasks.
     * NOTE(review): the consumer permanently occupies one pool thread, so with
     * maxWorkerThread == 1 no thread would be left to execute actions -- confirm the
     * configured value. The pool is also created at construction time, so a value of
     * maxWorkerThread assigned later would not be picked up.
     */
    private final ExecutorService service = Executors.newFixedThreadPool(maxWorkerThread);

    /** The single task that takes requests off the queue and dispatches them. */
    private final Runnable consumer = new DefaultConsumer();

    /** Used to look up the connection a response must be written to; checked in {@link #startup()}. */
    private ConnectionManager connectionManager;

    /**
     * Registers a filter that is consulted for every request before its action is invoked.
     *
     * @param filter the filter to add; must not be null
     * @throws IllegalArgumentException if the filter is null, or a filter with the same
     *         name (compared case-insensitively) has already been added
     */
    public void addFilter(Filter filter) {
        if (filter == null) {
            throw new IllegalArgumentException("filter is required!");
        }
        for (Filter existing : filterList) {
            if (filter.getName().equalsIgnoreCase(existing.getName())) {
                throw new IllegalArgumentException("已经存在该过滤器,不能重复添加");
            }
        }
        this.filterList.add(filter);
    }

    /**
     * Consumer loop: takes requests from the queue, runs them through the filters and
     * hands accepted requests to the thread pool for action invocation.
     */
    private class DefaultConsumer implements Runnable {

        @Override
        public void run() {
            logger.info("处理websocket线程启动:" + this.getClass().getName());
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Request<?> request = dataQueue.take();
                    // The second argument is the next queued request, or null when the queue is empty.
                    Context context = new SimpleContext(request, dataQueue.peek());
                    logger.debug("消费者拿到一个数据:" + request);

                    // Run the filters first; the first one that returns false vetoes the request.
                    boolean accepted = true;
                    for (Filter f : filterList) {
                        if (!f.doFilter(context)) {
                            logger.debug("过滤器终止操作:" + f.getName());
                            accepted = false;
                            break;
                        }
                    }
                    if (!accepted) {
                        // Drop only this request. Returning here would end the consumer
                        // thread and nothing would drain the queue any more.
                        continue;
                    }

                    // Execute on the pool instead of queueing the work on this single thread.
                    service.execute(new ActionInvokeTask(context));
                } catch (InterruptedException e) {
                    logger.error(DefaultConsumer.class.getName() + "消费者任务中断", e);
                    // Restore the interrupt flag and stop, so the thread can actually be shut down.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    // Keep consuming: one bad request must not stop the loop.
                    logger.error("执行任务出现错误", e);
                }
            }
        }
    }

    /**
     * Invokes the action registered for a request's protocol number and, when the request
     * requires it, writes the response back over the requester's connection.
     */
    private class ActionInvokeTask implements Runnable {

        private final Context context;

        public ActionInvokeTask(Context context) {
            this.context = context;
        }

        @Override
        public void run() {
            Request<?> request = context.getCurrentData();
            ActionCallable action = ActionCallablePostProcessor.getAction(request.getProtocolNum());
            if (action == null) {
                // This runs on a pool thread where nobody can catch an exception, so report
                // the failure through the log and the failure callback instead of throwing.
                logger.error("action未定义,protocol = " + request.getProtocolNum());
                processFailed(request);
                return;
            }
            try {
                if (!request.isRequireResponse()) {
                    action.action(request);
                    logger.debug("该请求不需要响应:" + request);
                    return;
                }

                Response<?> response = action.action(request);
                if (response == null) {
                    logger.error("action returned no response, protocol = " + request.getProtocolNum());
                    processFailed(request);
                    return;
                }
                // Stamp the response with the requester's unique name (behaviour added 2018-09-20).
                response.setName(request.getName());

                Connection conn = connectionManager.getConnectionByName(request.getName());
                if (conn == null || !conn.isConnected()) {
                    processFailed(request);
                    return;
                }
                conn.write(response);
                processSuccess(request);
            } catch (ActionCallException ex) {
                // A notification mechanism could be hooked in here as well.
                processFailed(request);
                logger.error("执行action错误:" + ex.getMessage(), ex);
            } catch (RuntimeException ex) {
                // Task boundary: without this an unexpected failure would kill the worker
                // thread and skip the failure callback.
                processFailed(request);
                logger.error("执行action错误:" + ex.getMessage(), ex);
            }
        }
    }

    /**
     * Adds a request to the in-memory queue, blocking while the queue is full.
     *
     * @param key not used by this implementation
     * @param data the request to enqueue; must be a non-null {@link Request}
     * @param option not used by this implementation
     * @throws QueueException if {@code data} is null or not a {@link Request}, or if the
     *         calling thread is interrupted before the request could be enqueued
     */
    @Override
    public void push(String key, Object data, Object option) throws QueueException {
        if (data == null) {
            throw new QueueException("request is required!");
        }
        if (!(data instanceof Request)) {
            throw new QueueException("请求对象必须是: " + Request.class.getName());
        }
        try {
            dataQueue.put((Request<?>) data);
        } catch (InterruptedException e) {
            // The request was NOT enqueued: restore the flag and tell the caller.
            Thread.currentThread().interrupt();
            logger.error("interrupted while adding request to queue: " + data, e);
            throw new QueueException("interrupted while adding request to queue: " + data);
        }
        logger.debug("加入一个数据到队列:" + data);
    }

    /**
     * Callback invoked when a message could not be processed; subclasses may override.
     * The default implementation only logs the request.
     *
     * @param request the request that failed
     */
    protected void processFailed(Request<?> request) {
        logger.error("tcp消息处理失败,可能连接不存在或者已关闭,记录日志:" + request.toString());
    }

    /**
     * Callback invoked after a response has been written successfully; subclasses may override.
     * The default implementation only logs the protocol number.
     *
     * @param request the request that was processed
     */
    protected void processSuccess(Request<?> request) {
        logger.debug("tcp消息处理成功:" + request.getProtocolNum());
    }

    /**
     * Starts the consumer task on the thread pool.
     *
     * @throws IllegalArgumentException if no connection manager has been configured
     */
    @Override
    public void startup() {
        if (this.connectionManager == null) {
            throw new IllegalArgumentException("connectionManager未配置");
        }
        logger.info("初始化TcpAction消费者任务:" + consumer.getClass().getName());
        service.execute(consumer);
    }

    /**
     * Sets the connection manager used to resolve the connection for each response.
     *
     * @param connectionManager the connection manager; required before {@link #startup()}
     */
    public void setConnectionManager(ConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
    }
}