shikeying
2024-01-11 3b67e947e36133e2a40eb2737b15ea375e157ea0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
package com.walker.push;
 
import com.walker.infrastructure.utils.StringUtils;
import com.walker.push.util.PushUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 
public abstract class AbstractPushManager implements PushManager{
 
    protected transient final Logger logger = LoggerFactory.getLogger(getClass());
 
    @Override
    public void register(Pushable pushable) {
        if(pushable == null){
            throw new IllegalArgumentException("注册错误:推送者不能为空");
        }
        NotificationChannel channel = pushable.getNotificationChannel();
        List<Pushable> pushableList = this.pushableMap.get(channel.getIndex());
        if(StringUtils.isEmptyList(pushableList)){
            pushableList = new ArrayList<>(2);
            this.pushableMap.put(channel.getIndex(), pushableList);
        }
 
        if (pushableList.contains(pushable)){
            throw new IllegalArgumentException("推送者已经存在,无需重复注册,name=" + pushable.getName());
        }
        pushableList.add(pushable);
        this.pushIdMap.put(pushable.getId(), pushable);
 
        // 2023-04-27 管理器统一设置监听器对象
        if(this.pushStatusListener == null){
            throw new IllegalArgumentException("pushStatusListener 未设置!");
        }
        if(pushable.supportAsync() && pushable.getPushStatusListener() == null){
            pushable.setPushStatusListener(this.pushStatusListener);
        }
 
        logger.info("注册'推送者':{}", pushable.getName());
    }
 
    @Override
    public PushResult push(Notification notification, String pushableId) {
        if(notification == null){
            throw new IllegalArgumentException("无法推送空消息");
        }
        List<NotificationChannel> channelList = notification.getChannelList();
        if(StringUtils.isEmptyList(channelList)){
            throw new IllegalArgumentException("'notification'中不存在通道信息,无法推送");
        }
 
        if(this.strategyList != null){
            for(Strategy strategy : strategyList){
                if(!strategy.access(notification)){
                    return this.createFailedPushResult(-1, "通知未通过策略检查,无法推送", null);
                }
            }
        }
 
        List<Pushable> pushableList = null;
        Pushable pushable = null;
        PushResult pushResult = null;
 
        // 2023-04-26 是否多通道要并行完成
        boolean multipleChannelParallel = notification.getParallel();
        int channelSize = channelList.size();
 
        for(NotificationChannel channel: channelList){
            pushableList = this.pushableMap.get(channel.getIndex());
            if(StringUtils.isEmptyList(pushableList)){
//                throw new IllegalArgumentException("未找到推送者,通道信息:" + channel.getName());
                if(channelSize == 1){
                    logger.warn("只有一个通道,但也未配置推送者,推送失败:{}", notification);
                    return PushUtils.acquireFailedPushResult("通道不存在", StringUtils.collectionToDelimitedString(notification.getReceiverList(),","));
                }
                if(!multipleChannelParallel){
                    logger.debug("存在多通道,仅发送一个成功即可,通道不存在,继续找下一个");
                } else {
                    // 需要并行发送
                    logger.error("通道不存在,发送失败一个:{}", channel.getIndex());
                }
                continue;
            }
 
            try {
 
                // 2023-04-24 如果推送者本身支持异步推送,则后续保存等方法,都在监听器中实现,这里不管
//                pushable = pushableList.get(0);
                if(StringUtils.isNotEmpty(pushableId) && pushableList.size() > 1){
                    pushable = this.pushIdMap.get(pushableId);
                } else {
                    pushable = pushableList.get(0);
                }
 
                if(pushable.supportAsync()){
                    logger.debug("异步推送,PushManager不直接返回结果,需要在监听器中实现!");
                    pushable.push(notification);
                    if(!multipleChannelParallel){
                        // 单通道串行,只要有发送的就不再处理其他通道
                        return PushUtils.acquireSuccessPushResult();
                    }
//                    return null;
                    // 多通道并行,需要继续其他通道发送
                    continue;
                }
 
                // 同步推送方式
                // 推送方法拉出来,子类会实现异步方式调用。
                pushResult = this.pushOnce(pushableList, notification);
                if(!multipleChannelParallel){
                    return PushUtils.acquireSuccessPushResult();
                }
 
            } catch (PushException e) {
                throw new RuntimeException("推送一次失败:" + e.getMessage() + ", id=" + e.getMessageId(), e);
            }
        }
 
        logger.info("这里需要返回组合好的'推送结果',后续完善,暂时为空");
        return pushResult;
    }
 
    /**
     * 一次完整推送调用,默认同步实现,子类可以使用异步方式。
     * @param pushableList
     * @param notification
     * @return
     * @throws PushException
     * @date 2023-04-24
     */
    protected PushResult pushOnce(List<Pushable> pushableList, Notification notification) throws PushException{
        return this.invokePush(pushableList.get(0), notification);
    }
 
    protected PushResult invokePush(Pushable pushable, Notification notification) throws PushException{
//        PushResult pushResult = pushableList.get(0).push(notification);
        PushResult pushResult = pushable.push(notification);
        if(pushResult == null){
            throw new IllegalStateException("推送返回结果为空,请实现'push'方法!", null);
        }
        if(!StringUtils.isEmptyList(pushResult.getFailedList())){
            this.logger.error("推送消息失败,id={}, failed={}", notification.getId(), pushResult.getFailedList());
            if(!notification.getBroadcast()){
                // 非广播消息,需要检查是否全部失败,成功的需要保存
                List<String> successList = new ArrayList<>(8);
                for(String one : notification.getReceiverList()){
                    boolean inFailedList = false;
                    for(String failed : pushResult.getFailedList()){
                        if(failed.equals(one)){
                            inFailedList = true;
                            break;
                        }
                    }
                    if(!inFailedList){
                        successList.add(one);
                    }
//                    if(!pushResult.getFailedList().contains(one)){
//                        successList.add(one);
//                    }
                }
                if(!StringUtils.isEmptyList(successList)){
                    this.persistent(notification, successList, pushable.getNotificationChannel());
                    logger.debug("保存了(部分)成功推送集合");
                }
 
                // 保存失败的
                this.persistentFailed(notification, pushResult.getFailedList(), pushable.getNotificationChannel());
            }
        } else {
            // 全部成功,保存
            this.persistent(notification, notification.getReceiverList(), pushable.getNotificationChannel());
            logger.debug("保存了(全部)成功推送集合");
        }
        return pushResult;
    }
 
    @Override
    public PushResult push(List<Notification> list) {
        throw new UnsupportedOperationException();
    }
 
    @Override
    public void addStrategy(Strategy strategy) {
        if(this.strategyList == null){
            this.strategyList = new ArrayList<>(2);
        }
        if(this.strategyList.contains(strategy)){
            throw new IllegalArgumentException("策略已经存在,无需重复注册");
        }
        this.strategyList.add(strategy);
        logger.info("注册'策略':{}", strategy.getId());
    }
 
    @Override
    public List<Pushable> getPushList(){
        List<Pushable> list = new ArrayList<>(8);
        for(List<Pushable> onePushList: this.pushableMap.values()){
            for(Pushable<?> pushable: onePushList){
                list.add(pushable);
            }
        }
        return list;
    }
 
    @Override
    public List<Pushable> getPushList(NotificationChannel channel){
        return this.pushableMap.get(channel.getIndex());
    }
 
    @Override
    public Pushable getPushObject(String id){
        return this.pushIdMap.get(id);
    }
 
    @Override
    public void setAsyncListener(PushStatusListener listener){
        if(listener == null){
            throw new IllegalArgumentException("listener is required!");
        }
        this.pushStatusListener = listener;
    }
 
    /**
     * 持久化保存推送数据(到数据库)
     * @param notification 通知内容
     * @param successReceiverList 推送成功的用户集合
     */
    protected abstract void persistent(Notification notification, List<String> successReceiverList, NotificationChannel channel);
 
    /**
     * 持久化推送失败的记录
     * @param notification
     * @param failedList
     */
    protected abstract void persistentFailed(Notification notification, List<String> failedList, NotificationChannel channel);
 
    private PushResult createFailedPushResult(int code, String text, List<String> failedList){
        PushResult pushResult = new PushResult();
        pushResult.setCode(code);
        pushResult.setText(text);
        pushResult.setFailedList(failedList);
        return pushResult;
    }
 
    @Override
    public boolean isMessageParallel() {
        return messageParallel;
    }
 
    @Override
    public List<String> getMessageChannelNames(){
        return this.messageChannelNames;
    }
 
    public void setMessageParallel(boolean messageParallel) {
        this.messageParallel = messageParallel;
    }
 
    public void setMessageChannelNames(List<String> messageChannelNames) {
        this.messageChannelNames = messageChannelNames;
    }
 
    protected PushStatusListener getAsyncListener() {
        return pushStatusListener;
    }
 
    private PushStatusListener pushStatusListener;
    private List<String> messageChannelNames = new ArrayList<>(4);
    private boolean messageParallel = false;
    // 推送策略集合
    private List<Strategy> strategyList = null;
 
    // key = 通道标识,value = 推送者实例
    private Map<String, List<Pushable>> pushableMap = new HashMap<>(8);
 
    // key = id, value = 对象
    private Map<String, Pushable> pushIdMap = new HashMap<>(8);
}