package com.iplatform.tcp.lb; import com.iplatform.base.PlatformRuntimeException; import com.iplatform.core.BeanContextAware; import com.walker.infrastructure.utils.DateUtils; import com.walker.infrastructure.utils.JsonUtils; import com.walker.push.rocketmq.RocketMQEnhanceTemplate; import com.walker.push.rocketmq.tcp.MqResponse; import com.walker.tcp.Response; import com.walker.tcp.lb.ResponseWriter; import com.walker.web.util.IdUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DefaultResponseWriter implements ResponseWriter { private final transient SendStatusCallback callback = new SendStatusCallback(); protected final transient Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public Object write(String type, Response response, String connectionHost, String channelId) { if(type.equals(TYPE_MQ)){ logger.debug("+++++++++++++++++++++++++"); BeanContextAware.getBeanByType(RocketMQEnhanceTemplate.class) .sendAsync(connectionHost, this.acquireMqResponse(response), callback); if(this.logger.isDebugEnabled()){ logger.debug("MqConnectionMeta 发送一个消息:{}", response); } } else { throw new UnsupportedOperationException("不支持的 ResponseWriter.type = " + type); } return null; } private MqResponse acquireMqResponse(Response response){ MqResponse data = new MqResponse(); // data.setResponse(response); try { data.setResponse(JsonUtils.objectToJsonString(response)); } catch (Exception e) { throw new PlatformRuntimeException("Response --> String 失败:" + e.getMessage(), e); } data.setKey(IdUtils.fastSimpleUUID()); data.setRetryTimes(1); data.setSource(response.getProtocolNum()); data.setSendTime(DateUtils.getDateTimeNumber()); data.setTopic(response.getTopic()); return data; } }