package com.walker.tcp.support;
import com.walker.infrastructure.ApplicationRuntimeException;
import com.walker.infrastructure.core.ApplicationBeanInitialized;
import com.walker.queue.AbstractQueueManager;
import com.walker.queue.QueueException;
import com.walker.tcp.ActionCallException;
import com.walker.tcp.ActionCallable;
import com.walker.tcp.ActionCallablePostProcessor;
import com.walker.tcp.Connection;
import com.walker.tcp.ConnectionManager;
import com.walker.tcp.Context;
import com.walker.tcp.Filter;
import com.walker.tcp.Request;
import com.walker.tcp.Response;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 使用内存队列实现的队列管理器。
* 在数据量不大的情况下可以这样使用,例如:接入设备只有几十台,并发量只有上百的情况下。
* 如果数据量较大,可以采用redis或者mq队列实现方式。{ RedisQueueManager}
* @author 时克英
* @date 2018-11-28
*
*/
public class MemoryQueueManager extends AbstractQueueManager implements ApplicationBeanInitialized {
private final BlockingQueue> dataQueue = new ArrayBlockingQueue<>(2048);
private final List filterList = new ArrayList<>();
private int filterCount = 0;
private ExecutorService service = Executors.newFixedThreadPool(maxWorkerThread);
private Runnable consumer = new DefaultConsumer();
public void addFilter(Filter filter){
for(Filter f : filterList){
if(filter.getName().equalsIgnoreCase(f.getName())){
throw new IllegalArgumentException("已经存在该过滤器,不能重复添加");
}
}
this.filterList.add(filter);
this.filterCount++;
}
private class DefaultConsumer implements Runnable{
@Override
public void run() {
logger.info("处理websocket线程启动:" + this.getClass().getName());
while(true){
try {
Request> request = dataQueue.take();
Context context = new SimpleContext(request, dataQueue.peek());
logger.debug("消费者拿到一个数据:" + request);
if(filterCount > 0){
/* 存在过滤器,先过滤 */
boolean filterResult = true;
for(Filter f : filterList){
filterResult = f.doFilter(context);
if(!filterResult){
logger.debug("过滤器终止操作:" + f.getName());
break;
}
}
if(!filterResult){
return;
}
}
/**
// 处理业务(这样执行是单线程的)
// processAsync(context);
* */
// 使用ExecutorService的线程池来执行,而不是在这一个单线程中排队处理
service.execute(new ActionInvokeTask(context));
} catch (InterruptedException e) {
logger.error(DefaultConsumer.class.getName() + "消费者任务中断", e);
} catch (Exception e) {
logger.error("执行任务出现错误", e);
}
}
}
}
private class ActionInvokeTask implements Runnable{
private Context context;
public ActionInvokeTask(Context context){
this.context = context;
}
@Override
public void run() {
Request> request = context.getCurrentData();
ActionCallable action = ActionCallablePostProcessor.getAction(request.getProtocolNum());
if(action == null){
throw new ApplicationRuntimeException("action未定义,protocol = " + request.getProtocolNum());
}
try{
if(request.isRequireResponse()){
Response> response = action.action(request);
// 2018-09-20 时克英修改,响应对象中自动加入用户唯一编号
response.setName(request.getName());
Connection conn = connectionManager.getConnectionByName(request.getName());
if(conn == null || !conn.isConnected()){
processFailed(request);
return;
}
conn.write(response);
processSuccess(request);
} else {
action.action(request);
logger.debug("该请求不需要响应:" + request);
}
} catch(ActionCallException ex){
// 这里也可以加入提醒机制,待续
processFailed(request);
logger.error("执行action错误:" + ex.getMessage(), ex);
}
}
}
// /**
// * 业务Action执行方法。
// * @param context
// */
// protected void processAsync(Context context) throws Exception{}
@Override
public void push(String key, Object data, Object option) throws QueueException {
if(data == null){
throw new QueueException("request is required!");
}
if(!Request.class.isAssignableFrom(data.getClass())){
throw new QueueException("请求对象必须是: " + Request.class.getName());
}
try {
dataQueue.put((Request>)data);
logger.debug("加入一个数据到队列:" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 消息处理失败,回调方法,子类可覆盖重写
* @param request
*/
protected void processFailed(Request> request){
logger.error("tcp消息处理失败,可能连接不存在或者已关闭,记录日志:" + request.toString());
}
/**
* 消息处理成功,回调方法,子类可覆盖重写
* @param request
*/
protected void processSuccess(Request> request){
logger.debug("tcp消息处理成功:" + request.getProtocolNum());
}
@Override
public void startup() {
if(this.connectionManager == null){
throw new IllegalArgumentException("connectionManager未配置");
}
logger.info("初始化TcpAction消费者任务:" + consumer.getClass().getName());
service.execute(consumer);
}
private ConnectionManager connectionManager;
public void setConnectionManager(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}
}