package com.iplatform.tcp.controller; import com.iplatform.base.SystemController; import com.iplatform.tcp.util.ws.WebBroadCastResponse; import com.iplatform.tcp.util.ws.WebDataResponse; import com.walker.infrastructure.utils.DateUtils; import com.walker.infrastructure.utils.NumberGenerator; import com.walker.infrastructure.utils.StringUtils; import com.walker.push.Notification; import com.walker.push.NotificationChannel; import com.walker.push.PushManager; import com.walker.push.PushResult; import com.walker.tcp.Connection; import com.walker.tcp.ConnectionManager; import com.walker.tcp.connect.LongConnection; import com.walker.tcp.lb.LongConnectionMeta; import com.walker.tcp.lb.RedisConnectionMetaCache; import com.walker.tcp.lb.RedisConnectionNameCache; import com.walker.tcp.netty.WebSocketEngine; import com.walker.web.ResponseValue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.lang.Nullable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.*; @RestController @RequestMapping("/test/tcp") public class TestTcpController extends SystemController { private WebSocketEngine webSocketEngine; private PushManager pushManager; private RedisConnectionMetaCache connectionMetaCache; private RedisConnectionNameCache connectionNameCache; @Autowired public TestTcpController(WebSocketEngine webSocketEngine, PushManager pushManager , @Nullable RedisConnectionMetaCache connectionMetaCache , @Nullable RedisConnectionNameCache connectionNameCache){ this.webSocketEngine = webSocketEngine; this.pushManager = pushManager; this.connectionMetaCache = connectionMetaCache; this.connectionNameCache = connectionNameCache; } /** * 获取websocket基本情况。 * @return * @date 2023-12-14 */ @RequestMapping("/ws/info") public ResponseValue getWebsocketLinkInfo(){ Map infoMap = new HashMap<>(8); if(this.connectionNameCache != null){ long totalWebsocketSize = this.connectionNameCache.getCache().getPersistentSize(); infoMap.put("total_websocket", totalWebsocketSize); infoMap.put("create_time", connectionNameCache.getCreateTime()); infoMap.put("cache_name", this.connectionNameCache.getProviderName()); } if(this.connectionMetaCache != null){ infoMap.put("connection_meta_size", this.connectionMetaCache.getCache().getPersistentSize()); } return ResponseValue.success(infoMap); } /** * 获取一个链接信息。 * @param userId 连接绑定的用户名称(name),一般为用户ID。 * @return * @date 2023-12-14 */ @RequestMapping("/ws/connection") public ResponseValue getWebsocketConnection(String userId){ if(StringUtils.isEmpty(userId)){ return ResponseValue.error("userId is required!"); } Map infoMap = new HashMap<>(8); ConnectionManager connectionManager = this.webSocketEngine.getConnectionManager(); if(connectionManager != null){ LongConnection connection = (LongConnection) connectionManager.getConnectionByName(userId); if(connection == null){ return ResponseValue.success("连接不存在:" + userId); } if(connection instanceof LongConnectionMeta){ infoMap.put("type", "集群连接对象(LongConnectionMeta),不是当前主机物理连接"); } infoMap.put("id", connection.getId()); infoMap.put("host", connection.getConnectionHost()); infoMap.put("name", connection.getName()); infoMap.put("alreadyLogin", connection.getAlreadyLogin()); infoMap.put("engineId", connection.getEngineId()); infoMap.put("lastTime", connection.getLastTime()); } else { infoMap.put("error", "connectionManager 为空"); } return ResponseValue.success(infoMap); } /** * 向所有链接广播消息。 * @param name * @param uid * @return */ @RequestMapping("/ws/send_broadcast") public ResponseValue sendTestCommand(String name, String uid){ WebBroadCastResponse msg = new WebBroadCastResponse(); msg.setMessageId(String.valueOf(NumberGenerator.getSequenceNumber())); msg.setName(uid); msg.setData(name + ": " + msg.getMessageId()); webSocketEngine.sendBroadcast(msg); return ResponseValue.success(); } /** * 向浏览器(用户)推送一个websocket实时消息。 * @param userIds * @return * @date 2024-01-23 */ @RequestMapping("/ws/send") public ResponseValue sendToUser(String userIds){ if (StringUtils.isNotEmpty(userIds)){ String[] split = userIds.split(","); List ids = Arrays.asList(split); for (String id : ids) { Connection connection = this.webSocketEngine.getConnectionManager().getConnectionByName(id); if (connection!=null){ WebDataResponse msg = new WebDataResponse(); msg.setMessageId(NumberGenerator.getLongSequenceId()); msg.setName(id); // 指定用户发送,该id与浏览器端注册的id保持一致。 msg.setData("有新消息,速度刷新"); webSocketEngine.sendResponse(msg); } } } return ResponseValue.success(); } @RequestMapping("/push") public ResponseValue pushOne(String type, String userId){ PushResult pushResult = null; if(type.equals("1")){ logger.info("推送短信测试,user = 0"); pushResult = this.pushManager.push(this.acquireNotification(NotificationChannel.Sms, "13838277463"), null); } else if(type.equals("2")){ if(StringUtils.isEmpty(userId)){ userId = "0"; } pushResult = this.pushManager.push(this.acquireNotification(NotificationChannel.WebSocket, userId), null); } else { throw new UnsupportedOperationException("暂不支持其他推送方式:" + type); } return ResponseValue.success(pushResult); } private Notification acquireNotification(NotificationChannel channel, String user){ Notification n = new Notification(); n.setId(NumberGenerator.generatorHexUUID()); n.setCreateTime(DateUtils.getDateTimeNumber()); n.setCreator("creator"); n.setFrom("shikeying"); n.setContent("这是一个测试"); List channels = new ArrayList<>(2); channels.add(channel); n.setChannelList(channels); n.setPersistent(true); n.setTitle("title"); List userList = new ArrayList<>(2); userList.add(user); n.setReceiverList(userList); return n; } }