RocketMQ 底层原理之高可用机制
内容概述
# RocketMQ 底层原理之高可用机制
- [RocketMQ 底层原理之高可用机制](#RocketMQ 底层原理之高可用机制)
# [1] RocketMQ 中的高可用机制

RocketMQ 分布式集群是通过Master 和Slave 的配合达到高可用性的。
Master 和Slave 的区别:在Broker 的配置文件中,参数brokerId 的值为0 表明这个Broker 是Master,大于0 表明这个Broker
是Slave,
同时brokerRole 参数也会说明这个Broker 是Master 还是Slave。
Master 角色的Broker 支持读和写,Slave 角色的Broker 仅支持读,也就是Producer 只能和Master 角色的Broker 连接写入消息;
Consumer 可以连接Master 角色的Broker,也可以连接Slave 角色的Broker 来读取消息。
# [1.1] 集群部署模式
# 单master 模式
也就是只有一个master 节点,称不上是集群,一旦这个master 节点宕机,那么整个服务就不可用。
# 多master 模式
多个master 节点组成集群,单个master 节点宕机或者重启对应用没有影响。
优点: 所有模式中性能最高(一个Topic 的可以分布在不同的master,进行横向拓展)
在多主多从的架构体系下,无论使用客户端还是管理界面创建主题,一个主题都会创建多份队列在多主中(默认是4
个的话,
双主就会有8 个队列,每台主4 个队列,所以双主可以提高性能,一个Topic 的分布在不同的master,方便进行横向拓展。
缺点: 单个master 节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响。
# 多master 多slave 异步复制模式
而从节点(Slave)就是复制主节点的数据,对于生产者完全感知不到,对于消费者正常情况下也感知不到。
(只有当Master 不可用或者繁忙的时候,Consumer 会被自动切换到从Slave 读。)
在多master 模式的基础上,每个master 节点都有至少一个对应的slave。master 节点可读可写,但是slave 只能读不能写,类似于mysql
的主备模式。
优点: 一般情况下都是master 消费,在master 宕机或超过负载时,消费者可以从slave 读取消息,消息的实时性不会受影响,
性能几乎和多master一样。
缺点: 使用异步复制的同步方式有可能会有消息丢失的问题。(Master 宕机后,生产者发送的消息没有消费完,
同时到Slave 节点的数据也没有同步完)
# 多master 多slave 主从同步复制+异步刷盘(最优推荐)
优点: 主从同步复制模式能保证数据不丢失。
缺点: 发送单个消息响应时间会略长,性能相比异步复制低10%左右。
对数据要求较高的场景,主从同步复制方式,保存数据热备份,通过异步刷盘方式,保证rocketMQ
高吞吐量。
# Dlegder(不推荐)
在RocketMQ4.5 版本之后推出了Dlegder 模式,但是这种模式一直存在严重的BUG,同时性能有可能有问题,包括升级到了4.8
的版本后也一样,
所以目前不谈这种模式。(原理类似于Zookeeper 的集群选举模式)
# [1.2] 刷盘与主从同步
生产时首先将消息写入到MappedFile,内存映射文件,然后根据刷盘策略刷写到磁盘。
大致的步骤可以理解成使用MMAP 中的MappedByteBuffer 中实际用flip()。
RocketMQ 的刷盘是把消息存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ
为了提高性能,
会尽可 能地保证磁盘的顺序写。消息在通过Producer 写入RocketMQ 的时候,有两种写磁盘方式,同步刷盘和异步刷盘。
# 同步刷盘
SYNC_FLUSH(同步刷盘):生产者发送的每一条消息都在保存到磁盘成功后才返回告诉生产者成功。这种方式不会存在消息丢失的问题,但是有很大的磁盘IO 开销, 性能有一定影响。
# 异步刷盘
ASYNC_FLUSH(异步刷盘):生产者发送的每一条消息并不是立即保存到磁盘,而是暂时缓存起来,然后就返回生产者成功。 随后再异步的将缓存数据保存到磁盘,有两种情况:
- 是定期将缓存中更新的数据进行刷盘;
- 是当缓存中更新的数据条数达到某一设定值后进行刷盘。
这种异步的方式会存在消息丢失(在还未来得及同步到磁盘的时候宕机),但是性能很好。默认是这种模式。
4.8.0 版本中默认值下是异步刷盘,如下图:

# 主从同步复制
集群环境下需要部署多个Broker,Broker 分为两种角色:
- 一种是master,即可以写也可以读,其brokerId=0,只能有一个;
- 另外一种是slave,只允许读,其brokerId 为非0。
一个master 与多个slave 通过指定相同的brokerClusterName 被归为一个broker set(broker 集)。通常生产环境中,我们至少需要2
个broker set。
Slave 是复制master 的数据。一个Broker 组有Master 和Slave,消息需要从Master 复制到Slave 上,有同步和异步两种复制方式。
主从同步复制方式(Sync Broker):生产者发送的每一条消息都至少同步复制到一个slave
后才返回告诉生产者成功,即“同步双写”。
在同步复制方式下,如果Master 出故障, Slave 上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。
# 主从异步复制
主从异步复制方式(Async Broker):生产者发送的每一条消息只要写入master 就返回告诉生产者成功。然后再“异步复制”到slave。
在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master
出了故障,有些数据因为没有被写入Slave,有可能会丢失;
同步复制和异步复制是通过Broker 配置文件里的brokerRole 参数进行设置的,
这个参数可以被设置成ASYNC_MASTER、SYNC_MASTER、SLAVE 三个值中的一个。
# [1.3] 配置参数及意义
- brokerId=0 代表主
- brokerId=1 代表从(大于0 都代表从)
- brokerRole=SYNC_MASTER 同步复制(主从)
- brokerRole=ASYNC_MASTER 异步复制(主从)
- flushDiskType=SYNC_FLUSH 同步刷盘
- flushDiskType=ASYNC_FLUSH 异步刷盘
# [1.4] 搭建双主双从同步复制+异步刷盘
# NameServer 集群
两台服务器:
- 106.55.246.66
- 94.191.83.120
# Broker 服务器
四台服务器(两主两从):
- 106.55.246.66 ------MasterA
- 94.191.83.120 ------MasterB
- 106.53.195.121 ------SlaveA
- 106.55.248.74 ------SlaveB
# 配置文件
注意,因为RocketMQ 使用外网地址,所以配置文件(MQ 文件夹/conf/2m-2s-sync/)需要修改(同时修改nameserver 地址为集群地址):
注意,如果机器内存不够,建议把启动时的堆内存改小,具体见《RocketMQ 的安装.docx》中--- 3、RocketMQ 在Linux
下的安装/注意事项:
- 106.55.246.66 ------主A(broker-a.properties 增加):
brokerIP1=106.55.246.66
namesrvAddr=106.55.246.66:9876;94.191.83.120:9876
2

- 94.191.83.120 ------主B(broker-b.properties 增加):
brokerIP1=94.191.83.120
namesrvAddr=106.55.246.66:9876;94.191.83.120:9876
2

- 106.53.195.121 ------从A(broker-a-s.properties 增加):
brokerIP1=106.53.195.121
namesrvAddr=106.55.246.66:9876;94.191.83.120:9876
2

- 106.55.248.74 ------从B(broker-b-s.properties 增加):
brokerIP1=106.55.248.74
namesrvAddr=106.55.246.66:9876;94.191.83.120:9876
2

注意:不管是主还是从,如果属于一个集群,使用相同的brokerClusterName 名称。

# 启动步骤
# 启动NameServer
注意,记得关闭防火墙或者要开通9876 端口。
1、启动NameServer 集群,我们这里使用106.55.246.66 和94.191.83.120 两台作为集群即可。
1). 在机器A,启动第1 台NameServer: 102 服务器进入至‘MQ 文件夹/bin’下:然后执行‘nohup sh mqnamesrv &’ 查看日志的命令:
tail -f ~/logs/rocketmqlogs/namesrv.log

2). 在机器B,启动第2 台NameServer: 103 服务器进入至‘MQ 文件夹/bin’下:然后执行‘nohup sh mqnamesrv &’ 查看日志的命令:
tail -f ~/logs/rocketmqlogs/namesrv.log

# 启动Broker
2、启动双主双从同步集群,顺序是先启动主,然后启动从。
3). 启动主A: 102 服务器进入至‘MQ 文件夹/bin’下:执行以下命令(autoCreateTopicEnable=true 测试环境开启,生产环境建议关闭):
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a.properties autoCreateTopicEnable=true &

4). 启动主B: 103 服务器进入至‘MQ 文件夹/bin’下:执行以下命令:
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b.properties autoCreateTopicEnable=true &
查看日志的命令:
tail -f ~/logs/rocketmqlogs/broker.log
5). 启动从A: 104 服务器进入至‘MQ 文件夹/bin’下:执行以下命令:
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties autoCreateTopicEnable=true &
查看日志的命令:
tail -f ~/logs/rocketmqlogs/broker.log
6). 启动从B: 105 服务器进入至‘MQ 文件夹/bin’下:执行以下命令:
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties autoCreateTopicEnable=true &
查看日志的命令:
tail -f ~/logs/rocketmqlogs/broker.log
每台服务器查看日志:
tail -f ~/logs/rocketmqlogs/broker.log
如果是要启动控制台,则需要重新打包:
进入‘/rocketmq-console/src/main/resources’文件夹,打开‘application.properties’进行配置。
(多个NameServer 使用;分隔)
rocketmq.config.namesrvAddr=106.55.246.66:9876;94.191.83.120:9876
进入‘/rocketmq-externals/rocketmq-console’文件夹,
执行‘mvn clean package -Dmaven.test.skip=true’,编译生成。
在把编译后的jar 包丢上服务器:
nohup java -jar rocketmq-console-ng-2.0.0.jar &
进入控制台http://106.55.246.66:8089/#/cluster,就可以查看搭建成功的集群详情。

# [2] 消息生产的高可用机制

在创建Topic 的时候,把Topic 的多个Message Queue 创建在多个Broker 组上(相同Broker 名称,不同brokerId 的机器组成一个Broker
组),
这样当一个Broker 组的Master 不可用后,其他组的Master 仍然可用,Producer 仍然可以发送消息。
RocketMQ 目前不支持把Slave 自动转成Master,如果机器资源不足, 需要把Slave 转成Master,则要手动停止Slave
角色的Broker,
更改配置文件,用新的配置文件启动Broker。
# [2.1] 高可用消息生产流程

- TopicA 创建在双主中,BrokerA 和BrokerB 中,每一个Broker 中有4 个队列
- 选择队列是,默认是使用轮训的方式,比如发送一条消息A 时,选择BrokerA 中的Q4
- 如果发送成功,消息A 发结束。
- 如果消息发送失败,默认会采用重试机制
retryTimesWhenSendFailed 同步模式下内部尝试发送消息的最大次数默认值是2 retryTimesWhenSendAsyncFailed 异步模式下内部尝试发送消息的最大次数默认值是2

- 如果发生了消息发送失败,这里有一个规避策略(默认配置):
5.1、默认不启用Broker 故障延迟机制(规避策略):如果是BrokerA 宕机,上一次路由选择的是BrokerA 中的Q4, 那么再次重发的队列选择是BrokerA 中的Q1。但是这里的问题就是消息发送很大可能再次失败,引发再次重复失败,带来不必要的性能损耗。
注意, 这里的规避仅仅只针对消息重试,例如在一次消息发送过程中如果遇到消息发送失败,规避broekr-a, 但是在下一次消息发送时,即再次调用DefaultMQProducer 的send 方法发送消息时,还是会选择broker-a 的消息进行发送, 只有继续发送失败后,重试时再次规避broker-a。
那么,rocketMQ为什么会默认这么设计?主要有以下两点原因:
1、某一时间段,从NameServer 中读到的路由中包含了不可用的主机
2、不正常的路由信息也是只是一个短暂的时间而已。
生产者每隔30s 更新一次路由信息,而NameServer 认为broker 不可用需要经过120s。

所以生产者要发送时认为broker 不正常(从NameServer 拿到)和实际Broker 不正常有延迟。5.2、启用Broker 故障延迟机制:
代码演示:

开启延迟规避机制,一旦消息发送失败(不是重试的)会将broker-a “悲观”地认为在接下来的一段时间内该Broker 不可用, 在为未来某一段时间内所有的客户端不会向该Broker 发送消息。这个延迟时间就是通过notAvailableDuration、latencyMax 共同计算的, 就首先先计算本次消息发送失败所耗的时延,然后对应latencyMax 中哪个区间,即计算在latencyMax 的下标, 然后返回notAvailableDuration 同一个下标对应的延迟值。
在发送失败后,在接下来的固定时间(比如5 分钟)内,发生错误的BrokeA 中的队列将不再参加队列负载,发送时只选择BrokerB 服务器上的队列。
注意,如果所有的Broker 都触发了故障规避,并且Broker 只是那一瞬间压力大,那岂不是明明存在可用的Broker,但经过你这样规避, 反倒是没有Broker 可用来,那岂不是更糟糕了。所以RocketMQ 默认不启用Broker 故障延迟机制。
# [3] 消息消费的高可用机制
# [3.1] 主从的高可用原理
在Consumer 的配置文件中,并不需要设置是从Master 读还是从Slave 读,当Master 不可用或者繁忙的时候,Consumer 会被自动切换到从Slave 读。 有了自动切换Consumer 这种机制,当一个Master 角色的机器出现故障后,Consumer 仍然可以从Slave 读取消息,不影响Consumer 程序。 这就达到了消费端的高可用性。
Master 不可用这个很容易理解,那什么是Master 繁忙呢?
这个繁忙其实是RocketMQ 服务器的内存不够导致的。
我们来看下源码org.apache.rocketmq.store. DefaultMessageStore#getMessage 方法

我们发现,当前需要拉取的消息已经超过常驻内存的大小,表示主服务器繁忙,此时才建议从从服务器拉取。
# [3.2] 消息消费的重试
消费端如果发生消息失败,没有提交成功,消息默认情况下会进入重试队列中。

注意,重试队列的名字其实是跟消费群组有关,不是主题,因为一个主题可以有多个群组消费。 
# 顺序消息的重试
对于顺序消息,当消费者消费消息失败后,消息队列RocketMQ 会自动不断进行消息重试(每次间隔时间为1 秒),这时,应用会出现消息消费被阻塞的情况。
因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
所以玩顺序消息时,consume 消费消息失败时,不能返回reconsume——later,这样会导致乱序,应该返回suspend_current_queue_a_moment,
意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。
# 无序消息的重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。无序消息的重试只针对集群消费方式生效; 广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
# 重试次数
| 第几次重 | 与上次重试的间隔时 | 第几次重试 | 与上次重试的间隔时间 |
|---|---|---|---|
| 1 | 10秒 | 9 | 7分钟 |
| 2 | 30秒 | 10 | 8分钟 |
| 3 | 1分钟 | 11 | 9分钟 |
| 4 | 2分钟 | 12 | 10分钟 |
| 5 | 3分钟 | 13 | 20分钟 |
| 6 | 4分钟 | 14 | 30分钟 |
| 7 | 5分钟 | 15 | 1小时 |
| 8 | 6分钟 | 16 | 2小时 |
如果消息重试16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,
将会在接下来的4 小时46 分钟之内进行16 次重试,超过这个时间范围消息将不再重试投递。
注意:一条消息无论重试多少次,这些重试消息的Message ID 不会改变。
# 重试配置
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
◼️ 返回RECONSUME_LATER (推荐)

◼️ 返回Null

◼️ 抛出异常

集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回CONSUME_SUCCESS,此后这条消息将不会再重试。
# 自定义消息最大重试次数
消息队列RocketMQ 允许Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:
◼️最大重试次数小于等于16 次,则重试时间间隔同上表描述。
◼️最大重试次数大于16 次,超过16 次的重试时间间隔均为每次2 小时。

注意:
1、消息最大重试次数的设置对相同Group ID 下的所有Consumer 实例有效。
2、如果只对相同Group ID 下两个Consumer 实例中的其中一个设置了MaxReconsumeTimes,那么该配置对两个Consumer
实例均生效。
3、配置采用覆盖的方式生效,即最后启动的Consumer 实例会覆盖之前的启动实例的配置。
# [3.3] 死信队列
当一条消息初次消费失败,消息队列RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,
此时,消息队列RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
在消息队列RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),
存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
# 死信特性
死信消息具有以下特性:
🔹 不会再被消费者正常消费。
🔹 有效期与正常消息相同,均为3 天,3 天后会被自动删除。因此,请在死信消息产生后的3 天内及时处理。
死信队列具有以下特性:
🔸 不会再被消费者正常消费。
🔸 一个死信队列对应一个Group ID, 而不是对应单个消费者实例。
🔸 如果一个Group ID 未产生死信消息,消息队列RocketMQ 不会为其创建相应的死信队列。
🔸 一个死信队列包含了对应Group ID 产生的所有死信消息,不论该消息属于哪个Topic。
# 查看死信消息
在控制台查询出现死信队列的主题信息

1、在消息界面根据主题查询死信消息
2、选择重新发送消息
3、一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,
可以在消息队列RocketMQ 控制台重新发送该消息,让消费者重新消费一次。
# [4] 负载均衡
# [4.1] Producer 负载均衡
Producer 端,每个实例在发消息的时候,默认会轮询所有的message queue 发送,以达到让消息平均落在不同的queue 上。
而由于queue 可以散落在不同的broker,所以消息就发送到不同的broker 下,如下图:

发布方会把第一条消息发送至Queue 0,然后第二条消息发送至Queue 1,以此类推。
# [4.2] Consumer 负载均衡
# 集群模式
在集群消费模式下,每条消息只需要投递到订阅这个topic 的Consumer Group 下的一个实例即可。RocketMQ 采用主动拉取的方式拉取并消费消息,
在拉取的时候需要明确指定拉取哪一条message queue。
而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue 的数量和实例的数量平均分配queue
给每个实例。
默认的分配算法是AllocateMessageQueueAveragely
还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue
的形式,
如下图:

需要注意的是,集群模式下,queue 都是只允许分配只一个实例,这是由于如果多个实例同时消费一个queue 的消息,由于拉取哪些消息是consumer
主动控制的,
那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue 只分给一个consumer 实例,一个consumer
实例可以允许同时分到不同的queue。
通过增加consumer 实例去分摊queue 的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,
这时候原来分配到的queue 将分配到其他实例上继续消费。
但是如果consumer 实例的数量比message queue 的总数量还多的话,多出来的consumer 实例将无法分到queue,也就无法消费到消息,
也就无法起到分摊负载的作用了。所以需要控制让queue 的总数量大于等于consumer 的数量。
# 广播模式
由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。
在实现上,其中一个不同就是在consumer 分配queue 的时候,所有consumer 都分到所有的queue。