Young Kbt blog Young Kbt blog
首页
  • java基础

    • Java基础
    • Java集合
    • Java反射
    • JavaJUC
    • JavaJVM
  • Java容器

    • JavaWeb
  • Java版本新特性

    • Java新特性
  • SQL 数据库

    • MySQL
    • Oracle
  • NoSQL 数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 进阶服务

    • Nginx
  • Spring
  • Spring Boot
  • Spring Security
  • 设计模式
  • 算法
  • 知识
  • 管理

    • Maven
    • Git
  • 部署

    • Linux
    • Docker
    • Jenkins
    • Kubernetes
  • 进阶

    • TypeScript
  • 框架

    • React
    • Vue2
    • Vue3
  • 轮子工具
  • 项目工程
  • 友情链接
  • 本站

    • 分类
    • 标签
    • 归档
  • 我的

    • 收藏
    • 关于
    • Vue2-Admin (opens new window)
    • Vue3-Admin(完善) (opens new window)
GitHub (opens new window)

Shp Liu

朝圣的使徒,正在走向编程的至高殿堂!
首页
  • java基础

    • Java基础
    • Java集合
    • Java反射
    • JavaJUC
    • JavaJVM
  • Java容器

    • JavaWeb
  • Java版本新特性

    • Java新特性
  • SQL 数据库

    • MySQL
    • Oracle
  • NoSQL 数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 进阶服务

    • Nginx
  • Spring
  • Spring Boot
  • Spring Security
  • 设计模式
  • 算法
  • 知识
  • 管理

    • Maven
    • Git
  • 部署

    • Linux
    • Docker
    • Jenkins
    • Kubernetes
  • 进阶

    • TypeScript
  • 框架

    • React
    • Vue2
    • Vue3
  • 轮子工具
  • 项目工程
  • 友情链接
  • 本站

    • 分类
    • 标签
    • 归档
  • 我的

    • 收藏
    • 关于
    • Vue2-Admin (opens new window)
    • Vue3-Admin(完善) (opens new window)
GitHub (opens new window)
  • MyBatis

  • MyBatis-Plus

  • 中间件 - ActiveMQ

    • ActiveMQ - 概述
    • ActiveMQ - 安装
    • ActiveMQ - 队列
    • ActiveMQ - 主题
    • ActiveMQ - JMS规范
    • ActiveMQ - 自带持久化
      • 什么是持久化消息
      • Queue 消息非持久化和持久化
      • Topic 消息持久化
      • JMS 的发布订阅总结
    • ActiveMQ - 事务性
    • ActiveMQ - Spring框架集成
    • ActiveMQ - 传输协议
    • ActiveMQ - 外部持久化
    • ActiveMQ - 高级特性
  • 中间件 - RabbitMQ

  • 中间件 - RocketMQ

  • 中间件 - Kafka

  • 高性能服务器 - Nginx

  • 响应式框架 - Reactor

  • 框架
  • 中间件 - ActiveMQ
Young Kbt
2022-07-30
目录

ActiveMQ - 自带持久化

  • 什么是持久化消息
  • Queue 消息非持久化和持久化
  • Topic 消息持久化
  • JMS 的发布订阅总结

# 什么是持久化消息

保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。

在消息生产者将消息成功发送给 MQ 消息中间件之后。无论是出现任何问题,如:MQ 服务器宕机、消费者掉线等。都保证(Topic 要之前注册过,Queue 不用)消息消费者,能够成功消费消息。如果消息生产者发送消息就失败了,那么消费者也不会消费到该消息。

# Queue 消息非持久化和持久化

Queue 非持久,当服务器宕机,消息不存在(消息丢失了)。即便是非持久,消费者在不在线的话,消息也不会丢失,等待消费者在线,还是能够收到消息的。

Queue持久化,当服务器宕机,消息依然存在。Queue 消息默认是持久化的。

持久化消息,保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。

可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。

image-20220730150634078

非持久化 生产者演示:具体看 13-14 行代码

当生产者成功发布消息之后,MQ 服务端宕机重启,消息生产者就收不到该消息了













 
 












public class JmsProduce {
    //  linux 上部署的activemq 的 IP 地址 + activemq 的端口号
    public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
    public static final String QUEUE_NAME = "queue001";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        // 非持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        for (int i = 1; i <= 3; i++) {
            TextMessage textMessage = session.createTextMessage("msg --- " + i);
            producer.send(textMessage);
        }
        // 9.关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送到 MQ 完成");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

持久化 生产者演示:具体看 13-14 行代码

当生产者成功发布消息之后,MQ 服务端宕机重启,消息生产者仍然能够收到该消息(默认)













 
 












public class JmsProduce {
    //  linux 上部署的 activemq 的 IP 地址 + activemq 的端口号
    public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
    public static final String QUEUE_NAME = "queue001";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        // 非持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 1; i <= 3; i++) {
            TextMessage textMessage = session.createTextMessage("msg --- " + i);
            producer.send(textMessage);
        }
        // 关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送到 MQ 完成");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# Topic 消息持久化

Topic 默认是 非持久化的,因为生产者生产消息时,消费者也要在线,这样消费者才能消费到消息。

Topic 消息持久化,只要消费者向 MQ 服务器注册过,所有生产者发布成功的消息,该消费者都能收到,不管是 MQ 服务器宕机还是消费者不在线。

注意

  • 一定要先运行一次消费者,等于向 MQ 注册,类似我订阅了这个主题
  • 然后再运行生产者发送消息。之后无论消费者是否在线,都会收到消息。如果不在线的话,下次连接的时候,会把没有收过的消息都接收过来

生产者演示:具体看 13 - 16 行代码(把 start 换到持久化后面,代表持久化后再启动 ActiveMQ)













 
 
 
 












public class JmsProduceTopic {
    //  linux 上部署的 activemq 的 IP 地址 + activemq 的端口号
    public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
    public static final String TOPIC_NAME = "topic001";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer producer = session.createProducer(topic);

        // 设置持久化 Topic 
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // 设置持久化 Topic 之后再启动连接
        connection.start();

        for (int i = 1; i <= 3; i++) {
            TextMessage textMessage = session.createTextMessage("msg --- " + i);
            producer.send(textMessage);
        }
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送到 MQ 完成");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

消费者演示:具体看 11、13 - 17 行代码(先订阅 Topic 后再 start)











 

 
 
 
 
 


















public class JmsConsumerTopic {

    //  linux 上部署的 activemq 的 IP 地址 + activemq 的端口号(默认61616)
    public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
    public static final String TOPIC_NAME = "topic001";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        // 设置客户端 ID。向 MQ 服务器注册自己的名称
        connection.setClientID("marry");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        // 创建一个 Topic 订阅者对象。一参是 Topic,二参是订阅者名称
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "mark...");
        // 之后再开启连接
        connection.start();

        topicSubscriber.setMessageListener(message -> {
            if(null != message  && message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消费的消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.in.read();
        topicSubscriber.close();
        session.close();
        connection.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

看 ActiveMQ 客户端,Topic 页面还是和之前的一样。另外在 subscribers 页面也会显示。如下:

image-20220730151008557

# JMS 的发布订阅总结

JMS 的发布订阅总结

JMS Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作 Topic。

主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。

主题使得消息订阅者和消息发布者保持互相独立,不需要解除即可保证消息的传送。

非持久订阅

非持久订阅只有当客户端处于激活状态,也就是和 MQ 保持连接状态才能收发到某个主题的消息。

如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。

一句话:先订阅注册才能接受到发布,只给订阅者发布消息。

持久订阅

客户端首先向 MQ 注册一个自己的身份 ID 识别号,当这个客户端处于离线时,生产者会为这个 ID 保存所有发送到主题的消息,当客户再次连接到 MQ 的时候,会根据消费者的 ID 得到所有当自己处于离线时发送到主题的消息

非持久订阅状态下,不能恢复或重新派送一个未签收的消息。

持久订阅才能恢复或重新派送一个未签收的消息。

非持久和持久化订阅如何选择

当所有的消息必须被接收,则用持久化订阅。当消息丢失能够被容忍,则用非持久订阅。

编辑此页 (opens new window)
#中间件
更新时间: 2023/10/23, 10:58:52
ActiveMQ - JMS规范
ActiveMQ - 事务性

← ActiveMQ - JMS规范 ActiveMQ - 事务性→

最近更新
01
技术随笔 - Element Plus 修改包名 原创
11-02
02
Reactor - 扩展性
11-02
03
Reactor - 最佳实践
11-02
更多文章>
Theme by Vdoing | Copyright © 2021-2024 Young Kbt | blog
桂ICP备2021009994号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式