WangHan
2025-04-02 a8ba678a3fe5a39da2c732014cebbb66e408e97c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package com.iplatform.base.push;
 
import com.iplatform.base.Constants;
import com.iplatform.base.PushCacheProvider;
import com.iplatform.base.service.PushServiceImpl;
import com.iplatform.model.po.S_message;
import com.walker.infrastructure.utils.NumberGenerator;
import com.walker.infrastructure.utils.StringUtils;
import com.walker.push.Notification;
import com.walker.push.NotificationChannel;
import com.walker.push.PushStatusListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.ArrayList;
import java.util.List;
 
/**
 * 推送监听器实现,该实现仅针对'异步'类型的推送者。<br>
 * 异步推送者如下:
 * <pre>
 *     1.短信推送
 *     2.
 * </pre>
 * @author 时克英
 * @date 2023-04-24
 * @date 2023-04-27 更新逻辑,整个推送保存都委托给该监听器实现。
 */
public class DefaultPushListener implements PushStatusListener {
 
    protected final transient Logger logger = LoggerFactory.getLogger(getClass());
 
    private static final String CACHE_VALUE = StringUtils.SYMBOL_SEMICOLON;
 
    @Override
    public void onSuccess(Notification notification, Object option, NotificationChannel channel) {
//        logger.info("成功推送'异步':{}, data = ", channel, notification);
//        logger.info("option = " + option);
 
        if(notification.getPersistent()){
            logger.info("业务保存'成功通知':channel = {}, data={}", channel, notification);
            // 需要考虑多通道情况,对应的是同一个'notificationId'
            // 如果缓存中已存在,则不再重复保存!
 
            // 是否业务性通知,对于:邮件、短信类,我们不认为是业务通知
            if(!this.isBusinessNotification(notification)){
                this.invokeDatabase(notification, true, channel);
                return;
            }
 
            // 如果是非并行通知,只要一个完成就OK的,直接保存(因为后续不会出现同一个通知)
            if(!notification.getParallel()){
                this.invokeDatabase(notification, true, channel);
                return;
            }
 
            String cacheKey = this.getKey(notification.getReceiverList(), notification.getOptionType(), notification.getOptionId());
 
            // 对于并行通知,不知道后面是否还会有:其他通道(相同消息),所以要加入缓存避免重复保存
            if(this.isInCache(cacheKey)){
                logger.debug("缓存中已存在,说明通知已经保存过,bizId={}", notification.getOptionId());
                return;
            }
 
            // 保存数据库
            this.invokeDatabase(notification, true, channel);
            // 加入缓存
            this.putToCache(cacheKey);
        }
    }
 
    @Override
    public void onException(Notification notification, String error, NotificationChannel channel) {
//        logger.info("失败推送'异步':{}, data = ", channel, notification);
//        logger.info("error = " + error);
 
        if(notification.getPersistent()){
            logger.info("业务保存'失败通知':channel = {}, data = {}", channel, notification);
 
            // 是否业务性通知,对于:邮件、短信类,我们不认为是业务通知
            if(!this.isBusinessNotification(notification)){
                this.invokeDatabase(notification, false, channel);
                return;
            }
 
            // 如果是非并行通知,只要一个完成就OK的,直接保存(因为后续不会出现同一个通知)
            if(!notification.getParallel()){
                this.invokeDatabase(notification, false, channel);
                return;
            }
 
            String cacheKey = this.getKey(notification.getReceiverList(), notification.getOptionType(), notification.getOptionId());
 
            // 对于并行通知,不知道后面是否还会有:其他通道(相同消息),所以要加入缓存避免重复保存
            if(this.isInCache(cacheKey)){
                logger.debug("缓存中已存在,说明通知已经保存过,bizId={}", notification.getOptionId());
                return;
            }
 
            // 保存数据库
            this.invokeDatabase(notification, false, channel);
            // 加入缓存
            this.putToCache(cacheKey);
        }
    }
 
    private void invokeDatabase(Notification notification, boolean success, NotificationChannel channel){
        List<S_message> data = new ArrayList<>(4);
        for(String userId : notification.getReceiverList()){
            S_message message = new S_message();
            // 因为ID出现重复,暂时在保存是生成ID,排查为何重复。2023-06-29
//            message.setId(notification.getId());
            message.setId(String.valueOf(NumberGenerator.getLongSequenceNumber()));
            message.setCreate_time(notification.getCreateTime());
            message.setCreator(notification.getCreator());
            message.setBroad_cast(notification.getBroadcast()? 1 : 0);
            message.setTitle(notification.getTitle());
            message.setContent(notification.getContent());
            message.setChannel_index(channel.getIndex());
            message.setDelayed_time(notification.getDelayedTime());
            message.setFailed(success? 0 : 1);
            message.setMsg_from(notification.getFrom());
            message.setTime_type(notification.getTimeType().getIndex());
            message.setOption_id(notification.getOptionId());
            message.setRead_done(0);
            message.setReceiver(userId);
//            message.setSummary(notification.get);
            if(StringUtils.isEmpty(notification.getOptionType())){
                message.setOption_type(Constants.PUSH_OPTION_TYPE_DEFAULT);
            } else {
                message.setOption_type(notification.getOptionType());
            }
            data.add(message);
        }
        this.pushService.insertBatch(data);
    }
 
    private void putToCache(String key){
        this.pushCacheProvider.put(key, CACHE_VALUE);
    }
 
    private boolean isInCache(String key){
        String value = this.pushCacheProvider.get(key);
        return value != null;
    }
 
    /**
     * 返回存储临时缓存的:Key
     * @param userIds
     * @param optionType
     * @param optionId
     * @return
     */
    private String getKey(List<String> userIds, String optionType, String optionId){
        StringBuilder sb = new StringBuilder(optionType).append(optionId);
        for(String userId : userIds){
            sb.append(userId);
        }
        return Constants.PUSH_CACHE_PREFIX + sb.toString().hashCode();
    }
 
    /**
     * 是否业务通知
     * @param notification
     * @return
     */
    private boolean isBusinessNotification(Notification notification){
        if(StringUtils.isNotEmpty(notification.getOptionId()) && StringUtils.isNotEmpty(notification.getOptionType())){
            return true;
        }
        return false;
    }
 
    public void setPushCacheProvider(PushCacheProvider pushCacheProvider) {
        this.pushCacheProvider = pushCacheProvider;
    }
 
    public void setPushService(PushServiceImpl pushService) {
        this.pushService = pushService;
    }
 
    private PushCacheProvider pushCacheProvider = null;
    private PushServiceImpl pushService = null;
}