RocketMQ - 集群核心概念
# 负载均衡
RocketMQ 中的负载均衡都在 Client 端完成,具体来说的话,主要可以分为Producer 端发送消息时候的负载均衡和 Consumer 端订阅消息的负载均衡。
# Producer 的负载均衡
Producer 端在发送消息的时候,会先根据 Topic 找到指定的 TopicPublishInfo,在获取了 TopicPublishInfo 路由信息后,RocketMQ 的客户端在默认⽅式下 selectOneMessageQueue()
⽅法会从 TopicPublishInfo 中的 messageQueueList 中选择⼀ 个队列(MessageQueue)进⾏发送消息。具体的容错策略均在 MQFaultStrategy 这个类中定义。这⾥有⼀个 sendLatencyFaultEnable
开关变量,如果开启,在随机递增取模的基础上,再过滤掉 not available 的 Broker 代理。所谓的 latencyFaultTolerance
,是指对之前失败的,按⼀定的时间做退避。例如,如果上次请求的 latency 超过 550Lms,就退避 3000Lms;超过 1000L,就退避 60000L;如果关闭,采⽤随机递增取模的⽅式选择⼀个队列(MessageQueue)来发送消息,latencyFaultTolerance 机制是实现消息发送⾼可⽤的核⼼关键所在。
# Consumer的负载均衡
在 RocketMQ 中,Consumer 端的两种消费模式 (Push/Pull) 都是基于拉模式来获取消息的,而在 Push 模式只是对 pull 模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又「马不停蹄」的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式 (Push/Pull) 中,均需要 Consumer 端在知道从 Broker 端的哪一个消息队列-队列中去获取消息。因此,有必要在 Consumer 端来做负载均衡,即 Broker 端中多个 MessageQueue 分配给同一个 ConsumerGroup 中的哪些 Consumer 消费。
AllocateMessageQueueStrategy
接口的实现类表达了不同的负载均衡策略:
consumer.setAllocateMessageQueueStrategy(newAllocateMessageQueueAveragelyByCircle());
AllocateMessageQueueStrategy
接⼝的实现类表达了不同的负载均衡策略:
AllocateMachineRoomNearby:基于机房近侧优先级的代理分配策略。可以指定实际的分配策略。如果任何使用者在机房中活动,则部署在同一台机器中的代理的消息队列应仅分配给这些使用者。否则,这些消息队列可以与所有消费者共享,因为没有活着的消费者可以垄断它们
AllocateMessageQueueAveragely:平均哈希队列算法
AllocateMessageQueueAveragelyByCircle:循环平均哈希队列算法
AllocateMessageQueueByConfig:不分配,通过指定 MessageQueue 列表来消费
AllocateMessageQueueByMachineRoom:机房哈希队列算法,如支付宝逻辑机房
AllocateMessageQueueConsistentHash:一致哈希队列算法,带有虚拟节点的一致性哈希环,就是一个圆圈,每个位置都有一个节点,那比如 12 点到 3 点为 A MessageQueue,3 点到 6 点 为 B MessageQueue,依次类推,那么计算哈希值得到区间,就可以放到对应的 MessageQueue 里
其实环点会有很多的节点,但是可能某个节点挂掉了,于是就会有虚拟机点,比如 9 - 12 就有一个虚拟节点,它映射 3 - 6 的真实节点,也就是落到了 9 - 12 点位置,则使用 3 - 6 的队列
注意,在 MessageQueue 和C onsumer 之间一旦发生对应关系的改变,就会触发 rebalance,进行重新分配。
# 消息重试
# 生产者重试
// 失败的情况重发3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);
2
3
4
# 消费者重试
非广播模式下,Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以认为有以下几种情况:
- 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理 (例如话费充值,当前消息的手机号被注销,无法充值) 等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99% 也不成功,所以最好提供一种定时重试机制,即过 10 秒后再重试
- 由于依赖的下游应用服务不可用,例如 db 连接不可用,外系统网络不可达等遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用 Sleep 30s,再消费下一条消息,这样可以减轻 Broker 重试消息的压力。
在代码层面,如果消费者返回的是以下三种情况,则消息会重试消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("收到的消息:"+msg);
}
return null;
// return
ConsumeConcurrentlyStatus.RECONSUME_LATER;
// 抛出异常
}
});
2
3
4
5
6
7
8
9
10
11
12
消费者返回 null,或者返回 ConsumeConcurrentlyStatus.RECONSUME_LATER
,或者抛出异常,都会触发重试。
我们在实际生产过程中,一般重试 5-7 次,如果还没有消费成功,则可以把消息签收了,通知人工等处理。
@Test
public void testConsumer() throws Exception {
// 创建默认消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
// 设置nameServer地址
consumer.setNamesrvAddr("192.168.199.32:9876:9876");
// 订阅一个主题来消费 *表示没有过滤参数 表示这个主题的任何消息
consumer.subscribe("TopicTest", "*");
// 注册一个消费监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
// 这里执行消费的代码
System.out.println(Thread.currentThread().getName() + "----" + msgs);
// 这里制造一个错误
int i = 10 / 0;
} catch (Exception e) {
// 出现问题 判断重试的次数
MessageExt messageExt = msgs.get(0);
// 获取重试的次数 失败一次消息中的失败次数会累加一次
int reconsumeTimes = messageExt.getReconsumeTimes();
if (reconsumeTimes >= 3) {
// 则把消息确认了,可以将这条消息记录到日志或者数据库 通知人工处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 关于重试次数
RocketMQ 会为每个消费组都设置一个 Topic 名称为 %RETRY%+consumerGroup
的重试队列 (这里需要注意的是,这个 Topic 的重试队列是针对消费组,而不是针对每个 Topic 设置的),用于暂时保存因为各种异常而导致 Consumer 端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试消息的处理是先保存至 Topic 名称为 SCHEDULE TOPIC XXXX
的延迟队列中,后台定时任务按照对应的时间进行 Delay 后重新保存至 %RETRY%+consumerGroup
的重试队列中。
与延迟队列的设置相同,消息默认会重试 16 次,每次重试的时间间隔如下
10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
重试超过指定次数的消息,将会进入到死信队列中 %DLQ%my-consumer-group1
。
# 死信队列
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
RocketMQ 将这种正常情况下无法被消费的消息称为死信消息 (Dead-LetterMessage) ,将存储死信消息的特殊队列称为死信队列 (Dead-Letter Queue) 。在RocketMQ 中,可以通过使用 Console 控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
死信队列具备以下特点:
- RocketMQ 会自动为需要死信队列的 ConsumerGroup 创建死信队列
- 死信队列与 ConsumerGroup 对应,死信队列中包含该 ConsumerGroup 所有相关 Topic 的死信消息
- 死信队列中消息的有效期与正常消息相同,默认 48 小时
- 若要消费死信队列中的消息,需在控制台将死信队列的权限设置为6,即可读可
当消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列。
消息生产者
@Test
public void testDeadMsgProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("dead-group");
producer.setNamesrvAddr("192.168.199.32:9876:9876");
producer.start();
Message message = new Message("dead-topic", "我是一个死信消息".getBytes());
producer.send(message);
producer.shutdown();
}
2
3
4
5
6
7
8
9
消息消费者
@Test
public void testDeadMsgConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-group");
consumer.setNamesrvAddr("192.168.199.32:9876:9876");
consumer.subscribe("dead-topic", "*");
// 设置最大消费重试次数 2 次
consumer.setMaxReconsumeTimes(2);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(msgs);
// 测试消费失败
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
System.in.read();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 死信消费者
注意权限问题 perm:2 读、4 写、6 读写。
@Test
public void testDeadMq() throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-group");
consumer.setNamesrvAddr("192.168.199.32:9876:9876");
// 消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列
// 队列名称 默认是 %DLQ% + 消费者组名
consumer.subscribe("%DLQ%dead-group", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(msgs);
// 处理消息 签收了
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 控制台显示
# 幂等消息
幂等性: 多次操作造成的结果是一致的。对于非幂等的操作,幂等性如何保证?
在请求方式中的幂等性的体现
get:多次 get 结果是一致的
post:添加,非幂等
put:修改: 幂等,根据 id 修改
delete:根据 id 删除,幂等
对于非幂等的请求,我们在业务里要做幂等性保证。
在消息队列中的幂等性体现
消息队列中,很可能一条消息被冗余部署的多个消费者收到,对于非幂等的操作,就需要做幂等性保证,否则消息将会被重复消费。可以将情况概比如用户的注册,括为以下几种:
- 生产者重复发送: 由于网络抖动,导致生产者没有收到 Broker 的 ack 而再次重发消息,实际上 Broker 收到了多条重复的消息,造成消息重复
- 消费者重复消费: 由于网络抖动,消费者没有返回 ack 给 broker,导致消费者重试消费
- rebalance 时的重复消费: 由于网络抖动,在 rebalance 重分配时也可能出现消费者重复消费某条消息
如何保证幂等性消费
mysql 插入业务 id 作为主键,主键是唯一的,所以一次只能插入一条使用 redis 或 zk 的分布式锁 (主流的方案)
# 消息重复消费问题
# 为什么会出现重复消费问题呢?
BROADCASTING(广播)模式下,所有注册的消费者都会消费,而这些消费者通常是集群部署的一个个微服务,这样就会多台机器重复消费,当然这个是根据需要来选择。
CLUSTERING(负载均衡)模式下,如果一个topic被多个consumerGroup消费,也会重复消费。
即使是在 CLUSTERING 模式下,同一个 consumerGroup 下,一个队列只会分配给一个消费者,看起来好像是不会重复消费。但是,有个特殊情况:一个消费者新上线后,同组的所有消费者要重新负载均衡(反之一个消费者掉线后,也一样)。一个队列所对应的新的消费者要获取之前消费的 offset(偏移量,也就是消息消费的点位),此时之前的消费者可能已经消费了一条消息,但是并没有把 offset 提交给 Broker,那么新的消费者可能会重新消费一次。虽然 orderly 模式是前一个消费者先解锁,后一个消费者加锁再消费的模式,比起 concurrently 要严格了,但是加锁的线程和提交 offset 的线程不是同一个,所以还是会出现极端情况下的重复消费。
还有在发送批量消息的时候,会被当做一条消息进行处理,那么如果批量消息中有一条业务处理成功,其他失败了,还是会被重新消费一次。
那么如果在CLUSTERING(负载均衡)模式下,并且在同一个消费者组中,不希望一条消息被重复消费,改怎么办呢?我们可以想到去重操作,找到消息唯一的标识,可以是 msgId 也可以是你自定义的唯一的 key,这样就可以去重了。
# 解决方案
使用去重方案解决,例如将消息的唯一标识存起来,然后每次消费之前先判断是否存在这个唯一标识,如果存在则不消费,如果不存在则消费,并且消费以后将这个标记保存。
想法很好,但是消息的体量是非常大的,可能在生产环境中会到达上千万甚至上亿条,那么我们该如何选择一个容器来保存所有消息的标识,并且又可以快速的判断是否存在呢?
我们可以选择布隆过滤器 BloomFilter。
布隆过滤器(Bloom Filter)是 1970 年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。
在 hutool 的工具中我们可以直接使用,当然你自己使用 Redis 的 bitmap 类型手写一个也是可以的。
# 测试生产者
@Test
public void testRepeatProducer() throws Exception {
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("test-group");
// 设置nameServer地址
producer.setNamesrvAddr("192.168.199.32:9876:9876");
// 启动实例
producer.start();
// 我们可以使用自定义key当做唯一标识
String keyId = UUID.randomUUID().toString();
System.out.println(keyId);
Message msg = new Message("TopicTest", "tagA", keyId, "我是一个测试消息".getBytes());
SendResult send = producer.send(msg);
System.out.println(send);
// 关闭实例
producer.shutdown();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 添加 hutool 的依赖
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.11</version>
</dependency>
2
3
4
5
# 测试消费者
/**
* 在 SpringBoot 项目中可以使用 @Bean 在整个容器中放置一个单利对象
*/
public static BitMapBloomFilter bloomFilter = new BitMapBloomFilter(100);
@Test
public void testRepeatConsumer() throws Exception {
// 创建默认消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setMessageModel(MessageModel.BROADCASTING);
// 设置nameServer地址
consumer.setNamesrvAddr("192.168.199.32:9876:9876");
// 订阅一个主题来消费 表达式,默认是*
consumer.subscribe("TopicTest", "*");
// 注册一个消费监听 MessageListenerConcurrently是并发消费
// 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 拿到消息的key
MessageExt messageExt = msgs.get(0);
String keys = messageExt.getKeys();
// 判断是否存在布隆过滤器中
if (bloomFilter.contains(keys)) {
// 直接返回了 不往下处理业务
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 这个处理业务,然后放入过滤器中
// do sth...
bloomFilter.add(keys);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37