package com.iplatform.base.push; import com.iplatform.base.Constants; import com.iplatform.base.PushCacheProvider; import com.iplatform.base.service.PushServiceImpl; import com.iplatform.model.po.S_message; import com.walker.infrastructure.utils.NumberGenerator; import com.walker.infrastructure.utils.StringUtils; import com.walker.push.Notification; import com.walker.push.NotificationChannel; import com.walker.push.PushStatusListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; /** * 推送监听器实现,该实现仅针对'异步'类型的推送者。
* 异步推送者如下: *
 *     1.短信推送
 *     2.
 * 
* @author 时克英 * @date 2023-04-24 * @date 2023-04-27 更新逻辑,整个推送保存都委托给该监听器实现。 */ public class DefaultPushListener implements PushStatusListener { protected final transient Logger logger = LoggerFactory.getLogger(getClass()); private static final String CACHE_VALUE = StringUtils.SYMBOL_SEMICOLON; @Override public void onSuccess(Notification notification, Object option, NotificationChannel channel) { // logger.info("成功推送'异步':{}, data = ", channel, notification); // logger.info("option = " + option); if(notification.getPersistent()){ logger.info("业务保存'成功通知':channel = {}, data={}", channel, notification); // 需要考虑多通道情况,对应的是同一个'notificationId' // 如果缓存中已存在,则不再重复保存! // 是否业务性通知,对于:邮件、短信类,我们不认为是业务通知 if(!this.isBusinessNotification(notification)){ this.invokeDatabase(notification, true, channel); return; } // 如果是非并行通知,只要一个完成就OK的,直接保存(因为后续不会出现同一个通知) if(!notification.getParallel()){ this.invokeDatabase(notification, true, channel); return; } String cacheKey = this.getKey(notification.getReceiverList(), notification.getOptionType(), notification.getOptionId()); // 对于并行通知,不知道后面是否还会有:其他通道(相同消息),所以要加入缓存避免重复保存 if(this.isInCache(cacheKey)){ logger.debug("缓存中已存在,说明通知已经保存过,bizId={}", notification.getOptionId()); return; } // 保存数据库 this.invokeDatabase(notification, true, channel); // 加入缓存 this.putToCache(cacheKey); } } @Override public void onException(Notification notification, String error, NotificationChannel channel) { // logger.info("失败推送'异步':{}, data = ", channel, notification); // logger.info("error = " + error); if(notification.getPersistent()){ logger.info("业务保存'失败通知':channel = {}, data = {}", channel, notification); // 是否业务性通知,对于:邮件、短信类,我们不认为是业务通知 if(!this.isBusinessNotification(notification)){ this.invokeDatabase(notification, false, channel); return; } // 如果是非并行通知,只要一个完成就OK的,直接保存(因为后续不会出现同一个通知) if(!notification.getParallel()){ this.invokeDatabase(notification, false, channel); return; } String cacheKey = this.getKey(notification.getReceiverList(), notification.getOptionType(), notification.getOptionId()); // 对于并行通知,不知道后面是否还会有:其他通道(相同消息),所以要加入缓存避免重复保存 if(this.isInCache(cacheKey)){ logger.debug("缓存中已存在,说明通知已经保存过,bizId={}", notification.getOptionId()); return; } // 保存数据库 this.invokeDatabase(notification, false, channel); // 加入缓存 this.putToCache(cacheKey); } } private void invokeDatabase(Notification notification, boolean success, NotificationChannel channel){ List data = new ArrayList<>(4); for(String userId : notification.getReceiverList()){ S_message message = new S_message(); // 因为ID出现重复,暂时在保存是生成ID,排查为何重复。2023-06-29 // message.setId(notification.getId()); message.setId(String.valueOf(NumberGenerator.getLongSequenceNumber())); message.setCreate_time(notification.getCreateTime()); message.setCreator(notification.getCreator()); message.setBroad_cast(notification.getBroadcast()? 1 : 0); message.setTitle(notification.getTitle()); message.setContent(notification.getContent()); message.setChannel_index(channel.getIndex()); message.setDelayed_time(notification.getDelayedTime()); message.setFailed(success? 0 : 1); message.setMsg_from(notification.getFrom()); message.setTime_type(notification.getTimeType().getIndex()); message.setOption_id(notification.getOptionId()); message.setRead_done(0); message.setReceiver(userId); // message.setSummary(notification.get); if(StringUtils.isEmpty(notification.getOptionType())){ message.setOption_type(Constants.PUSH_OPTION_TYPE_DEFAULT); } else { message.setOption_type(notification.getOptionType()); } data.add(message); } this.pushService.insertBatch(data); } private void putToCache(String key){ this.pushCacheProvider.put(key, CACHE_VALUE); } private boolean isInCache(String key){ String value = this.pushCacheProvider.get(key); return value != null; } /** * 返回存储临时缓存的:Key * @param userIds * @param optionType * @param optionId * @return */ private String getKey(List userIds, String optionType, String optionId){ StringBuilder sb = new StringBuilder(optionType).append(optionId); for(String userId : userIds){ sb.append(userId); } return Constants.PUSH_CACHE_PREFIX + sb.toString().hashCode(); } /** * 是否业务通知 * @param notification * @return */ private boolean isBusinessNotification(Notification notification){ if(StringUtils.isNotEmpty(notification.getOptionId()) && StringUtils.isNotEmpty(notification.getOptionType())){ return true; } return false; } public void setPushCacheProvider(PushCacheProvider pushCacheProvider) { this.pushCacheProvider = pushCacheProvider; } public void setPushService(PushServiceImpl pushService) { this.pushService = pushService; } private PushCacheProvider pushCacheProvider = null; private PushServiceImpl pushService = null; }