Young Kbt blog Young Kbt blog
首页
  • java基础

    • Java基础
    • Java集合
    • Java反射
    • JavaJUC
    • JavaJVM
  • Java容器

    • JavaWeb
  • Java版本新特性

    • Java新特性
  • SQL 数据库

    • MySQL
    • Oracle
  • NoSQL 数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 进阶服务

    • Nginx
  • Spring
  • Spring Boot
  • Spring Security
  • 设计模式
  • 算法
  • 知识
  • 管理

    • Maven
    • Git
  • 部署

    • Linux
    • Docker
    • Jenkins
    • Kubernetes
  • 进阶

    • TypeScript
  • 框架

    • React
    • Vue2
    • Vue3
  • 轮子工具
  • 项目工程
  • 友情链接
  • 本站

    • 分类
    • 标签
    • 归档
  • 我的

    • 收藏
    • 关于
    • Vue2-Admin (opens new window)
    • Vue3-Admin(完善) (opens new window)
GitHub (opens new window)

Shp Liu

朝圣的使徒,正在走向编程的至高殿堂!
首页
  • java基础

    • Java基础
    • Java集合
    • Java反射
    • JavaJUC
    • JavaJVM
  • Java容器

    • JavaWeb
  • Java版本新特性

    • Java新特性
  • SQL 数据库

    • MySQL
    • Oracle
  • NoSQL 数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 进阶服务

    • Nginx
  • Spring
  • Spring Boot
  • Spring Security
  • 设计模式
  • 算法
  • 知识
  • 管理

    • Maven
    • Git
  • 部署

    • Linux
    • Docker
    • Jenkins
    • Kubernetes
  • 进阶

    • TypeScript
  • 框架

    • React
    • Vue2
    • Vue3
  • 轮子工具
  • 项目工程
  • 友情链接
  • 本站

    • 分类
    • 标签
    • 归档
  • 我的

    • 收藏
    • 关于
    • Vue2-Admin (opens new window)
    • Vue3-Admin(完善) (opens new window)
GitHub (opens new window)
  • 设计模式

  • 算法

  • 知识

    • 知识 - 对象
    • 知识 - 幂等性
    • 知识 - 分布式事务
    • 知识 - MapStruct
    • 知识 - MapStructPlus
    • 知识 - CompletableFuture
    • 知识 - EasyExcel
    • 知识 - Encrypt
    • 知识 - 接口幂等性
    • 知识 - 数据脱敏
    • 知识 - WebSocket
      • 前言
      • 实现
        • 常量
        • 配置项
        • 消息上下文
        • 缓存
        • 拦截器
        • 生命周期实现
        • 工具类
        • 主题订阅监听器
        • 容器装配
      • 示例
    • 知识 - Spring Cache
    • 知识 - 请求日志输出
    • 知识 - 接口限流
  • 开发
  • 知识
Young Kbt
2024-06-15
目录

知识 - WebSocket

  • 前言
  • 实现
    • 常量
    • 配置项
    • 消息上下文
    • 缓存
    • 拦截器
    • 生命周期实现
    • 工具类
    • 主题订阅监听器
    • 容器装配
  • 示例

# 前言

WebSocket 有两种实现:

  • 实现 JDK 原生的方式,Spring Boot 支持
  • 使用 Spring 自己封装的方式实现

实现 WebSocket 实现步骤:

  1. 实现 HandshakeInterceptor 接口,实现 websocket 握手拦截,该接口提供两个方法,一个是握手成功的前置方法,一个是握手成功的后置方法。在前置方法,获取了用户信息,存入 session 域
  2. 继承 AbstractWebSocketHandler 类,重写 websocket 建立连接,接收消息,关闭连接等方法。其中在建立连接方法中,将 session 存入 WebSocketSessionManager 缓存里,key 是用户信息,value 为 session,方便后续使用
  3. 提供 WebSocketHelper 工具类,调用方法即可发生消息。发送消息时,会从 WebSocketSessionManager 缓存里通过用户信息获取 session,返回利用 session 发送消息

除了实现 WebSocket 之外,模块还内置 Redis 发布订阅模式。发送 Websocket 消息时,同时往 Redis 发布,其他服务可以订阅来获取消息。

# 实现

依赖

<dependencies>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.32</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
  </dependency>
</dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 常量

public interface WebSocketConstant {
    /**
     * websocketSession中的参数的key
     */
    String LOGIN_USER_KEY = "loginUser";

    /**
     * 订阅的频道
     */
    String WEB_SOCKET_TOPIC = "work:websocket";

}
1
2
3
4
5
6
7
8
9
10
11
12

# 配置项

适配 application 的配置项

@Data
@ConfigurationProperties("websocket")
public class WebSocketProperties {

    /**
     * 是否启用
     */
    private Boolean enabled;

    /**
     * 路径
     */
    private String path = "/websocket";

    /**
     *  设置访问源地址
     */
    private String allowedOrigins = "*";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 消息上下文

Websocket 消息上下文

@Data
public class WebSocketMessageContext implements Serializable {
    @Serial
    private static final long serialVersionUID = 1L;

    /**
     * 需要推送到的 session key 列表
     */
    private List<String> sessionKeys;

    /**
     * 需要发送的消息
     */
    private String message;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 缓存

WebSocketSession 用于保存当前所有在线的会话信息

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketSessionManager {
    
    private static final Map<String, WebSocketSession> USER_SESSION_MAP = new ConcurrentHashMap<>();

    /**
     * 将 WebSocket 会话添加到用户会话 Map 中
     *
     * @param sessionKey 会话键,用于检索会话
     * @param session    要添加的 WebSocket 会话
     */
    public static void addSession(String sessionKey, WebSocketSession session) {
        USER_SESSION_MAP.put(sessionKey, session);
    }

    /**
     * 根据会话键从用户会话 Map 中获取 WebSocket 会话
     *
     * @param sessionKey 要获取的会话键
     * @return 与给定会话键对应的 WebSocket 会话,如果不存在则返回 null
     */
    public static WebSocketSession getSessions(String sessionKey) {
        return USER_SESSION_MAP.get(sessionKey);
    }

    /**
     * 获取存储在用户会话 Map 中所有 WebSocket 会话的会话键集合
     *
     * @return 所有 WebSocket 会话的会话键集合
     */
    public static Set<String> getSessionsAll() {
        return USER_SESSION_MAP.keySet();
    }

    /**
     * 从用户会话 Map 中移除指定会话键对应的 WebSocket 会话
     *
     * @param sessionKey 要移除的会话键
     */
    public static void removeSession(String sessionKey) {
        if (existSession(sessionKey)) {
            USER_SESSION_MAP.remove(sessionKey);
        }
    }

    /**
     * 检查给定的会话键是否存在于用户会话 Map 中
     *
     * @param sessionKey 要检查的会话键
     * @return 如果存在对应的会话键,则返回 true;否则返回 false
     */
    public static boolean existSession(String sessionKey) {
        return USER_SESSION_MAP.containsKey(sessionKey);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

# 拦截器

public class WebSocketInterceptor implements HandshakeInterceptor {

    /**
     * WebSocket 握手之前执行的前置处理方法
     *
     * @param request    WebSocket握手请求
     * @param response   WebSocket握手响应
     * @param wsHandler  WebSocket处理程序
     * @param attributes 与WebSocket会话关联的属性
     * @return 如果允许握手继续进行,则返回true;否则返回false
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        // 用户信息
        LoginUser loginUser = LoginHelper.getLoginUser();
        attributes.put(WebSocketConstant.LOGIN_USER_KEY, loginUser);
        return true;
    }

    /**
     * WebSocket 握手成功后执行的后置处理方法
     *
     * @param request   WebSocket握手请求
     * @param response  WebSocket握手响应
     * @param wsHandler WebSocket处理程序
     * @param exception 握手过程中可能出现的异常
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
        // 在这个方法中可以执行一些握手成功后的后续处理逻辑,比如记录日志或者其他操作
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 生命周期实现

WebSocketHandler 实现类,也就是实现 WebSocket 的生命周期

@Slf4j
public class WebSocketHandler extends AbstractWebSocketHandler {

    /**
     * 连接成功后事件
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstant.LOGIN_USER_KEY);
        WebSocketSessionManager.addSession(loginUser.getUserId(), session);
        log.info("[connect] sessionId: {}, userId:{}, username:{}", session.getId(), loginUser.getUserId(), loginUser.getUsername());
    }

    /**
     * 处理接收到的文本消息事件
     *
     * @param session WebSocket 会话
     * @param message 接收到的文本消息
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) {
        // 从 WebSocket 会话中获取登录用户信息
        LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstant.LOGIN_USER_KEY);

        // 创建 WebSocket 消息上下文对象
        WebSocketMessageContext webSocketMessageContext = new WebSocketMessageContext();
        webSocketMessageContext.setSessionKeys(List.of(loginUser.getUserId()));
        webSocketMessageContext.setMessage(message.getPayload());
        WebSocketHelper.publishMessage(webSocketMessageContext);
    }

    /**
     * 处理接收到的二进制消息事件
     *
     * @param session WebSocket 会话
     * @param message 接收到的二进制消息
     */
    @Override
    protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
        super.handleBinaryMessage(session, message);
    }

    /**
     * 处理接收到的 Pong 消息(心跳监测)
     *
     * @param session WebSocket 会话
     * @param message 接收到的 Pong 消息
     */
    @Override
    protected void handlePongMessage(WebSocketSession session, PongMessage message) {
        WebSocketHelper.sendPongMessage(session);
    }

    /**
     * 处理 WebSocket 传输错误
     *
     * @param session   WebSocket会话
     * @param exception 发生的异常
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) {
        log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage());
    }

    /**
     * 在 WebSocket 连接关闭后执行清理操作
     *
     * @param session WebSocket会话
     * @param status  关闭状态信息
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstant.LOGIN_USER_KEY);
        if (Objects.isNull(loginUser)) {
            return;
        }
        WebSocketSessionManager.removeSession(loginUser.getUserId());
        log.info("[disconnect] sessionId: {},userId:{},username:{}", session.getId(), loginUser.getUserId(), loginUser.getUsername());
    }

    /**
     * 指示处理程序是否支持接收部分消息
     *
     * @return 如果支持接收部分消息,则返回 true;否则返回 false
     */
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90

# 工具类

WebSocket 工具类

@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketHelper {

    /**
     * 向指定的 WebSocket 会话发送消息
     *
     * @param sessionKey 要发送消息的用户 id
     * @param message    要发送的消息内容
     */
    public static void sendMessage(String sessionKey, String message) {
        WebSocketSession session = WebSocketSessionManager.getSessions(sessionKey);
        sendMessage(session, message);
    }

    /**
     * 订阅 WebSocket 消息主题,并提供一个消费者函数来处理接收到的消息
     *
     * @param listener 处理 WebSocket 消息的消费者
     */
    public static void subscribeMessage(MessageListener listener) {
        RedisUtil.subscribe(WebSocketConstant.WEB_SOCKET_TOPIC, listener);
    }

    /**
     * 发布 WebSocket 订阅消息
     *
     * @param webSocketMessage 要发布的 WebSocket 消息对象
     */
    public static void publishMessage(WebSocketMessageContext webSocketMessage) {
        List<String> unsentSessionKeys = new ArrayList<>();
        // 当前服务内 session,直接发送消息
        if (Objects.isNull(webSocketMessage.getSessionKeys())) {
            return;
        }
        for (String sessionKey : webSocketMessage.getSessionKeys()) {
            if (WebSocketSessionManager.existSession(sessionKey)) {
                WebSocketHelper.sendMessage(sessionKey, webSocketMessage.getMessage());
                // 发生后则移除,防止添加到 unsentSessionKeys,因为下面 unsentSessionKeys 会通过 Redis 发布,监听器订阅到消息后,获取消息发送给 unsentSessionKeys 的 sessionKey
                continue;
            }
            unsentSessionKeys.add(sessionKey);
        }
        // 不在当前服务内 session,发布订阅消息
        if (ListUtil.isNotEmpty(unsentSessionKeys)) {
            WebSocketMessageContext broadcastMessage = new WebSocketMessageContext();
            broadcastMessage.setMessage(webSocketMessage.getMessage());
            broadcastMessage.setSessionKeys(unsentSessionKeys);
            RedisUtil.publish(WebSocketConstant.WEB_SOCKET_TOPIC, broadcastMessage);
            log.info(" WebSocket 发送主题订阅消息 topic:{},session keys:{},message:{}", WebSocketConstant.WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage());
        }
    }

    /**
     * 向所有的 WebSocket 会话发布订阅的消息(群发)
     *
     * @param message 要发布的消息内容
     */
    public static void publishAll(String message) {
        WebSocketMessageContext broadcastMessage = new WebSocketMessageContext();
        broadcastMessage.setMessage(message);
        RedisUtil.publish(WebSocketConstant.WEB_SOCKET_TOPIC, broadcastMessage);
        log.info(" WebSocket 发送主题订阅消息 topic:{},message:{}", WebSocketConstant.WEB_SOCKET_TOPIC, message);
    }

    /**
     * 向指定的 WebSocket 会话发送 Pong 消息
     *
     * @param session 要发送 Pong 消息的 WebSocket 会话
     */
    public static void sendPongMessage(WebSocketSession session) {
        sendMessage(session, new PongMessage());
    }

    /**
     * 向指定的 WebSocket 会话发送文本消息
     *
     * @param session WebSocket 会话
     * @param message 要发送的文本消息内容
     */
    public static void sendMessage(WebSocketSession session, String message) {
        sendMessage(session, new TextMessage(message));
    }

    /**
     * 向指定的 WebSocket 会话发送 WebSocket 消息对象
     *
     * @param session WebSocket 会话
     * @param message 要发送的 WebSocket 消息对象
     */
    private static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
        if (Objects.isNull(session) || !session.isOpen()) {
            log.warn("[send] session 会话已经关闭");
        } else {
            try {
                session.sendMessage(message);
            } catch (IOException e) {
                log.error("[send] session:{},发送消息:{} 异常", session, message, e);
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102

# 主题订阅监听器

WebSocket Redis 主题订阅监听器

@Slf4j
@RequiredArgsConstructor
public class WebSocketTopicListener implements MessageListener {

    private final RedisTemplate<String, Object> redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel());
        if (Objects.equals(channel, WebSocketConstant.WEB_SOCKET_TOPIC)) {
            // 反序列化消息体为 WebSocketMessageContext 对象
            WebSocketMessageContext context = (WebSocketMessageContext) redisTemplate.getValueSerializer().deserialize(message.getBody());

            if (Objects.isNull(context)) {
                log.info("WebSocket 主题订阅收到消息为空");
                return;
            }
            
            log.info("WebSocket 主题订阅收到消息 channel:{},Session Keys:{},message:{}", channel, context.getSessionKeys(), context.getMessage());

            // 如果 key 不为空就按照 key 发消息,如果为空就群发
            if (ListUtil.isNotEmpty(context.getSessionKeys())) {
                context.getSessionKeys().forEach(key -> {
                    if (WebSocketSessionManager.existSession(key)) {
                        WebSocketHelper.sendMessage(key, context.getMessage());
                    }
                });
            } else {
                WebSocketSessionManager.getSessionsAll().forEach(key -> {
                    WebSocketHelper.sendMessage(key, context.getMessage());
                });
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 容器装配

@AutoConfiguration
@ConditionalOnProperty(value = "websocket.enabled", havingValue = "true")
@EnableConfigurationProperties(WebSocketProperties.class)
@EnableWebSocket
public class WebSocketConfiguration {

    @Bean
    public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor, WebSocketHandler webSocketHandler, WebSocketProperties webSocketProperties) {
        // 返回一个 WebSocketConfigurer 对象,用于配置 WebSocket
        return registry -> registry
                // 添加 WebSocket 处理程序和拦截器到指定路径,设置允许的跨域来源
                .addHandler(webSocketHandler, webSocketProperties.getPath())
                .addInterceptors(handshakeInterceptor)
                .setAllowedOrigins(webSocketProperties.getAllowedOrigins());
    }

    @Bean
    public HandshakeInterceptor handshakeInterceptor() {
        return new WebSocketInterceptor();
    }

    @Bean
    public WebSocketHandler webSocketHandler() {
        return new WebSocketHandler();
    }

    @Bean
    public WebSocketTopicListener webSocketTopicListener(RedisTemplate<String, Object> redisTemplate) {
        return new WebSocketTopicListener(redisTemplate);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

Spring Boot 3.x 需要在 resource 下建立 META-INF/spring 路径,然后创建 org.springframework.boot.autoconfigure.AutoConfiguration.imports 文件,内容为

cn.youngkbt.websocket.config.WebSocketConfiguration
1

这样 Spring 会自动扫描该文件的容器装配类,将里面涉及的类注入到 Spring 容器。

# 示例

@RestController
@RequestMapping("/demo/websocket")
public class DemoWebSocketController {

    /**
     * 发布消息
     *
     */
    @GetMapping("/send")
    public Response<String> send(WebSocketMessageContext webSocketMessageContext) {
        WebSocketHelper.publishMessage(webSocketMessageContext);
        return HttpResult.ok("操作成功");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
编辑此页 (opens new window)
#knowledge
更新时间: 2024/06/15, 16:39:27
知识 - 数据脱敏
知识 - Spring Cache

← 知识 - 数据脱敏 知识 - Spring Cache→

最近更新
01
技术随笔 - Element Plus 修改包名 原创
11-02
02
Reactor - 扩展性
11-02
03
Reactor - 最佳实践
11-02
更多文章>
Theme by Vdoing | Copyright © 2021-2024 Young Kbt | blog
桂ICP备2021009994号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式