shikeying
2024-01-11 3b67e947e36133e2a40eb2737b15ea375e157ea0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package com.walker.push.rocketmq;
 
import com.walker.infrastructure.utils.JsonUtils;
import com.walker.infrastructure.utils.StringUtils;
import com.walker.queue.MqBaseMessage;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
 
/**
 * 加强过的消息模板对象,提供了指定环境的主题参数。
 * <p>目前还没有实现异步发送,后续要加上。</p>
 * @author 时克英
 * @date 2023-09-19
 */
public class RocketMQEnhanceTemplate {
 
    protected final transient Logger logger = LoggerFactory.getLogger(getClass());
 
    /**
     * 发送同步消息。
     * @param topic
     * @param message
     * @param delayLevel
     * @return
     * @param <T>
     * @date 2023-09-26
     */
    public <T extends MqBaseMessage> SendResult sendAndWait(String topic, T message, int delayLevel){
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        return template.syncSend(topic, sendMessage, 3000, delayLevel);
    }
 
    public <T extends MqBaseMessage> void sendAsync(String topic, T message, SendCallback callback){
        this.sendAsync(topic, message, callback, 3000, 0);
    }
 
    /**
     * 发送异步消息,聊天对象使用该方法。
     * @param topic 队列主题名称,业务会根据主机拼接最终名称
     * @param message 消息对象
     * @param callback 通知回调
     * @param timeout 超时时间(毫秒)
     * @param delayLevel 延时级别,暂时没用,默认:0
     * @param <T>
     * @date 2023-09-26
     */
    public <T extends MqBaseMessage> void sendAsync(String topic
            , T message, SendCallback callback, long timeout, int delayLevel){
        if(StringUtils.isEmpty(topic)){
            throw new IllegalArgumentException("无法发送消息,topic为空");
        }
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
//        this.template.asyncSend(topic, sendMessage, callback);
        if(delayLevel > 0){
            this.template.asyncSend(topic, sendMessage, callback, timeout, delayLevel);
        } else {
            this.template.asyncSend(topic, sendMessage, callback);
        }
    }
 
    //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    //~ 以上为新方法。2023-09-26
    //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
    /**
     * 根据系统上下文自动构建隔离后的topic
     * 构建目的地
     */
    public String buildDestination(String topic, String tag) {
        topic = reBuildTopic(topic);
        return topic + StringUtils.SEPARATOR_COLON + tag;
    }
 
    /**
     * 根据环境重新隔离topic
     * @param topic 原始topic
     */
    private String reBuildTopic(String topic) {
        if(this.enabledIsolation && StringUtils.hasText(this.environmentName)){
            return topic + StringUtils.STRING_UNDERLINE + this.environmentName;
        }
        return topic;
    }
 
    /**
     * 发送同步消息
     */
    public <T extends MqBaseMessage> SendResult send(String topic, String tag, T message) {
        // 注意分隔符
        return send(buildDestination(topic,tag), message);
    }
 
    public <T extends MqBaseMessage> SendResult send(String destination, T message) {
        // 设置业务键,此处根据公共的参数进行处理
        // 更多的其它基础业务处理...
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage);
        // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
        try {
            logger.debug("[{}]同步消息[{}]发送结果[{}]", destination, JsonUtils.objectToJsonString(message), JsonUtils.objectToJsonString(sendResult));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return sendResult;
    }
 
    /**
     * 发送延迟消息
     */
    public <T extends MqBaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
        return send(buildDestination(topic,tag), message, delayLevel);
    }
 
    public <T extends MqBaseMessage> SendResult send(String destination, T message, int delayLevel) {
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
        try {
            logger.debug("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JsonUtils.objectToJsonString(message), JsonUtils.objectToJsonString(sendResult));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return sendResult;
    }
 
    public RocketMQTemplate getTemplate() {
        return template;
    }
 
    public void setTemplate(RocketMQTemplate template) {
        this.template = template;
    }
 
    public void setEnabledIsolation(boolean enabledIsolation) {
        this.enabledIsolation = enabledIsolation;
    }
 
    public void setEnvironmentName(String environmentName) {
        this.environmentName = environmentName;
    }
    private boolean enabledIsolation = true;
 
    private String environmentName = StringUtils.EMPTY_STRING;
 
    private RocketMQTemplate template;
}