shikeyin
2024-01-11 65da8373531677b1c37a98f53eaa30c892f35e5a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.iplatform.tcp.lb;
 
import com.iplatform.base.PlatformRuntimeException;
import com.iplatform.core.BeanContextAware;
import com.walker.infrastructure.utils.DateUtils;
import com.walker.infrastructure.utils.JsonUtils;
import com.walker.push.rocketmq.RocketMQEnhanceTemplate;
import com.walker.push.rocketmq.tcp.MqResponse;
import com.walker.tcp.Response;
import com.walker.tcp.lb.ResponseWriter;
import com.walker.web.util.IdUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class DefaultResponseWriter implements ResponseWriter {
 
    private final transient SendStatusCallback callback = new SendStatusCallback();
 
    protected final transient Logger logger = LoggerFactory.getLogger(this.getClass());
 
    @Override
    public Object write(String type, Response<?> response, String connectionHost, String channelId) {
        if(type.equals(TYPE_MQ)){
            logger.debug("+++++++++++++++++++++++++");
            BeanContextAware.getBeanByType(RocketMQEnhanceTemplate.class)
                    .sendAsync(connectionHost, this.acquireMqResponse(response), callback);
            if(this.logger.isDebugEnabled()){
                logger.debug("MqConnectionMeta 发送一个消息:{}", response);
            }
        } else {
            throw new UnsupportedOperationException("不支持的 ResponseWriter.type = " + type);
        }
        return null;
    }
 
    private MqResponse acquireMqResponse(Response<?> response){
        MqResponse data = new MqResponse();
//        data.setResponse(response);
        try {
            data.setResponse(JsonUtils.objectToJsonString(response));
        } catch (Exception e) {
            throw new PlatformRuntimeException("Response<?> --> String 失败:" + e.getMessage(), e);
        }
        data.setKey(IdUtils.fastSimpleUUID());
        data.setRetryTimes(1);
        data.setSource(response.getProtocolNum());
        data.setSendTime(DateUtils.getDateTimeNumber());
        data.setTopic(response.getTopic());
        return data;
    }
}