package com.iplatform.tcp.lb;
|
|
import com.iplatform.base.PlatformRuntimeException;
|
import com.iplatform.core.BeanContextAware;
|
import com.walker.infrastructure.utils.DateUtils;
|
import com.walker.infrastructure.utils.JsonUtils;
|
import com.walker.push.rocketmq.RocketMQEnhanceTemplate;
|
import com.walker.push.rocketmq.tcp.MqResponse;
|
import com.walker.tcp.Response;
|
import com.walker.tcp.lb.ResponseWriter;
|
import com.walker.web.util.IdUtils;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
public class DefaultResponseWriter implements ResponseWriter {
|
|
private final transient SendStatusCallback callback = new SendStatusCallback();
|
|
protected final transient Logger logger = LoggerFactory.getLogger(this.getClass());
|
|
@Override
|
public Object write(String type, Response<?> response, String connectionHost, String channelId) {
|
if(type.equals(TYPE_MQ)){
|
logger.debug("+++++++++++++++++++++++++");
|
BeanContextAware.getBeanByType(RocketMQEnhanceTemplate.class)
|
.sendAsync(connectionHost, this.acquireMqResponse(response), callback);
|
if(this.logger.isDebugEnabled()){
|
logger.debug("MqConnectionMeta 发送一个消息:{}", response);
|
}
|
} else {
|
throw new UnsupportedOperationException("不支持的 ResponseWriter.type = " + type);
|
}
|
return null;
|
}
|
|
private MqResponse acquireMqResponse(Response<?> response){
|
MqResponse data = new MqResponse();
|
// data.setResponse(response);
|
try {
|
data.setResponse(JsonUtils.objectToJsonString(response));
|
} catch (Exception e) {
|
throw new PlatformRuntimeException("Response<?> --> String 失败:" + e.getMessage(), e);
|
}
|
data.setKey(IdUtils.fastSimpleUUID());
|
data.setRetryTimes(1);
|
data.setSource(response.getProtocolNum());
|
data.setSendTime(DateUtils.getDateTimeNumber());
|
data.setTopic(response.getTopic());
|
return data;
|
}
|
}
|