package com.iplatform.base.push;
|
|
import com.iplatform.base.Constants;
|
import com.iplatform.base.PushCacheProvider;
|
import com.iplatform.base.service.PushServiceImpl;
|
import com.iplatform.model.po.S_message;
|
import com.walker.infrastructure.utils.NumberGenerator;
|
import com.walker.infrastructure.utils.StringUtils;
|
import com.walker.push.Notification;
|
import com.walker.push.NotificationChannel;
|
import com.walker.push.PushStatusListener;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
|
/**
|
* 推送监听器实现,该实现仅针对'异步'类型的推送者。<br>
|
* 异步推送者如下:
|
* <pre>
|
* 1.短信推送
|
* 2.
|
* </pre>
|
* @author 时克英
|
* @date 2023-04-24
|
* @date 2023-04-27 更新逻辑,整个推送保存都委托给该监听器实现。
|
*/
|
public class DefaultPushListener implements PushStatusListener {
|
|
protected final transient Logger logger = LoggerFactory.getLogger(getClass());
|
|
private static final String CACHE_VALUE = StringUtils.SYMBOL_SEMICOLON;
|
|
@Override
|
public void onSuccess(Notification notification, Object option, NotificationChannel channel) {
|
// logger.info("成功推送'异步':{}, data = ", channel, notification);
|
// logger.info("option = " + option);
|
|
if(notification.getPersistent()){
|
logger.info("业务保存'成功通知':channel = {}, data={}", channel, notification);
|
// 需要考虑多通道情况,对应的是同一个'notificationId'
|
// 如果缓存中已存在,则不再重复保存!
|
|
// 是否业务性通知,对于:邮件、短信类,我们不认为是业务通知
|
if(!this.isBusinessNotification(notification)){
|
this.invokeDatabase(notification, true, channel);
|
return;
|
}
|
|
// 如果是非并行通知,只要一个完成就OK的,直接保存(因为后续不会出现同一个通知)
|
if(!notification.getParallel()){
|
this.invokeDatabase(notification, true, channel);
|
return;
|
}
|
|
String cacheKey = this.getKey(notification.getReceiverList(), notification.getOptionType(), notification.getOptionId());
|
|
// 对于并行通知,不知道后面是否还会有:其他通道(相同消息),所以要加入缓存避免重复保存
|
if(this.isInCache(cacheKey)){
|
logger.debug("缓存中已存在,说明通知已经保存过,bizId={}", notification.getOptionId());
|
return;
|
}
|
|
// 保存数据库
|
this.invokeDatabase(notification, true, channel);
|
// 加入缓存
|
this.putToCache(cacheKey);
|
}
|
}
|
|
@Override
|
public void onException(Notification notification, String error, NotificationChannel channel) {
|
// logger.info("失败推送'异步':{}, data = ", channel, notification);
|
// logger.info("error = " + error);
|
|
if(notification.getPersistent()){
|
logger.info("业务保存'失败通知':channel = {}, data = {}", channel, notification);
|
|
// 是否业务性通知,对于:邮件、短信类,我们不认为是业务通知
|
if(!this.isBusinessNotification(notification)){
|
this.invokeDatabase(notification, false, channel);
|
return;
|
}
|
|
// 如果是非并行通知,只要一个完成就OK的,直接保存(因为后续不会出现同一个通知)
|
if(!notification.getParallel()){
|
this.invokeDatabase(notification, false, channel);
|
return;
|
}
|
|
String cacheKey = this.getKey(notification.getReceiverList(), notification.getOptionType(), notification.getOptionId());
|
|
// 对于并行通知,不知道后面是否还会有:其他通道(相同消息),所以要加入缓存避免重复保存
|
if(this.isInCache(cacheKey)){
|
logger.debug("缓存中已存在,说明通知已经保存过,bizId={}", notification.getOptionId());
|
return;
|
}
|
|
// 保存数据库
|
this.invokeDatabase(notification, false, channel);
|
// 加入缓存
|
this.putToCache(cacheKey);
|
}
|
}
|
|
private void invokeDatabase(Notification notification, boolean success, NotificationChannel channel){
|
List<S_message> data = new ArrayList<>(4);
|
for(String userId : notification.getReceiverList()){
|
S_message message = new S_message();
|
// 因为ID出现重复,暂时在保存是生成ID,排查为何重复。2023-06-29
|
// message.setId(notification.getId());
|
message.setId(String.valueOf(NumberGenerator.getLongSequenceNumber()));
|
message.setCreate_time(notification.getCreateTime());
|
message.setCreator(notification.getCreator());
|
message.setBroad_cast(notification.getBroadcast()? 1 : 0);
|
message.setTitle(notification.getTitle());
|
message.setContent(notification.getContent());
|
message.setChannel_index(channel.getIndex());
|
message.setDelayed_time(notification.getDelayedTime());
|
message.setFailed(success? 0 : 1);
|
message.setMsg_from(notification.getFrom());
|
message.setTime_type(notification.getTimeType().getIndex());
|
message.setOption_id(notification.getOptionId());
|
message.setRead_done(0);
|
message.setReceiver(userId);
|
// message.setSummary(notification.get);
|
if(StringUtils.isEmpty(notification.getOptionType())){
|
message.setOption_type(Constants.PUSH_OPTION_TYPE_DEFAULT);
|
} else {
|
message.setOption_type(notification.getOptionType());
|
}
|
data.add(message);
|
}
|
this.pushService.insertBatch(data);
|
}
|
|
private void putToCache(String key){
|
this.pushCacheProvider.put(key, CACHE_VALUE);
|
}
|
|
private boolean isInCache(String key){
|
String value = this.pushCacheProvider.get(key);
|
return value != null;
|
}
|
|
/**
|
* 返回存储临时缓存的:Key
|
* @param userIds
|
* @param optionType
|
* @param optionId
|
* @return
|
*/
|
private String getKey(List<String> userIds, String optionType, String optionId){
|
StringBuilder sb = new StringBuilder(optionType).append(optionId);
|
for(String userId : userIds){
|
sb.append(userId);
|
}
|
return Constants.PUSH_CACHE_PREFIX + sb.toString().hashCode();
|
}
|
|
/**
|
* 是否业务通知
|
* @param notification
|
* @return
|
*/
|
private boolean isBusinessNotification(Notification notification){
|
if(StringUtils.isNotEmpty(notification.getOptionId()) && StringUtils.isNotEmpty(notification.getOptionType())){
|
return true;
|
}
|
return false;
|
}
|
|
public void setPushCacheProvider(PushCacheProvider pushCacheProvider) {
|
this.pushCacheProvider = pushCacheProvider;
|
}
|
|
public void setPushService(PushServiceImpl pushService) {
|
this.pushService = pushService;
|
}
|
|
private PushCacheProvider pushCacheProvider = null;
|
private PushServiceImpl pushService = null;
|
}
|