ActiveMQ - 队列
# JMS编码总体规范
JMS 就是 Java 消息服务(Java Message Service),是一套设计规范。
JMS 的开发步骤:
- 创建一个 Connection Factory
- 通过 Connection Factory 来创建 JMS Connection
- 启动 JMS Connection
- 通过 Connection 创建 JMS Session
- 创建 JMS Destination
- 创建 JMS Producer 或者创建 JMS Message 并设置 Destination
- 创建 JMS Consumer或者是注册一个 JMS Message Listener
- 发送或者接受 JMS Message(s)
- 关闭所有的 JMS 资源(Connection,Session,Producer,Consumer 等)
# Destination简介
Destination 是目的地。下面拿 JVM 和 MQ,做个对比。目的地,我们可以理解为是数据存储的地方。
Destination 分为两种:队列和主题。下图介绍:
# Queue队列入门
消息队列里由两个地方存储数据,一个是队列(queue),另一个是主题(topic)。
- 队列:一对一,类似于微信私聊,针对个人发布新消息
- 主题:一对多,类似于微信公众号,发布新消息,所有订阅的的人都获得消息
# 入门案例
# 队列消息生产者
生产者生产消息,然后将消息放到队列里。
package com.eight;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Author Kele-Bing
* @Create 2021/10/18 16:07:50
* @Describe JmsProduce 发布者
*/
public class JmsProduce {
// linux 上部署的 activemq 的 IP 地址 + activemq 的端口号(默认61616)
public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
public static final String QUEUE_NAME = "queue001";
public static void main(String[] args) throws JMSException {
// 1.按照给定的url创建连接工厂,这个构造器采用默认的用户名密码。该类的其他构造方法可以指定用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 2.通过连接工厂,获得连接 connection 并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.创建会话 session 。第一参数是是否开启事务, 第二参数是消息签收的方式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地(两种 :队列/主题)。Destination 是 Queue 和 Topic 的父类
Queue queue = session.createQueue(QUEUE_NAME);
// 5.创建消息的生产者
MessageProducer producer = session.createProducer(queue);
// 6.通过 messageProducer 生产 3 条 消息发送到消息队列中
for (int i = 1; i <= 3; i++) {
// 7.创建消息
TextMessage textMessage = session.createTextMessage("msg --- " + i);
// 8.通过 messageProducer 发送给 mq
producer.send(textMessage);
}
// 9.关闭资源
producer.close();
session.close();
connection.close();
System.out.println("消息发送到 MQ 完成");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
然后运行。
# ActiveMQ客户端
访问 ActiveMQ 管理页面地址:http://IP地址:8161/
,账户 admin 密码 admin。利用客户端查看上面生产者生产的消息。
- Number Of Pending Messages:等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数
- Number Of Consumers:消费者数量,消费者端的消费者数量
- Messages Enqueued:进队消息数,进队列的总消息量,包括出队列的。这个数只增不减
- Messages Dequeued:出队消息数,可以理解为是消费者消费掉的数量
总结
当有 3 个消息进入这个队列时,等待消费的消息是 3,进入队列的消息是 3。
当消息消费 3 条消息后,等待消费的消息是 0,进入队列的消息依旧是 3,出队列的消息是 3。
当再来一条消息时,等待消费的消息是 1,进入队列的消息就是 4。
# 同步顺序式消费者
编写消费者从队列中拿到消息来消费,这里是同步消费,也就是一直在当前线程阻塞式获取消息,只有收到消息才能往下执行。
package com.eight;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Author Kele-Bing
* @Create 2021/10/18 16:34:31
* @Describe JmsConsumer
*/
public class JmsConsumer {
// linux 上部署的activemq 的 IP 地址 + activemq 的端口号(默认61616)
public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
public static final String QUEUE_NAME = "queue001";
public static void main(String[] args) throws JMSException {
// 1.按照给定的 url 创建连接工厂,这个构造器采用默认的用户名密码。该类的其他构造方法可以指定用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 2.通过连接工厂,获得连接 connection 并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.创建会话 session 。第一参数是是否开启事务, 第二参数是消息签收的方式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地(两种 :队列/主题)。Destination 是 Queue 和 Topi c的父类
Queue queue = session.createQueue(QUEUE_NAME);
// 5.创建消息的消费者
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
// reveive() 一直等待接收消息,在能够接收到消息之前将一直阻塞。 是同步阻塞方式 。和 Socket 的 accept 方法类似的。
// reveive(Long time) : 等待 n 毫秒之后还没有收到消息,就是结束阻塞。
// 因为消息发送者是 TextMessage,所以消息接受者也要是 TextMessage
TextMessage textMessage = (TextMessage) consumer.receive();
if(null != textMessage) {
System.out.println("消费的消息:" + textMessage.getText());
}else {
break;
}
}
consumer.close();
session.close();
connection.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
再重新看看 ActiveMQ 客户端:
Messages Dequeued 出队消息数为 3,说明 3 个消息都被拿出来了。
# 异步监听式消费者
异步消费采用监听的方式,也就是单独开一个监听器来监听是否有消息放入队列,这样不影响后续代码的执行。
package com.eight;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
/**
* @Author Kele-Bing
* @Create 2021/10/18 16:34:31
* @Describe JmsConsumer
*/
public class JmsConsumer {
// linux 上部署的 activemq 的 IP 地址 + activemq 的端口号(默认61616)
public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
public static final String QUEUE_NAME = "queue001";
public static void main(String[] args) throws JMSException, IOException {
// 1.按照给定的 url 创建连接工厂,这个构造器采用默认的用户名密码。该类的其他构造方法可以指定用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 2.通过连接工厂,获得连接 connection 并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.创建会话 session 。第一参数是是否开启事务, 第二参数是消息签收的方式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地(两种 :队列/主题)。Destination 是 Queue 和 Topic 的父类
Queue queue = session.createQueue(QUEUE_NAME);
// 5.创建消息的消费者
MessageConsumer consumer = session.createConsumer(queue);
/* 通过监听的方式来消费消息,是异步非阻塞的方式消费消息。
通过 messageConsumer 的setMessageListener 注册一个监听器,当有消息发送来时,系统自动调用 MessageListener 的 onMessage 方法处理消息
*/
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 让主线程不要结束。因为一旦主线程结束了,其他的线程(如此处的监听消息的线程)也都会被迫结束。
// 实际开发中,我们的程序会一直运行,这句代码都会省略
System.in.read();
consumer.close();
session.close();
connection.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# 两种消费区别
同步阻塞方式(receive)
订阅者或接收者抵用 MessageConsumer 的 receive()
方法来接收消息,receive 方法在能接收到消息之前(或超时之前)将一直阻塞。
异步非阻塞方式(监听器 onMessage)
订阅者或接收者通过 MessageConsumer 的 setMessageListener(MessageListenerlistener)
注册一个消息监听器,当消息到达之后,系统会自动调用监听器 MessageListener 的 onMessage(Messagemessage)
方法。
多个队列时,订阅者如何获取想要的消息?
在调用 createQueue 方法时,要传入获取的队列名,这样才能从这个队列拿到消息。
# 队列特点
点对点消息传递域的特点如下:
- 每个消息只能有一个消费者,类似 1 对 1 的关系。好比个人快递自己领取自己的
- 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息。好比我们的发送短信,发送者发送后不见得接收者会即收即看
- 消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息
# 消费信息情况
情况 1:只启动消费者 1
结果:消费者 1 会消费所有的数据。
情况 2:先启动消费 者1,再启动消费者 2
结果:消费者 1 消费所有的数据。消费者 2 不会消费到消息。
情况 3:生产者发布6条消息,在此之前已经启动了消费者1和消费者2
结果:消费者 1 和消费者 2 平摊了消息。各自消费 3 条消息。