package com.walker.push;
|
|
import com.walker.infrastructure.utils.StringUtils;
import com.walker.push.util.PushUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
|
|
public abstract class AbstractPushManager implements PushManager{
|
|
// Logger shared with subclasses; it is obtained via getClass(), so it is named after the concrete runtime class.
protected transient final Logger logger = LoggerFactory.getLogger(getClass());
|
|
@Override
|
public void register(Pushable pushable) {
|
if(pushable == null){
|
throw new IllegalArgumentException("注册错误:推送者不能为空");
|
}
|
NotificationChannel channel = pushable.getNotificationChannel();
|
List<Pushable> pushableList = this.pushableMap.get(channel.getIndex());
|
if(StringUtils.isEmptyList(pushableList)){
|
pushableList = new ArrayList<>(2);
|
this.pushableMap.put(channel.getIndex(), pushableList);
|
}
|
|
if (pushableList.contains(pushable)){
|
throw new IllegalArgumentException("推送者已经存在,无需重复注册,name=" + pushable.getName());
|
}
|
pushableList.add(pushable);
|
this.pushIdMap.put(pushable.getId(), pushable);
|
|
// 2023-04-27 管理器统一设置监听器对象
|
if(this.pushStatusListener == null){
|
throw new IllegalArgumentException("pushStatusListener 未设置!");
|
}
|
if(pushable.supportAsync() && pushable.getPushStatusListener() == null){
|
pushable.setPushStatusListener(this.pushStatusListener);
|
}
|
|
logger.info("注册'推送者':{}", pushable.getName());
|
}
|
|
@Override
|
public PushResult push(Notification notification, String pushableId) {
|
if(notification == null){
|
throw new IllegalArgumentException("无法推送空消息");
|
}
|
List<NotificationChannel> channelList = notification.getChannelList();
|
if(StringUtils.isEmptyList(channelList)){
|
throw new IllegalArgumentException("'notification'中不存在通道信息,无法推送");
|
}
|
|
if(this.strategyList != null){
|
for(Strategy strategy : strategyList){
|
if(!strategy.access(notification)){
|
return this.createFailedPushResult(-1, "通知未通过策略检查,无法推送", null);
|
}
|
}
|
}
|
|
List<Pushable> pushableList = null;
|
Pushable pushable = null;
|
PushResult pushResult = null;
|
|
// 2023-04-26 是否多通道要并行完成
|
boolean multipleChannelParallel = notification.getParallel();
|
int channelSize = channelList.size();
|
|
for(NotificationChannel channel: channelList){
|
pushableList = this.pushableMap.get(channel.getIndex());
|
if(StringUtils.isEmptyList(pushableList)){
|
// throw new IllegalArgumentException("未找到推送者,通道信息:" + channel.getName());
|
if(channelSize == 1){
|
logger.warn("只有一个通道,但也未配置推送者,推送失败:{}", notification);
|
return PushUtils.acquireFailedPushResult("通道不存在", StringUtils.collectionToDelimitedString(notification.getReceiverList(),","));
|
}
|
if(!multipleChannelParallel){
|
logger.debug("存在多通道,仅发送一个成功即可,通道不存在,继续找下一个");
|
} else {
|
// 需要并行发送
|
logger.error("通道不存在,发送失败一个:{}", channel.getIndex());
|
}
|
continue;
|
}
|
|
try {
|
|
// 2023-04-24 如果推送者本身支持异步推送,则后续保存等方法,都在监听器中实现,这里不管
|
// pushable = pushableList.get(0);
|
if(StringUtils.isNotEmpty(pushableId) && pushableList.size() > 1){
|
pushable = this.pushIdMap.get(pushableId);
|
} else {
|
pushable = pushableList.get(0);
|
}
|
|
if(pushable.supportAsync()){
|
logger.debug("异步推送,PushManager不直接返回结果,需要在监听器中实现!");
|
pushable.push(notification);
|
if(!multipleChannelParallel){
|
// 单通道串行,只要有发送的就不再处理其他通道
|
return PushUtils.acquireSuccessPushResult();
|
}
|
// return null;
|
// 多通道并行,需要继续其他通道发送
|
continue;
|
}
|
|
// 同步推送方式
|
// 推送方法拉出来,子类会实现异步方式调用。
|
pushResult = this.pushOnce(pushableList, notification);
|
if(!multipleChannelParallel){
|
return PushUtils.acquireSuccessPushResult();
|
}
|
|
} catch (PushException e) {
|
throw new RuntimeException("推送一次失败:" + e.getMessage() + ", id=" + e.getMessageId(), e);
|
}
|
}
|
|
logger.info("这里需要返回组合好的'推送结果',后续完善,暂时为空");
|
return pushResult;
|
}
|
|
/**
|
* 一次完整推送调用,默认同步实现,子类可以使用异步方式。
|
* @param pushableList
|
* @param notification
|
* @return
|
* @throws PushException
|
* @date 2023-04-24
|
*/
|
protected PushResult pushOnce(List<Pushable> pushableList, Notification notification) throws PushException{
|
return this.invokePush(pushableList.get(0), notification);
|
}
|
|
protected PushResult invokePush(Pushable pushable, Notification notification) throws PushException{
|
// PushResult pushResult = pushableList.get(0).push(notification);
|
PushResult pushResult = pushable.push(notification);
|
if(pushResult == null){
|
throw new IllegalStateException("推送返回结果为空,请实现'push'方法!", null);
|
}
|
if(!StringUtils.isEmptyList(pushResult.getFailedList())){
|
this.logger.error("推送消息失败,id={}, failed={}", notification.getId(), pushResult.getFailedList());
|
if(!notification.getBroadcast()){
|
// 非广播消息,需要检查是否全部失败,成功的需要保存
|
List<String> successList = new ArrayList<>(8);
|
for(String one : notification.getReceiverList()){
|
boolean inFailedList = false;
|
for(String failed : pushResult.getFailedList()){
|
if(failed.equals(one)){
|
inFailedList = true;
|
break;
|
}
|
}
|
if(!inFailedList){
|
successList.add(one);
|
}
|
// if(!pushResult.getFailedList().contains(one)){
|
// successList.add(one);
|
// }
|
}
|
if(!StringUtils.isEmptyList(successList)){
|
this.persistent(notification, successList, pushable.getNotificationChannel());
|
logger.debug("保存了(部分)成功推送集合");
|
}
|
|
// 保存失败的
|
this.persistentFailed(notification, pushResult.getFailedList(), pushable.getNotificationChannel());
|
}
|
} else {
|
// 全部成功,保存
|
this.persistent(notification, notification.getReceiverList(), pushable.getNotificationChannel());
|
logger.debug("保存了(全部)成功推送集合");
|
}
|
return pushResult;
|
}
|
|
@Override
|
public PushResult push(List<Notification> list) {
|
throw new UnsupportedOperationException();
|
}
|
|
@Override
|
public void addStrategy(Strategy strategy) {
|
if(this.strategyList == null){
|
this.strategyList = new ArrayList<>(2);
|
}
|
if(this.strategyList.contains(strategy)){
|
throw new IllegalArgumentException("策略已经存在,无需重复注册");
|
}
|
this.strategyList.add(strategy);
|
logger.info("注册'策略':{}", strategy.getId());
|
}
|
|
@Override
|
public List<Pushable> getPushList(){
|
List<Pushable> list = new ArrayList<>(8);
|
for(List<Pushable> onePushList: this.pushableMap.values()){
|
for(Pushable<?> pushable: onePushList){
|
list.add(pushable);
|
}
|
}
|
return list;
|
}
|
|
@Override
|
public List<Pushable> getPushList(NotificationChannel channel){
|
return this.pushableMap.get(channel.getIndex());
|
}
|
|
@Override
|
public Pushable getPushObject(String id){
|
return this.pushIdMap.get(id);
|
}
|
|
@Override
|
public void setAsyncListener(PushStatusListener listener){
|
if(listener == null){
|
throw new IllegalArgumentException("listener is required!");
|
}
|
this.pushStatusListener = listener;
|
}
|
|
/**
|
* 持久化保存推送数据(到数据库)
|
* @param notification 通知内容
|
* @param successReceiverList 推送成功的用户集合
|
*/
|
protected abstract void persistent(Notification notification, List<String> successReceiverList, NotificationChannel channel);
|
|
/**
|
* 持久化推送失败的记录
|
* @param notification
|
* @param failedList
|
*/
|
protected abstract void persistentFailed(Notification notification, List<String> failedList, NotificationChannel channel);
|
|
private PushResult createFailedPushResult(int code, String text, List<String> failedList){
|
PushResult pushResult = new PushResult();
|
pushResult.setCode(code);
|
pushResult.setText(text);
|
pushResult.setFailedList(failedList);
|
return pushResult;
|
}
|
|
@Override
|
public boolean isMessageParallel() {
|
return messageParallel;
|
}
|
|
@Override
|
public List<String> getMessageChannelNames(){
|
return this.messageChannelNames;
|
}
|
|
public void setMessageParallel(boolean messageParallel) {
|
this.messageParallel = messageParallel;
|
}
|
|
public void setMessageChannelNames(List<String> messageChannelNames) {
|
this.messageChannelNames = messageChannelNames;
|
}
|
|
protected PushStatusListener getAsyncListener() {
|
return pushStatusListener;
|
}
|
|
private PushStatusListener pushStatusListener;
|
private List<String> messageChannelNames = new ArrayList<>(4);
|
private boolean messageParallel = false;
|
// 推送策略集合
|
private List<Strategy> strategyList = null;
|
|
// key = 通道标识,value = 推送者实例
|
private Map<String, List<Pushable>> pushableMap = new HashMap<>(8);
|
|
// key = id, value = 对象
|
private Map<String, Pushable> pushIdMap = new HashMap<>(8);
|
}
|