知识 - WebSocket
# 前言
WebSocket 有两种实现:
- 实现 JDK 原生的方式,Spring Boot 支持
- 使用 Spring 自己封装的方式实现
实现 WebSocket 实现步骤:
- 实现 HandshakeInterceptor 接口,实现 websocket 握手拦截,该接口提供两个方法,一个是握手成功的前置方法,一个是握手成功的后置方法。在前置方法,获取了用户信息,存入 session 域
- 继承 AbstractWebSocketHandler 类,重写 websocket 建立连接,接收消息,关闭连接等方法。其中在建立连接方法中,将 session 存入 WebSocketSessionManager 缓存里,key 是用户信息,value 为 session,方便后续使用
- 提供 WebSocketHelper 工具类,调用方法即可发生消息。发送消息时,会从 WebSocketSessionManager 缓存里通过用户信息获取 session,返回利用 session 发送消息
除了实现 WebSocket 之外,模块还内置 Redis 发布订阅模式。发送 Websocket 消息时,同时往 Redis 发布,其他服务可以订阅来获取消息。
# 实现
依赖
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.32</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 常量
public interface WebSocketConstant {
/**
* websocketSession中的参数的key
*/
String LOGIN_USER_KEY = "loginUser";
/**
* 订阅的频道
*/
String WEB_SOCKET_TOPIC = "work:websocket";
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 配置项
适配 application 的配置项
@Data
@ConfigurationProperties("websocket")
public class WebSocketProperties {
/**
* 是否启用
*/
private Boolean enabled;
/**
* 路径
*/
private String path = "/websocket";
/**
* 设置访问源地址
*/
private String allowedOrigins = "*";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 消息上下文
Websocket 消息上下文
@Data
public class WebSocketMessageContext implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* 需要推送到的 session key 列表
*/
private List<String> sessionKeys;
/**
* 需要发送的消息
*/
private String message;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 缓存
WebSocketSession 用于保存当前所有在线的会话信息
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketSessionManager {
private static final Map<String, WebSocketSession> USER_SESSION_MAP = new ConcurrentHashMap<>();
/**
* 将 WebSocket 会话添加到用户会话 Map 中
*
* @param sessionKey 会话键,用于检索会话
* @param session 要添加的 WebSocket 会话
*/
public static void addSession(String sessionKey, WebSocketSession session) {
USER_SESSION_MAP.put(sessionKey, session);
}
/**
* 根据会话键从用户会话 Map 中获取 WebSocket 会话
*
* @param sessionKey 要获取的会话键
* @return 与给定会话键对应的 WebSocket 会话,如果不存在则返回 null
*/
public static WebSocketSession getSessions(String sessionKey) {
return USER_SESSION_MAP.get(sessionKey);
}
/**
* 获取存储在用户会话 Map 中所有 WebSocket 会话的会话键集合
*
* @return 所有 WebSocket 会话的会话键集合
*/
public static Set<String> getSessionsAll() {
return USER_SESSION_MAP.keySet();
}
/**
* 从用户会话 Map 中移除指定会话键对应的 WebSocket 会话
*
* @param sessionKey 要移除的会话键
*/
public static void removeSession(String sessionKey) {
if (existSession(sessionKey)) {
USER_SESSION_MAP.remove(sessionKey);
}
}
/**
* 检查给定的会话键是否存在于用户会话 Map 中
*
* @param sessionKey 要检查的会话键
* @return 如果存在对应的会话键,则返回 true;否则返回 false
*/
public static boolean existSession(String sessionKey) {
return USER_SESSION_MAP.containsKey(sessionKey);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# 拦截器
public class WebSocketInterceptor implements HandshakeInterceptor {
/**
* WebSocket 握手之前执行的前置处理方法
*
* @param request WebSocket握手请求
* @param response WebSocket握手响应
* @param wsHandler WebSocket处理程序
* @param attributes 与WebSocket会话关联的属性
* @return 如果允许握手继续进行,则返回true;否则返回false
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {
// 用户信息
LoginUser loginUser = LoginHelper.getLoginUser();
attributes.put(WebSocketConstant.LOGIN_USER_KEY, loginUser);
return true;
}
/**
* WebSocket 握手成功后执行的后置处理方法
*
* @param request WebSocket握手请求
* @param response WebSocket握手响应
* @param wsHandler WebSocket处理程序
* @param exception 握手过程中可能出现的异常
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
// 在这个方法中可以执行一些握手成功后的后续处理逻辑,比如记录日志或者其他操作
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 生命周期实现
WebSocketHandler 实现类,也就是实现 WebSocket 的生命周期
@Slf4j
public class WebSocketHandler extends AbstractWebSocketHandler {
/**
* 连接成功后事件
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) {
LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstant.LOGIN_USER_KEY);
WebSocketSessionManager.addSession(loginUser.getUserId(), session);
log.info("[connect] sessionId: {}, userId:{}, username:{}", session.getId(), loginUser.getUserId(), loginUser.getUsername());
}
/**
* 处理接收到的文本消息事件
*
* @param session WebSocket 会话
* @param message 接收到的文本消息
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
// 从 WebSocket 会话中获取登录用户信息
LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstant.LOGIN_USER_KEY);
// 创建 WebSocket 消息上下文对象
WebSocketMessageContext webSocketMessageContext = new WebSocketMessageContext();
webSocketMessageContext.setSessionKeys(List.of(loginUser.getUserId()));
webSocketMessageContext.setMessage(message.getPayload());
WebSocketHelper.publishMessage(webSocketMessageContext);
}
/**
* 处理接收到的二进制消息事件
*
* @param session WebSocket 会话
* @param message 接收到的二进制消息
*/
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
super.handleBinaryMessage(session, message);
}
/**
* 处理接收到的 Pong 消息(心跳监测)
*
* @param session WebSocket 会话
* @param message 接收到的 Pong 消息
*/
@Override
protected void handlePongMessage(WebSocketSession session, PongMessage message) {
WebSocketHelper.sendPongMessage(session);
}
/**
* 处理 WebSocket 传输错误
*
* @param session WebSocket会话
* @param exception 发生的异常
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage());
}
/**
* 在 WebSocket 连接关闭后执行清理操作
*
* @param session WebSocket会话
* @param status 关闭状态信息
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstant.LOGIN_USER_KEY);
if (Objects.isNull(loginUser)) {
return;
}
WebSocketSessionManager.removeSession(loginUser.getUserId());
log.info("[disconnect] sessionId: {},userId:{},username:{}", session.getId(), loginUser.getUserId(), loginUser.getUsername());
}
/**
* 指示处理程序是否支持接收部分消息
*
* @return 如果支持接收部分消息,则返回 true;否则返回 false
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# 工具类
WebSocket 工具类
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketHelper {
/**
* 向指定的 WebSocket 会话发送消息
*
* @param sessionKey 要发送消息的用户 id
* @param message 要发送的消息内容
*/
public static void sendMessage(String sessionKey, String message) {
WebSocketSession session = WebSocketSessionManager.getSessions(sessionKey);
sendMessage(session, message);
}
/**
* 订阅 WebSocket 消息主题,并提供一个消费者函数来处理接收到的消息
*
* @param listener 处理 WebSocket 消息的消费者
*/
public static void subscribeMessage(MessageListener listener) {
RedisUtil.subscribe(WebSocketConstant.WEB_SOCKET_TOPIC, listener);
}
/**
* 发布 WebSocket 订阅消息
*
* @param webSocketMessage 要发布的 WebSocket 消息对象
*/
public static void publishMessage(WebSocketMessageContext webSocketMessage) {
List<String> unsentSessionKeys = new ArrayList<>();
// 当前服务内 session,直接发送消息
if (Objects.isNull(webSocketMessage.getSessionKeys())) {
return;
}
for (String sessionKey : webSocketMessage.getSessionKeys()) {
if (WebSocketSessionManager.existSession(sessionKey)) {
WebSocketHelper.sendMessage(sessionKey, webSocketMessage.getMessage());
// 发生后则移除,防止添加到 unsentSessionKeys,因为下面 unsentSessionKeys 会通过 Redis 发布,监听器订阅到消息后,获取消息发送给 unsentSessionKeys 的 sessionKey
continue;
}
unsentSessionKeys.add(sessionKey);
}
// 不在当前服务内 session,发布订阅消息
if (ListUtil.isNotEmpty(unsentSessionKeys)) {
WebSocketMessageContext broadcastMessage = new WebSocketMessageContext();
broadcastMessage.setMessage(webSocketMessage.getMessage());
broadcastMessage.setSessionKeys(unsentSessionKeys);
RedisUtil.publish(WebSocketConstant.WEB_SOCKET_TOPIC, broadcastMessage);
log.info(" WebSocket 发送主题订阅消息 topic:{},session keys:{},message:{}", WebSocketConstant.WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage());
}
}
/**
* 向所有的 WebSocket 会话发布订阅的消息(群发)
*
* @param message 要发布的消息内容
*/
public static void publishAll(String message) {
WebSocketMessageContext broadcastMessage = new WebSocketMessageContext();
broadcastMessage.setMessage(message);
RedisUtil.publish(WebSocketConstant.WEB_SOCKET_TOPIC, broadcastMessage);
log.info(" WebSocket 发送主题订阅消息 topic:{},message:{}", WebSocketConstant.WEB_SOCKET_TOPIC, message);
}
/**
* 向指定的 WebSocket 会话发送 Pong 消息
*
* @param session 要发送 Pong 消息的 WebSocket 会话
*/
public static void sendPongMessage(WebSocketSession session) {
sendMessage(session, new PongMessage());
}
/**
* 向指定的 WebSocket 会话发送文本消息
*
* @param session WebSocket 会话
* @param message 要发送的文本消息内容
*/
public static void sendMessage(WebSocketSession session, String message) {
sendMessage(session, new TextMessage(message));
}
/**
* 向指定的 WebSocket 会话发送 WebSocket 消息对象
*
* @param session WebSocket 会话
* @param message 要发送的 WebSocket 消息对象
*/
private static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
if (Objects.isNull(session) || !session.isOpen()) {
log.warn("[send] session 会话已经关闭");
} else {
try {
session.sendMessage(message);
} catch (IOException e) {
log.error("[send] session:{},发送消息:{} 异常", session, message, e);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# 主题订阅监听器
WebSocket Redis 主题订阅监听器
@Slf4j
@RequiredArgsConstructor
public class WebSocketTopicListener implements MessageListener {
private final RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
String channel = new String(message.getChannel());
if (Objects.equals(channel, WebSocketConstant.WEB_SOCKET_TOPIC)) {
// 反序列化消息体为 WebSocketMessageContext 对象
WebSocketMessageContext context = (WebSocketMessageContext) redisTemplate.getValueSerializer().deserialize(message.getBody());
if (Objects.isNull(context)) {
log.info("WebSocket 主题订阅收到消息为空");
return;
}
log.info("WebSocket 主题订阅收到消息 channel:{},Session Keys:{},message:{}", channel, context.getSessionKeys(), context.getMessage());
// 如果 key 不为空就按照 key 发消息,如果为空就群发
if (ListUtil.isNotEmpty(context.getSessionKeys())) {
context.getSessionKeys().forEach(key -> {
if (WebSocketSessionManager.existSession(key)) {
WebSocketHelper.sendMessage(key, context.getMessage());
}
});
} else {
WebSocketSessionManager.getSessionsAll().forEach(key -> {
WebSocketHelper.sendMessage(key, context.getMessage());
});
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 容器装配
@AutoConfiguration
@ConditionalOnProperty(value = "websocket.enabled", havingValue = "true")
@EnableConfigurationProperties(WebSocketProperties.class)
@EnableWebSocket
public class WebSocketConfiguration {
@Bean
public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor, WebSocketHandler webSocketHandler, WebSocketProperties webSocketProperties) {
// 返回一个 WebSocketConfigurer 对象,用于配置 WebSocket
return registry -> registry
// 添加 WebSocket 处理程序和拦截器到指定路径,设置允许的跨域来源
.addHandler(webSocketHandler, webSocketProperties.getPath())
.addInterceptors(handshakeInterceptor)
.setAllowedOrigins(webSocketProperties.getAllowedOrigins());
}
@Bean
public HandshakeInterceptor handshakeInterceptor() {
return new WebSocketInterceptor();
}
@Bean
public WebSocketHandler webSocketHandler() {
return new WebSocketHandler();
}
@Bean
public WebSocketTopicListener webSocketTopicListener(RedisTemplate<String, Object> redisTemplate) {
return new WebSocketTopicListener(redisTemplate);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Spring Boot 3.x 需要在 resource 下建立 META-INF/spring
路径,然后创建 org.springframework.boot.autoconfigure.AutoConfiguration.imports
文件,内容为
cn.youngkbt.websocket.config.WebSocketConfiguration
1
这样 Spring 会自动扫描该文件的容器装配类,将里面涉及的类注入到 Spring 容器。
# 示例
@RestController
@RequestMapping("/demo/websocket")
public class DemoWebSocketController {
/**
* 发布消息
*
*/
@GetMapping("/send")
public Response<String> send(WebSocketMessageContext webSocketMessageContext) {
WebSocketHelper.publishMessage(webSocketMessageContext);
return HttpResult.ok("操作成功");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
编辑此页 (opens new window)
更新时间: 2024/06/15, 16:39:27