package com.walker.push.rocketmq; import com.walker.infrastructure.utils.JsonUtils; import com.walker.infrastructure.utils.StringUtils; import com.walker.queue.MqBaseMessage; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; /** * 加强过的消息模板对象,提供了指定环境的主题参数。 *

目前还没有实现异步发送,后续要加上。

* @author 时克英 * @date 2023-09-19 */ public class RocketMQEnhanceTemplate { protected final transient Logger logger = LoggerFactory.getLogger(getClass()); /** * 发送同步消息。 * @param topic * @param message * @param delayLevel * @return * @param * @date 2023-09-26 */ public SendResult sendAndWait(String topic, T message, int delayLevel){ Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); return template.syncSend(topic, sendMessage, 3000, delayLevel); } public void sendAsync(String topic, T message, SendCallback callback){ this.sendAsync(topic, message, callback, 3000, 0); } /** * 发送异步消息,聊天对象使用该方法。 * @param topic 队列主题名称,业务会根据主机拼接最终名称 * @param message 消息对象 * @param callback 通知回调 * @param timeout 超时时间(毫秒) * @param delayLevel 延时级别,暂时没用,默认:0 * @param * @date 2023-09-26 */ public void sendAsync(String topic , T message, SendCallback callback, long timeout, int delayLevel){ if(StringUtils.isEmpty(topic)){ throw new IllegalArgumentException("无法发送消息,topic为空"); } Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); // this.template.asyncSend(topic, sendMessage, callback); if(delayLevel > 0){ this.template.asyncSend(topic, sendMessage, callback, timeout, delayLevel); } else { this.template.asyncSend(topic, sendMessage, callback); } } //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ //~ 以上为新方法。2023-09-26 //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ /** * 根据系统上下文自动构建隔离后的topic * 构建目的地 */ public String buildDestination(String topic, String tag) { topic = reBuildTopic(topic); return topic + StringUtils.SEPARATOR_COLON + tag; } /** * 根据环境重新隔离topic * @param topic 原始topic */ private String reBuildTopic(String topic) { if(this.enabledIsolation && StringUtils.hasText(this.environmentName)){ return topic + StringUtils.STRING_UNDERLINE + this.environmentName; } return topic; } /** * 发送同步消息 */ public SendResult send(String topic, String tag, T message) { // 注意分隔符 return send(buildDestination(topic,tag), message); } public SendResult send(String destination, T message) { // 设置业务键,此处根据公共的参数进行处理 // 更多的其它基础业务处理... Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); SendResult sendResult = template.syncSend(destination, sendMessage); // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 try { logger.debug("[{}]同步消息[{}]发送结果[{}]", destination, JsonUtils.objectToJsonString(message), JsonUtils.objectToJsonString(sendResult)); } catch (Exception e) { throw new RuntimeException(e); } return sendResult; } /** * 发送延迟消息 */ public SendResult send(String topic, String tag, T message, int delayLevel) { return send(buildDestination(topic,tag), message, delayLevel); } public SendResult send(String destination, T message, int delayLevel) { Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel); try { logger.debug("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JsonUtils.objectToJsonString(message), JsonUtils.objectToJsonString(sendResult)); } catch (Exception e) { throw new RuntimeException(e); } return sendResult; } public RocketMQTemplate getTemplate() { return template; } public void setTemplate(RocketMQTemplate template) { this.template = template; } public void setEnabledIsolation(boolean enabledIsolation) { this.enabledIsolation = enabledIsolation; } public void setEnvironmentName(String environmentName) { this.environmentName = environmentName; } private boolean enabledIsolation = true; private String environmentName = StringUtils.EMPTY_STRING; private RocketMQTemplate template; }