package com.walker.tcp.support;
|
|
import com.walker.infrastructure.ApplicationRuntimeException;
|
import com.walker.infrastructure.core.ApplicationBeanInitialized;
|
import com.walker.queue.AbstractQueueManager;
|
import com.walker.queue.QueueException;
|
import com.walker.tcp.ActionCallException;
|
import com.walker.tcp.ActionCallable;
|
import com.walker.tcp.ActionCallablePostProcessor;
|
import com.walker.tcp.Connection;
|
import com.walker.tcp.ConnectionManager;
|
import com.walker.tcp.Context;
|
import com.walker.tcp.Filter;
|
import com.walker.tcp.Request;
|
import com.walker.tcp.Response;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
|
/**
|
* 使用内存队列实现的队列管理器。</p>
|
* 在数据量不大的情况下可以这样使用,例如:接入设备只有几十台,并发量只有上百的情况下。<br>
|
* 如果数据量较大,可以采用redis或者mq队列实现方式。{ RedisQueueManager}
|
* @author 时克英
|
* @date 2018-11-28
|
*
|
*/
|
public class MemoryQueueManager extends AbstractQueueManager implements ApplicationBeanInitialized {
|
|
private final BlockingQueue<Request<?>> dataQueue = new ArrayBlockingQueue<>(2048);
|
|
private final List<Filter> filterList = new ArrayList<>();
|
private int filterCount = 0;
|
|
private ExecutorService service = Executors.newFixedThreadPool(maxWorkerThread);
|
private Runnable consumer = new DefaultConsumer();
|
|
public void addFilter(Filter filter){
|
for(Filter f : filterList){
|
if(filter.getName().equalsIgnoreCase(f.getName())){
|
throw new IllegalArgumentException("已经存在该过滤器,不能重复添加");
|
}
|
}
|
this.filterList.add(filter);
|
this.filterCount++;
|
}
|
|
private class DefaultConsumer implements Runnable{
|
@Override
|
public void run() {
|
logger.info("处理websocket线程启动:" + this.getClass().getName());
|
while(true){
|
try {
|
Request<?> request = dataQueue.take();
|
Context context = new SimpleContext(request, dataQueue.peek());
|
logger.debug("消费者拿到一个数据:" + request);
|
|
if(filterCount > 0){
|
/* 存在过滤器,先过滤 */
|
boolean filterResult = true;
|
for(Filter f : filterList){
|
filterResult = f.doFilter(context);
|
if(!filterResult){
|
logger.debug("过滤器终止操作:" + f.getName());
|
break;
|
}
|
}
|
if(!filterResult){
|
return;
|
}
|
}
|
|
/**
|
// 处理业务(这样执行是单线程的)
|
// processAsync(context);
|
* */
|
|
// 使用ExecutorService的线程池来执行,而不是在这一个单线程中排队处理
|
service.execute(new ActionInvokeTask(context));
|
|
} catch (InterruptedException e) {
|
logger.error(DefaultConsumer.class.getName() + "消费者任务中断", e);
|
} catch (Exception e) {
|
logger.error("执行任务出现错误", e);
|
}
|
}
|
|
}
|
}
|
|
private class ActionInvokeTask implements Runnable{
|
|
private Context context;
|
|
public ActionInvokeTask(Context context){
|
this.context = context;
|
}
|
|
@Override
|
public void run() {
|
Request<?> request = context.getCurrentData();
|
ActionCallable action = ActionCallablePostProcessor.getAction(request.getProtocolNum());
|
if(action == null){
|
throw new ApplicationRuntimeException("action未定义,protocol = " + request.getProtocolNum());
|
}
|
|
try{
|
if(request.isRequireResponse()){
|
Response<?> response = action.action(request);
|
|
// 2018-09-20 时克英修改,响应对象中自动加入用户唯一编号
|
response.setName(request.getName());
|
|
Connection conn = connectionManager.getConnectionByName(request.getName());
|
if(conn == null || !conn.isConnected()){
|
processFailed(request);
|
return;
|
}
|
|
conn.write(response);
|
processSuccess(request);
|
|
} else {
|
action.action(request);
|
logger.debug("该请求不需要响应:" + request);
|
}
|
|
} catch(ActionCallException ex){
|
// 这里也可以加入提醒机制,待续
|
processFailed(request);
|
logger.error("执行action错误:" + ex.getMessage(), ex);
|
}
|
}
|
}
|
|
// /**
|
// * 业务Action执行方法。
|
// * @param context
|
// */
|
// protected void processAsync(Context context) throws Exception{}
|
|
@Override
|
public void push(String key, Object data, Object option) throws QueueException {
|
if(data == null){
|
throw new QueueException("request is required!");
|
}
|
if(!Request.class.isAssignableFrom(data.getClass())){
|
throw new QueueException("请求对象必须是: " + Request.class.getName());
|
}
|
|
try {
|
dataQueue.put((Request<?>)data);
|
logger.debug("加入一个数据到队列:" + data);
|
} catch (InterruptedException e) {
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 消息处理失败,回调方法,子类可覆盖重写
|
* @param request
|
*/
|
protected void processFailed(Request<?> request){
|
logger.error("tcp消息处理失败,可能连接不存在或者已关闭,记录日志:" + request.toString());
|
}
|
|
/**
|
* 消息处理成功,回调方法,子类可覆盖重写
|
* @param request
|
*/
|
protected void processSuccess(Request<?> request){
|
logger.debug("tcp消息处理成功:" + request.getProtocolNum());
|
}
|
|
@Override
|
public void startup() {
|
if(this.connectionManager == null){
|
throw new IllegalArgumentException("connectionManager未配置");
|
}
|
logger.info("初始化TcpAction消费者任务:" + consumer.getClass().getName());
|
service.execute(consumer);
|
}
|
|
private ConnectionManager connectionManager;
|
|
public void setConnectionManager(ConnectionManager connectionManager) {
|
this.connectionManager = connectionManager;
|
}
|
|
}
|