ActiveMQ - 自带持久化
# 什么是持久化消息
保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。
在消息生产者将消息成功发送给 MQ 消息中间件之后。无论是出现任何问题,如:MQ 服务器宕机、消费者掉线等。都保证(Topic 要之前注册过,Queue 不用)消息消费者,能够成功消费消息。如果消息生产者发送消息就失败了,那么消费者也不会消费到该消息。
# Queue 消息非持久化和持久化
Queue 非持久,当服务器宕机,消息不存在(消息丢失了)。即便是非持久,消费者在不在线的话,消息也不会丢失,等待消费者在线,还是能够收到消息的。
Queue持久化,当服务器宕机,消息依然存在。Queue 消息默认是持久化的。
持久化消息,保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。
可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。
非持久化 生产者演示:具体看 13-14 行代码
当生产者成功发布消息之后,MQ 服务端宕机重启,消息生产者就收不到该消息了
public class JmsProduce {
// linux 上部署的activemq 的 IP 地址 + activemq 的端口号
public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
public static final String QUEUE_NAME = "queue001";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
// 非持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i = 1; i <= 3; i++) {
TextMessage textMessage = session.createTextMessage("msg --- " + i);
producer.send(textMessage);
}
// 9.关闭资源
producer.close();
session.close();
connection.close();
System.out.println("消息发送到 MQ 完成");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
持久化 生产者演示:具体看 13-14 行代码
当生产者成功发布消息之后,MQ 服务端宕机重启,消息生产者仍然能够收到该消息(默认)
public class JmsProduce {
// linux 上部署的 activemq 的 IP 地址 + activemq 的端口号
public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
public static final String QUEUE_NAME = "queue001";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
// 非持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 1; i <= 3; i++) {
TextMessage textMessage = session.createTextMessage("msg --- " + i);
producer.send(textMessage);
}
// 关闭资源
producer.close();
session.close();
connection.close();
System.out.println("消息发送到 MQ 完成");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# Topic 消息持久化
Topic 默认是 非持久化的,因为生产者生产消息时,消费者也要在线,这样消费者才能消费到消息。
Topic 消息持久化,只要消费者向 MQ 服务器注册过,所有生产者发布成功的消息,该消费者都能收到,不管是 MQ 服务器宕机还是消费者不在线。
注意
- 一定要先运行一次消费者,等于向 MQ 注册,类似我订阅了这个主题
- 然后再运行生产者发送消息。之后无论消费者是否在线,都会收到消息。如果不在线的话,下次连接的时候,会把没有收过的消息都接收过来
生产者演示:具体看 13 - 16 行代码(把 start 换到持久化后面,代表持久化后再启动 ActiveMQ)
public class JmsProduceTopic {
// linux 上部署的 activemq 的 IP 地址 + activemq 的端口号
public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
public static final String TOPIC_NAME = "topic001";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
// 设置持久化 Topic
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 设置持久化 Topic 之后再启动连接
connection.start();
for (int i = 1; i <= 3; i++) {
TextMessage textMessage = session.createTextMessage("msg --- " + i);
producer.send(textMessage);
}
producer.close();
session.close();
connection.close();
System.out.println("消息发送到 MQ 完成");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
消费者演示:具体看 11、13 - 17 行代码(先订阅 Topic 后再 start)
public class JmsConsumerTopic {
// linux 上部署的 activemq 的 IP 地址 + activemq 的端口号(默认61616)
public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
public static final String TOPIC_NAME = "topic001";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
// 设置客户端 ID。向 MQ 服务器注册自己的名称
connection.setClientID("marry");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
// 创建一个 Topic 订阅者对象。一参是 Topic,二参是订阅者名称
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "mark...");
// 之后再开启连接
connection.start();
topicSubscriber.setMessageListener(message -> {
if(null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
topicSubscriber.close();
session.close();
connection.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
看 ActiveMQ 客户端,Topic 页面还是和之前的一样。另外在 subscribers 页面也会显示。如下:
# JMS 的发布订阅总结
JMS 的发布订阅总结
JMS Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作 Topic。
主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。
主题使得消息订阅者和消息发布者保持互相独立,不需要解除即可保证消息的传送。
非持久订阅
非持久订阅只有当客户端处于激活状态,也就是和 MQ 保持连接状态才能收发到某个主题的消息。
如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。
一句话:先订阅注册才能接受到发布,只给订阅者发布消息。
持久订阅
客户端首先向 MQ 注册一个自己的身份 ID 识别号,当这个客户端处于离线时,生产者会为这个 ID 保存所有发送到主题的消息,当客户再次连接到 MQ 的时候,会根据消费者的 ID 得到所有当自己处于离线时发送到主题的消息
非持久订阅状态下,不能恢复或重新派送一个未签收的消息。
持久订阅才能恢复或重新派送一个未签收的消息。
非持久和持久化订阅如何选择
当所有的消息必须被接收,则用持久化订阅。当消息丢失能够被容忍,则用非持久订阅。