RocketMQ - SpringBoot
# 集成 SpringBoot 基础
Springboot 提供了快捷操作 RocketMQ 的 RocketMQTemplate 对象。
# 配置
注意依赖的版本需要和 RocketMQ 的版本相同。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
2
3
4
5
# 生产者
编写 application.yml 配置文件:
server:
port: 9876
rocketmq:
name-server: 192.168.199.32:9876 # rocketMq的nameServer地址
producer:
group: my-boot-producer-group # 生产者组别
send-message-timeout: 3000 # 消息发送的超时时间
retry-times-when-send-async-failed: 2 # 异步消息发送失败重试次数
max-message-size: 4194304 # 消息的最大长度
2
3
4
5
6
7
8
9
10
生产者代码
@Component
public class BaseProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic, Message<String> message) {
// convertAndSend、syncSend、send 三个方法功能都一样,同步发送,使用一个即可
rocketMQTemplate.convertAndSend(topic, message);
}
}
2
3
4
5
6
7
8
9
10
11
单元格测试
@SpringBootTest
public class TestProducer {
@Resource
private BaseProducer baseProducer;
@Test
void sendMessage() {
Message<String> msg = MessageBuilder.withPayload("Hello RocketMQ!").build();
baseProducer.sendMessage("my-boot-topic", msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
# 消费者
编写 application.yml 配置文件:
server:
port: 9876
rocketmq:
name-server: 192.168.199.32:9876 # rocketMq的nameServer地址
2
3
4
5
消费者代码
/**
* 创建一个简单消息的监听
* 1. 类上添加注解 @Component 和 @RocketMQMessageListener
* @RocketMQMessageListener(topic = "powernode", consumerGroup = "powernode-group")
* topic指定消费的主题,consumerGroup 指定消费组,一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费
* 2. 实现 RocketMQListener 接口,注意泛型的使用,可以为具体的类型,如果想拿到消息
* 的其他参数可以写成 MessageExt
*/
@Component
@RocketMQMessageListener(topic = "powernode", consumerGroup = "powernode-group",messageModel = MessageModel.CLUSTERING)
public class SimpleMsgListener implements RocketMQListener<String> {
/**
* 消费消息的方法
*
* @param message
*/
@Override
public void onMessage(String message) {
System.out.println(message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 发送对象消息和集合消息
我们接着在上面项目里面做
# 发送对象消息
生产者
主要是监听的时候泛型中写对象的类型即可。
添加一个 Order 类
/**
* 订单对象
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
/**
* 订单号
*/
private String orderId;
/**
* 订单名称
*/
private String orderName;
/**
* 订单价格
*/
private Double price;
/**
* 订单号创建时间
*/
private Date createTime;
/**
* 订单描述
*/
private String desc;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
生产者
@Test
public void testObjectMsg() throws Exception {
Order order = new Order();
order.setOrderId(UUID.randomUUID().toString());
order.setOrderName("我的订单");
order.setPrice(998D);
order.setCreateTime(new Date());
order.setDesc("加急配送");
// 往 powernode-obj 主题发送一个订单对象
rocketMQTemplate.syncSend("powernode-obj", order);
}
2
3
4
5
6
7
8
9
10
11
消费者
使用上面的 Order 类
package com.powernode.listener;
import com.powernode.domain.Order;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 创建一个对象消息的监听
* 1. 类上添加注解 @Component 和 @RocketMQMessageListener
* 2. 实现 RocketMQListener 接口,注意泛型的使用
*/
@Component
@RocketMQMessageListener(topic = "powernode-obj", consumerGroup = "powernode-obj-group")
public class ObjMsgListener implements RocketMQListener<Order> {
/**
* 消费消息的方法
*
* @param message
*/
@Override
public void onMessage(Order message) {
System.out.println(message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 发送集合消息
和对象消息同理,创建一个Order的集合,发送出去,监听方注意修改泛型中的类型为Object即可,这里就不做重复演示了。
# 集成 SpringBoot 发送不同消息模式
# 发送同步消息
理解为:消息由消费者发送到 Broker 后,会得到一个确认,是具有可靠性的。
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知等。
我们在上面的快速入门中演示的消息,就是同步消息,即
rocketMQTemplate.syncSend()
rocketMQTemplate.send()
rocketMQTemplate.convertAndSend()
2
3
4
5
这三种发送消息的方法,底层都是调用 syncSend,发送的是同步消息。
# 发送异步消息
rocketMQTemplate.asyncSend()
生产者
/**
* 测试异步发送消息
*/
@Test
public void testAsyncSend() throws Exception {
// 发送异步消息,发送完以后会有一个异步通知
rocketMQTemplate.asyncSend("powernode", "发送一个异步消息", new SendCallback() {
/**
* 成功的回调
* @param sendResult
*/
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}
/**
* 失败的回调
* @param throwable
*/
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败");
}
});
// 测试一下异步的效果
System.out.println("谁先执行");
// 挂起 jvm 不让方法结束
System.in.read();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 发送单向消息
这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送。
/**
* 测试单向消息
*/
@Test
public void testOnWay() throws Exception {
// 发送单向消息,没有返回值和结果
rocketMQTemplate.sendOneWay("powernode", "这是一个单向消息");
}
2
3
4
5
6
7
8
# 发送延迟消息
/**
* 测试延迟消息
*/
@Test
public void testDelay() throws Exception {
// 构建消息对象
Message<String> message = MessageBuilder.withPayload("我是一个延迟消息").build();
// 发送一个延时消息,延迟等级为4级,也就是30s后被监听消费
SendResult sendResult = rocketMQTemplate.syncSend("powernode", message, 2000, 4);
System.out.println(sendResult.getSendStatus());
}
2
3
4
5
6
7
8
9
10
11
运行后,查看消费者端,过了 30s 才被消费。
这里注意的是 RocketMQ 不支持任意时间的延时。
只支持以下几个固定的延时等级,等级 1 就对应 1s,以此类推,最高支持 2h 延迟
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
# 发送顺序消息
修改 Order 表添加一个顺序字段
/**
* 订单的流程顺序
*/
private Integer seq;
2
3
4
生产者
/**
* 测试顺序消费
* mq会根据hash的值来存放到一个队列里面去
*/
@Test
public void testOrderly() throws Exception {
List<Order> orders = Arrays.asList(
new Order(UUID.randomUUID().toString().substring(0, 5), "张三的下订单", null, null, null, 1),
new Order(UUID.randomUUID().toString().substring(0, 5), "张三的发短信", null, null, null, 1),
new Order(UUID.randomUUID().toString().substring(0, 5), "张三的物流", null, null, null, 1),
new Order(UUID.randomUUID().toString().substring(0, 5), "张三的签收", null, null, null, 1),
new Order(UUID.randomUUID().toString().substring(0, 5), "李四的下订单", null, null, null, 2),
new Order(UUID.randomUUID().toString().substring(0, 5), "李四的发短信", null, null, null, 2),
new Order(UUID.randomUUID().toString().substring(0, 5), "李四的物流", null, null, null, 2),
new Order(UUID.randomUUID().toString().substring(0, 5), "李四的签收", null, null, null, 2)
);
// 我们控制流程为 下订单->发短信->物流->签收 hash的值为seq,也就是说 seq相同的会放在同一个队列里面,顺序消费
orders.forEach(order -> {
rocketMQTemplate.syncSendOrderly("powernode-obj", order, String.valueOf(order.getSeq()));
});
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
消费者
package com.powernode.listener;
import com.powernode.domain.Order;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 创建一个对象消息的监听
* 1. 类上添加注解 @Component 和 @RocketMQMessageListener
* 2. 实现 RocketMQListener 接口,注意泛型的使用
* consumeMode 指定消费类型
* CONCURRENTLY 并发消费
* ORDERLY 顺序消费 messages orderly. one queue, one thread
*/
@Component
@RocketMQMessageListener(topic = "powernode-obj",
consumerGroup = "powernode-obj-group",
consumeMode = ConsumeMode.ORDERLY
)
public class ObjMsgListener implements RocketMQListener<Order> {
/**
* 消费消息的方法
*
* @param message
*/
@Override
public void onMessage(Order message) {
System.out.println(message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 发送事务消息
生产者:
@Component
public class BaseProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 发送事务消息
*/
public void sendMessageInTransaction(String topic, String msg) throws InterruptedException {
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
Message<String> message = MessageBuilder.withPayload(msg).build();
// topic 和 tag 整合在一起,sendMessageInTransaction 的参数默认是 : 分开 topic 和 tag
String destination = topic + ":" + tags[i % tags.length];
// 第一个 destination 是消息要发送的目的地 topic,第二个 destination 消息携带的业务数据
TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
System.out.println(sendResult);
Thread.sleep(10);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
编写事务监听器类
@RocketMQTransactionListener()
public class MyTransactionListener implements RocketMQLocalTransactionListener {
/**
* 执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String destination = (String) arg;
// 把 spring 的 message 转换成 Rocketmq 的 message
org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(
new StringMessageConverter(),
"utf-8",
destination,
msg
);
// 得到 message 上的 tag 的内容
String tags = message.getTags();
if (StringUtils.contains(tags, "TagA")) {
return RocketMQLocalTransactionState.COMMIT;
} else if (StringUtils.contains(tags, "TagB")) {
return RocketMQLocalTransactionState.ROLLBACK;
} else {
return RocketMQLocalTransactionState.UNKNOWN;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.println("checkLocalTransaction" + msg);
return null;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
- 消息会先到事务监听类的执行方法,
- 如果返回状态为 COMMIT,则消费者可以直接监听到
- 如果返回状态为 ROLLBACK,则消息发送失败,直接回滚
- 如果返回状态为 UNKNOW,则过一会会走回查方法
- 如果回查方法返回状态为 UNKNOW 或者 ROLLBACK,则消息发送失败,直接回滚
- 如果回查方法返回状态为 COMMIT,则消费者可以直接监听到
# 集成 SpringBoot 的消息过滤
# tag 过滤(常在消费者端过滤)
我们从源码注释得知,tag 带在主题后面用:来携带
我们往下去看源码,在org.apache.rocketmq.spring.support.RocketMQUtil
的 getAndWrapMessage
方法里面看到了具体细节,我们也知道了 keys 在消息头里面携带。
生产者
/**
* 发送一个带 tag 的消息
*/
@Test
public void testTagMsg() throws Exception {
// 发送一 个tag 为 java 的数据
rocketMQTemplate.syncSend("powernode-tag:java", "我是一个带tag的消息");
}
2
3
4
5
6
7
8
消费者
package com.powernode.listener;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 创建一个简单的标签消息的监听
* 1. 类上添加注解 @Component 和 @RocketMQMessageListener
* selectorType = SelectorType.TAG, 指定使用tag过滤。(也可以使用sql92 需要在配置文件broker.conf中开启enbalePropertyFilter=true)
* selectorExpression = "java" 表达式,默认是*,支持"tag1 || tag2 || tag3"
* 2. 实现 RocketMQListener 接口,注意泛型的使用
*/
@Component
@RocketMQMessageListener(topic = "powernode-tag",
consumerGroup = "powernode-tag-group",
selectorType = SelectorType.TAG,
selectorExpression = "java"
)
public class TagMsgListener implements RocketMQListener<String> {
/**
* 消费消息的方法
*/
@Override
public void onMessage(String message) {
System.out.println(message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Key 过滤(可以在事务监听的类里面区分)
生产者
/**
* 发送一个带 key 的消息,我们使用事务消息 打断点查看消息头
*/
@Test
public void testKeyMsg() throws Exception {
// 发送一个key为spring的事务消息
Message<String> message = MessageBuilder.withPayload("我是一个带key的消息")
.setHeader(RocketMQHeaders.KEYS, "spring")
.build();
rocketMQTemplate.sendMessageInTransaction("powernode", message, "我是一个带key的消息");
}
2
3
4
5
6
7
8
9
10
11
断点发送这个消息,查看事务里面消息头
# 集成 SpringBoot 消息消费两种模式
RocketMQ 消息消费的模式分为两种:负载均衡模式和广播模式
- 负载均衡模式表示多个消费者交替消费同一个主题里面的消息
- 广播模式表示每个每个消费者都消费一遍订阅的主题的消息
# 负载均衡模式
消费者 1
/**
* messageModel 指定消息消费的模式
* CLUSTERING 为负载均衡模式
* BROADCASTING 为广播模式
*/
@Component
@RocketMQMessageListener(topic = "powernode",
consumerGroup = "powernode-group",
messageModel = MessageModel.CLUSTERING
)
public class ConsumerBListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println(message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
消费者 2
/**
* 创建一个简单消息的监听
* 1. 类上添加注解 @Component 和@ RocketMQMessageListener
*
* @RocketMQMessageListener(topic = "powernode", consumerGroup = "powernode-group")
* topic 指定消费的主题,consumerGroup 指定消费组,一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费
* 2. 实现 RocketMQListener 接口,注意泛型的使用
*/
@Component
@RocketMQMessageListener(topic = "powernode",
consumerGroup = "powernode-group",
messageModel = MessageModel.CLUSTERING)
public class SimpleMsgListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println(new Date());
System.out.println(message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
生产者
/**
* 测试消息消费的模式
*/
@Test
public void testMsgModel() throws Exception {
for (int i = 0; i < 10; i++) {
rocketMQTemplate.syncSend("powernode", "我是消息" + i);
}
}
2
3
4
5
6
7
8
9
查看两个消费者的控制台,发现是负载均衡的模式,两个消费者分别均摊部分的消息,,不会出现一个消息两个消费者c'f消费。
# 广播模式
只需要将 MessageModel.CLUSTERING
改成 MessageModel.BROADCASTING
即可变为广播模式。
修改后重启测试,结果是广播模式:每个消费者都消费了这些消息。