package com.walker.push;

import com.walker.infrastructure.utils.StringUtils;
import com.walker.push.util.PushUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Base implementation of {@link PushManager}.
 *
 * <p>Keeps the registered {@link Pushable} instances grouped by the index of their
 * {@link NotificationChannel}, runs the registered {@link Strategy} checks before a push,
 * dispatches a {@link Notification} to one pusher per channel and delegates persistence of
 * the outcome to the subclass through {@link #persistent} and {@link #persistentFailed}.</p>
 *
 * <p>The registries are plain {@link HashMap}/{@link ArrayList} instances and this class
 * performs no synchronization of its own.</p>
 */
public abstract class AbstractPushManager implements PushManager {

    protected transient final Logger logger = LoggerFactory.getLogger(getClass());

    /** Listener handed to asynchronous pushers that do not bring their own. */
    private PushStatusListener pushStatusListener;

    private List<String> messageChannelNames = new ArrayList<>(4);

    private boolean messageParallel = false;

    /** Push strategies; created lazily by {@link #addStrategy(Strategy)}. */
    private List<Strategy> strategyList = null;

    /**
     * key = channel index, value = pushers registered for that channel.
     * NOTE(review): assumes NotificationChannel#getIndex() returns String - confirm against
     * the enum declaration and adjust the key type if it differs.
     */
    private final Map<String, List<Pushable>> pushableMap = new HashMap<>(8);

    /** key = pusher id, value = pusher instance. */
    private final Map<String, Pushable> pushIdMap = new HashMap<>(8);

    /**
     * Registers a pusher under the channel it reports.
     *
     * <p>All validation happens before any internal state is touched, so a rejected
     * registration leaves the manager unchanged and can simply be retried.</p>
     *
     * @param pushable the pusher to register, must not be {@code null}
     * @throws IllegalArgumentException if {@code pushable} is {@code null}, is already
     *         registered for its channel, or no status listener has been set yet
     */
    @Override
    public void register(Pushable pushable) {
        if (pushable == null) {
            throw new IllegalArgumentException("注册错误:推送者不能为空");
        }
        NotificationChannel channel = pushable.getNotificationChannel();
        List<Pushable> channelPushers = this.pushableMap.get(channel.getIndex());
        if (channelPushers != null && channelPushers.contains(pushable)) {
            throw new IllegalArgumentException("推送者已经存在,无需重复注册,name=" + pushable.getName());
        }
        // 2023-04-27: the manager assigns the listener centrally, so it must already be set.
        if (this.pushStatusListener == null) {
            throw new IllegalArgumentException("pushStatusListener 未设置!");
        }

        // Everything is validated; only now mutate the registries.
        if (channelPushers == null) {
            channelPushers = new ArrayList<>(2);
            this.pushableMap.put(channel.getIndex(), channelPushers);
        }
        channelPushers.add(pushable);
        this.pushIdMap.put(pushable.getId(), pushable);

        if (pushable.supportAsync() && pushable.getPushStatusListener() == null) {
            pushable.setPushStatusListener(this.pushStatusListener);
        }
        logger.info("注册'推送者':{}", pushable.getName());
    }

    /**
     * Pushes a notification through the channels it lists.
     *
     * <p>Every registered strategy must accept the notification, otherwise a failed result
     * with code {@code -1} is returned. For each channel one pusher is chosen (see
     * {@code pushableId}). Asynchronous pushers are only triggered; their outcome is
     * expected to be handled by the status listener. Synchronous pushers go through
     * {@link #pushOnce(List, Notification)}.</p>
     *
     * <p>When the notification is not parallel, the method returns right after the first
     * channel that has a pusher. When it is parallel, all channels are attempted and the
     * result of the last synchronous push is returned.</p>
     *
     * <p>NOTE(review): in serial mode a success result is returned even if the synchronous
     * push reported failed receivers, and in parallel mode the return value may be
     * {@code null} (no synchronous push happened). Both are kept as-is because callers may
     * rely on them; confirm before changing.</p>
     *
     * @param notification the notification to push, must not be {@code null} and must carry
     *        at least one channel
     * @param pushableId optional id of the preferred pusher; only considered when a channel
     *        has more than one pusher, and only pushers of that channel are candidates
     * @return the push result as described above
     * @throws IllegalArgumentException if the notification is {@code null} or has no channel
     * @throws RuntimeException wrapping a {@link PushException} raised by a pusher
     */
    @Override
    public PushResult push(Notification notification, String pushableId) {
        if (notification == null) {
            throw new IllegalArgumentException("无法推送空消息");
        }
        List<NotificationChannel> channelList = notification.getChannelList();
        if (StringUtils.isEmptyList(channelList)) {
            throw new IllegalArgumentException("'notification'中不存在通道信息,无法推送");
        }
        if (this.strategyList != null) {
            for (Strategy strategy : this.strategyList) {
                if (!strategy.access(notification)) {
                    return this.createFailedPushResult(-1, "通知未通过策略检查,无法推送", null);
                }
            }
        }

        // 2023-04-26: whether all channels have to be served (parallel) or one is enough.
        boolean multipleChannelParallel = notification.getParallel();
        int channelSize = channelList.size();
        PushResult pushResult = null;

        for (NotificationChannel channel : channelList) {
            List<Pushable> pushableList = this.pushableMap.get(channel.getIndex());
            if (StringUtils.isEmptyList(pushableList)) {
                if (channelSize == 1) {
                    logger.warn("只有一个通道,但也未配置推送者,推送失败:{}", notification);
                    return PushUtils.acquireFailedPushResult("通道不存在",
                            StringUtils.collectionToDelimitedString(notification.getReceiverList(), ","));
                }
                if (!multipleChannelParallel) {
                    logger.debug("存在多通道,仅发送一个成功即可,通道不存在,继续找下一个");
                } else {
                    // Parallel delivery was requested, so a missing channel is a real failure.
                    logger.error("通道不存在,发送失败一个:{}", channel.getIndex());
                }
                continue;
            }

            try {
                Pushable pushable = this.selectPushable(pushableList, pushableId);

                // 2023-04-24: an asynchronous pusher reports through the listener, which is
                // also where persistence is expected to happen; nothing more to do here.
                if (pushable.supportAsync()) {
                    logger.debug("异步推送,PushManager不直接返回结果,需要在监听器中实现!");
                    pushable.push(notification);
                    if (!multipleChannelParallel) {
                        // Serial mode: one dispatched push is enough, skip remaining channels.
                        return PushUtils.acquireSuccessPushResult();
                    }
                    // Parallel mode: keep going with the other channels.
                    continue;
                }

                // Synchronous push. The call is factored out so subclasses can run it
                // asynchronously; the selected pusher is placed first because pushOnce
                // uses the head of the list.
                pushResult = this.pushOnce(this.withPreferredFirst(pushableList, pushable), notification);
                if (!multipleChannelParallel) {
                    return PushUtils.acquireSuccessPushResult();
                }
            } catch (PushException e) {
                throw new RuntimeException("推送一次失败:" + e.getMessage() + ", id=" + e.getMessageId(), e);
            }
        }

        logger.info("这里需要返回组合好的'推送结果',后续完善,暂时为空");
        return pushResult;
    }

    /**
     * Chooses the pusher for one channel: the one whose id equals {@code pushableId} when an
     * id is given and the channel has several pushers, otherwise the first registered one.
     */
    private Pushable selectPushable(List<Pushable> pushableList, String pushableId) {
        if (StringUtils.isNotEmpty(pushableId) && pushableList.size() > 1) {
            for (Pushable candidate : pushableList) {
                if (pushableId.equals(candidate.getId())) {
                    return candidate;
                }
            }
            logger.debug("pushableId={} is not registered on this channel, using the first pusher", pushableId);
        }
        return pushableList.get(0);
    }

    /**
     * Returns {@code pushableList} itself when {@code preferred} is already its head,
     * otherwise a copy with {@code preferred} moved to index 0.
     */
    private List<Pushable> withPreferredFirst(List<Pushable> pushableList, Pushable preferred) {
        if (pushableList.get(0) == preferred) {
            return pushableList;
        }
        List<Pushable> ordered = new ArrayList<>(pushableList.size());
        ordered.add(preferred);
        for (Pushable other : pushableList) {
            if (other != preferred) {
                ordered.add(other);
            }
        }
        return ordered;
    }

    /**
     * Performs one complete push. The default implementation is synchronous and uses the
     * first pusher of the list; subclasses may override it to push asynchronously.
     *
     * @param pushableList pushers of the channel, the one to use at index 0
     * @param notification the notification to push
     * @return the result reported by the pusher
     * @throws PushException if the pusher fails
     * @date 2023-04-24
     */
    protected PushResult pushOnce(List<Pushable> pushableList, Notification notification) throws PushException {
        return this.invokePush(pushableList.get(0), notification);
    }

    /**
     * Calls the pusher and persists the outcome.
     *
     * <p>Without failed receivers, all receivers of the notification are persisted as
     * successful. With failed receivers and a non-broadcast notification, the receivers not
     * listed as failed are persisted as successful (if any) and the failed ones through
     * {@link #persistentFailed}. With failed receivers and a broadcast notification nothing
     * is persisted.</p>
     *
     * @param pushable the pusher to call
     * @param notification the notification to push
     * @return the non-null result returned by the pusher
     * @throws PushException if the pusher fails
     * @throws IllegalStateException if the pusher returns {@code null}
     */
    protected PushResult invokePush(Pushable pushable, Notification notification) throws PushException {
        PushResult pushResult = pushable.push(notification);
        if (pushResult == null) {
            throw new IllegalStateException("推送返回结果为空,请实现'push'方法!");
        }

        List<String> failedList = pushResult.getFailedList();
        if (StringUtils.isEmptyList(failedList)) {
            // Everything succeeded: persist all receivers.
            this.persistent(notification, notification.getReceiverList(), pushable.getNotificationChannel());
            logger.debug("保存了(全部)成功推送集合");
            return pushResult;
        }

        this.logger.error("推送消息失败,id={}, failed={}", notification.getId(), failedList);
        if (!notification.getBroadcast()) {
            // Not a broadcast: work out which receivers did succeed and persist those too.
            Set<String> failedSet = new HashSet<>(failedList);
            List<String> successList = new ArrayList<>(8);
            for (String receiver : notification.getReceiverList()) {
                if (!failedSet.contains(receiver)) {
                    successList.add(receiver);
                }
            }
            if (!StringUtils.isEmptyList(successList)) {
                this.persistent(notification, successList, pushable.getNotificationChannel());
                logger.debug("保存了(部分)成功推送集合");
            }
            // Persist the failed receivers.
            this.persistentFailed(notification, failedList, pushable.getNotificationChannel());
        }
        return pushResult;
    }

    /**
     * Batch push is not supported by this base class.
     *
     * <p>NOTE(review): the element type of {@code list} is not recoverable from this file;
     * align the parameter with the declaration in {@link PushManager}.</p>
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public PushResult push(List list) {
        throw new UnsupportedOperationException();
    }

    /**
     * Registers a strategy that every notification must pass before it is pushed.
     *
     * @param strategy the strategy to add, must not be {@code null}
     * @throws IllegalArgumentException if {@code strategy} is {@code null} or already registered
     */
    @Override
    public void addStrategy(Strategy strategy) {
        if (strategy == null) {
            throw new IllegalArgumentException("strategy is required!");
        }
        if (this.strategyList == null) {
            this.strategyList = new ArrayList<>(2);
        }
        if (this.strategyList.contains(strategy)) {
            throw new IllegalArgumentException("策略已经存在,无需重复注册");
        }
        this.strategyList.add(strategy);
        logger.info("注册'策略':{}", strategy.getId());
    }

    /**
     * @return a new list holding every registered pusher of every channel
     */
    @Override
    public List<Pushable> getPushList() {
        List<Pushable> list = new ArrayList<>(8);
        for (List<Pushable> onePushList : this.pushableMap.values()) {
            list.addAll(onePushList);
        }
        return list;
    }

    /**
     * @param channel the channel to look up
     * @return the internal list of pushers registered for the channel, or {@code null} if
     *         none has been registered
     */
    @Override
    public List<Pushable> getPushList(NotificationChannel channel) {
        return this.pushableMap.get(channel.getIndex());
    }

    /**
     * @param id pusher id
     * @return the pusher registered under {@code id}, or {@code null} if there is none
     */
    @Override
    public Pushable getPushObject(String id) {
        return this.pushIdMap.get(id);
    }

    /**
     * Sets the listener that {@link #register(Pushable)} hands to asynchronous pushers.
     *
     * @param listener the listener, must not be {@code null}
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    @Override
    public void setAsyncListener(PushStatusListener listener) {
        if (listener == null) {
            throw new IllegalArgumentException("listener is required!");
        }
        this.pushStatusListener = listener;
    }

    /**
     * Persists the data of a successful push (to the database).
     *
     * @param notification the notification that was pushed
     * @param successReceiverList receivers the push succeeded for
     * @param channel channel of the pusher that performed the push
     */
    protected abstract void persistent(Notification notification, List<String> successReceiverList, NotificationChannel channel);

    /**
     * Persists the record of a failed push.
     *
     * @param notification the notification that was pushed
     * @param failedList receivers the push failed for
     * @param channel channel of the pusher that performed the push
     */
    protected abstract void persistentFailed(Notification notification, List<String> failedList, NotificationChannel channel);

    /** Builds a {@link PushResult} carrying the given code, text and failed receivers. */
    private PushResult createFailedPushResult(int code, String text, List<String> failedList) {
        PushResult pushResult = new PushResult();
        pushResult.setCode(code);
        pushResult.setText(text);
        pushResult.setFailedList(failedList);
        return pushResult;
    }

    @Override
    public boolean isMessageParallel() {
        return messageParallel;
    }

    @Override
    public List<String> getMessageChannelNames() {
        return this.messageChannelNames;
    }

    public void setMessageParallel(boolean messageParallel) {
        this.messageParallel = messageParallel;
    }

    public void setMessageChannelNames(List<String> messageChannelNames) {
        this.messageChannelNames = messageChannelNames;
    }

    /**
     * @return the listener set through {@link #setAsyncListener(PushStatusListener)}, or
     *         {@code null} if none has been set
     */
    protected PushStatusListener getAsyncListener() {
        return pushStatusListener;
    }
}