RocketMQ 基础篇(下)
内容概述
# RocketMQ 基础篇(下)
- RocketMQ 基础篇(下)
# [1] RocketMQ 的使用
# [1.1] 基本流程
- 导入MQ 客户端依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
2
3
4
5
- 消息发送者步骤
- 创建消息生产者producer,并指定生产者组名
- 指定Nameserver 地址
- 启动producer
- 创建消息对象,指定Topic、Tag 和消息体
- 发送消息
- 关闭生产者producer
- 消息消费者步骤
- 创建消费者Consumer,指定消费者组名
- 指定Nameserver 地址
- 订阅主题Topic 和Tag
- 设置回调函数,处理消息
- 启动消费者consumer
# [2] 玩转各种消息
# [2.1] 普通消息
# 消息发送
# 发送同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
代码演示:

同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。



Message ID
消息的全局唯一标识(内部机制的ID 生成是使用机器IP 和消息偏移量的组成,所以有可能重复,如果是幂等性还是最好考虑Key),
由消息队列MQ系统自动生成,唯一标识某条消息。
SendStatus
发送的标识。成功,失败等。
Queue
相当于是Topic 的分区;用于并行发送和接收消息。
# 发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker 的响应。
代码演示:

消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。


# 单向发送
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
代码演示:

单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。
此方式发送消息的过程耗时非常短,一般在微秒级别。

# 消息发送的权衡

# 消息消费
# 集群消费
消费者的一种消费模式。一个Consumer Group 中的各个Consumer 实例分摊去消费消息,即一条消息只会投递到一个Consumer
Group 下面的一个实例。
实际上,每个Consumer 是平均分摊Message Queue 的去做拉取消费。例如某个Topic 有3 条Q,
其中一个Consumer Group 有3 个实例(可能是3个进程,或者3 台机器),那么每个实例只消费其中的1 条Q。
而由Producer 发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q 上,可以认为Q
上的消息是平均的。那么实例也就平均地消费消息了。
这种模式下,消费进度(Consumer Offset)的存储会持久化到Broker。
代码演示:


# 广播消费
消费者的一种消费模式。消息将对一个Consumer Group 下的各个Consumer 实例都投递一遍。即即使这些Consumer
属于同一个Consumer Group,
消息也会被Consumer Group 中的每个Consumer 都消费一次。
实际上,是一个消费组下的每个消费者实例都获取到了topic 下面的每个Message Queue
去拉取消费。所以消息会投递到每个消费者实例。
这种模式下,消费进度(Consumer Offset)会存储持久化到实例本地。
代码演示:


# 消息消费时的权衡
集群模式:适用场景&注意事项
- 消费端集群化部署,每条消息只需要被处理一次。
- 由于消费进度在服务端维护,可靠性更高。
- 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
- 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。
广播模式:适用场景&注意事项
- 广播消费模式下不支持顺序消息。
- 广播消费模式下不支持重置消费位点。
- 每条消息都需要被相同逻辑的多台机器处理。
- 消费进度在客户端维护,出现重复的概率稍大于集群模式。
- 广播模式下,消息队列RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
- 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
- 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
- 目前仅Java 客户端支持广播模式。
- 广播模式下服务端不维护消费进度,所以消息队列RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
# [2.2] 顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ 可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin 轮询方式把消息发送到不同的queue(分区队列)
;而消费消息的时候从多个queue 上拉取消息,
这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue 中,消费的时候只从这个queue 上依次拉取,
则就保证了顺序。当发送和消费参与的queue 只有一个,则是全局有序;如果多个queue 参与,则为分区有序,即相对每个queue,消息都是有序的。
全局顺序消息

部分顺序消息

# 顺序消息生产
一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,下面是订单进行分区有序的示例代码。



# 顺序消息消费
消费时,同一个OrderId 获取到的肯定是同一个队列。从而确保一个订单中处理的顺序。


# [2.3] 消息发送时的重要方法/属性
# 属性
org.apache.rocketmq.example.details. ProducerDetails 类中:

- producerGroup:生产者所属组
- defaultTopicQueueNums:默认主题在每一个Broker 队列数量
- sendMsgTimeout:发送消息默认超时时间,默认3s
- compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认4k
- retryTimesWhenSendFailed:同步方式发送消息重试次数,默认为2,总共执行3 次
- retryTimesWhenSendAsyncFailed:异步方法发送消息重试次数,默认为2
- retryAnotherBrokerWhenNotStoreOK:消息重试时选择另外一个Broker 时,是否不等待存储结果就返回,默认为false
- maxMessageSize:允许发送的最大消息长度,默认为4M
# 方法
org.apache.rocketmq.example.details. ProducerDetails 类中:
// 启动
void start() throws MQClientException;
// 关闭
void shutdown();
// 查找该主题下所有消息队列
List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
2
3
4
5
6
7
8


# 单向发送

// 发送单向消息
void sendOneway(final Message msg) throws MQClientException, RemotingException,
InterruptedException;
// 选择指定队列单向发送消息
void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, InterruptedException;
2
3
4
5
6
7
# 同步发送

// 同步发送消息
SendResult send(final Message msg) throws MQClientException,
RemotingException, MQBrokerException,InterruptedException;
// 同步超时发送消息
SendResult send(final Message msg, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
// 选择指定队列同步发送消息
SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
2
3
4
5
6
7
8
9
10
11
# 异步发送

// 异步发送消息
void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException;
// 异步超时发送消息
void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException;
// 选择指定队列异步发送消息
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException;
2
3
4
5
6
7
8
9
10
11
# [2.4] 消息消费时的重要方法/属性
org.apache.rocketmq.example.details. ComuserDetails 类中
# 属性

// 消费者组
private String consumerGroup;
// 消息消费模式
private MessageModel messageModel = MessageModel.CLUSTERING;
// 指定消费开始偏移量(最大偏移量、最小偏移量、启动时间戳)开始消费
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
// ConsumeFromTimestamp 模式下只会在订阅组(消费者群组)第一次启动的时候,
// 过滤掉小于当前系统时间戳的消息,后续如果进程停掉或者崩溃,但是又生产了新消息。
// 下次启动消费者时,会继续消费停掉期间新生产的消息。后续行为和ConsumeFromLastOffset类似。
// 消费者最小线程数量
private int consumeThreadMin = 20;
// 消费者最大线程数量
private int consumeThreadMax = 20;
// 推模式下任务间隔时间
private long pullInterval = 0;
// 推模式下任务拉取的条数,默认32 条
private int pullBatchSize = 32;
// 消息重试次数,-1 代表16 次
private int maxReconsumeTimes = -1;
// 消息消费超时时间
private long consumeTimeout = 15;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 方法
// 订阅消息,并指定队列选择器
void subscribe(final String topic, final MessageSelector selector);
//取消消息订阅
void unsubscribe(final String topic);
// 获取消费者对主题分配了那些消息队列
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic);
2
3
4
5
6
7
8
// 注册并发事件监听器
void registerMessageListener(final MessageListenerConcurrently messageListener);
2

// 注册顺序消息事件监听器
void registerMessageListener(final MessageListenerOrderly messageListener);
2

# 消费确认(ACK)
业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,
RocketMQ 才会认为这批消息(默认是1 条)是消费完成的中途断电,抛出异常等都不会认为成功——即都会重新投递。
返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ 就会认为这批消息消费失败了。
如果业务的回调没有处理好而抛出异常,会认为是消费失败ConsumeConcurrentlyStatus.RECONSUME_LATER 处理。
为了保证消息是肯定被至少消费成功一次,RocketMQ 会把这批消息重发回Broker(topic 不是原topic 而是这个消费组的RETRY
topic),
在延迟的某个时间点(默认是10 秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16
次),
就会投递到DLQ 死信队列。应用可以监控死信队列来做人工干预。
另外如果使用顺序消费的回调MessageListenerOrderly 时,由于顺序消费是要前者消费成功才能继续消费,
所以没有RECONSUME_LATER 的这个状态,只有SUSPEND_CURRENT_QUEUE_A_MOMENT 来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费
# [2.5] 延时消息
# 概念介绍
延时消息: Producer 将消息发送到消息队列RocketMQ 服务端,但并不期望这条消息立马投递, 而是延迟一定时间后才投递到Consumer 进行消费,该消息即延时消息。
# 适用场景
消息生产和消费有时间窗口要求: 比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30 分钟以后投递给消费者, 消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。
# 使用方式
Apache RocketMQ 目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在Broker 层面,必须要做消息排序,如果再涉及到持久化,
那么消息排序要不可避免的产生巨大性能开销。(阿里云RocketMQ 提供了任意时刻的定时消息功能,
Apache 的RocketMQ 并没有,阿里并没有开源)
发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。
延迟消息是根据延迟队列的level 来的,延迟队列默认是:
msg.setDelayTimeLevel(3)代表延迟10 秒
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
源码中: org/apache/rocketmq/store/config/MessageStoreConfig.java

这18 个等级(秒(s)、分(m)、小时(h)),level 为1,表示延迟1 秒后消费,level 为5 表示延迟1 分钟后消费,
level 为18 表示延迟2 个小时消费。生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的level 即可。消费消息跟普通的消费消息一致。
# 代码演示
org.apache.rocketmq.example. scheduled 包中
# 生产者

# 消费者

# [2.6] 批量消息
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK(集群时会细讲),而且不能是延时消息。 此外,这一批消息的总大小不应超过4MB。
# 代码演示
org.apache.rocketmq.example. batch 包中
# 生产者

# 消费者

# 批量切分
如果消息的总长度可能大于4MB 时,这时候最好把消息进行分割
# 代码演示
我们需要发送10 万元素的数组,这个量很大,怎么快速发送完。同时每一次批量发送的消息大小不能超过4M。

# [2.7] 过滤消息
org.apache.rocketmq.example. filter 包中
# Tag 过滤
在大多数情况下,TAG 是一个简单而有用的设计,其可以来选择您想要的消息。
消费者将接收包含TAGA 或TAGB 或TAGC 的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,
可以使用SQL 表达式筛选消息。SQL 特性可以通过发送消息时的属性来进行计算。
# Sql 过滤
# SQL 基本语法
RocketMQ 定义了一些基本语法来支持这个特性,你也可以很容易地扩展它。 只有使用push 模式的消费者才能用使用SQL92 标准的sql 语句,常用的语句如下:
- 数值比较:比如:>,>=,<,<=,BETWEEN,=;
- 字符比较:比如:=,<>,IN;
- IS NULL 或者IS NOT NULL;
- 逻辑符号:AND,OR,NOT;
- 常量支持类型为:
- 数值,比如:123,3.1415;
- 字符,比如:'abc',必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或FALSE
# 消息生产者(加入消息属性)
发送消息时,你能通过putUserProperty 来设置消息的属性。

# 消息消费者(使用SQL 筛选)
用MessageSelector.bySql 来使用sql 筛选消息。

需要注意的是:如果这个地方抛出错误:说明Sql92 功能没有开启 
需要修改Broker.conf 配置文件, 加入enablePropertyFilter=true 配置,然后重启Broker 服务。
# [2.8] 事务消息

其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
# 正常事务流程
(1) 发送消息(half 消息):图中步骤1。
(2) 服务端响应消息写入结果:图中步骤2。
(3) 根据发送结果执行本地事务(如果写入失败,此时half 消息对业务不可见,本地逻辑不执行):图中步骤3。
(4) 根据本地事务状态执行Commit 或者Rollback(Commit 操作生成消息索引,消息对消费者可见):图中步骤4。
# 事务补偿流程
(1) 对没有Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”:图中步骤5。
(2) Producer 收到回查消息,检查回查消息对应的本地事务的状态:图中步骤6。
(3) 根据本地事务状态,重新Commit 或者Rollback::图中步骤6。
其中,补偿阶段用于解决消息Commit 或者Rollback 发生超时或者失败的情况。
# 事务消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交状态,它允许消费者消费此消息(完成图中了1,2,3,4 步,第4 步是Commit)。
- TransactionStatus.RollbackTransaction: 回滚状态,它代表该消息将被删除,不允许被消费(完成图中了1,2,3,4 步, 第4 步是Rollback)。
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态(完成图中了1,2,3 步, 但是没有4 或者没有7,无法Commit 或Rollback)。
# 代码演示
org.apache.rocketmq.example. transaction 包中
# 创建事务性生产者
使用TransactionMQProducer 类创建生产者,并指定唯一的ProducerGroup,就可以设置自定义线程池来处理这些检查请求。
执行本地事务后、需要根据执行结果对消息队列进行回复。

# 实现事务的监听接口
当发送半消息成功时,我们使用executeLocalTransaction 方法来执行本地事务(步骤3)。它返回前一节中提到的三个事务状态之一。
checkLocalTranscation
方法用于检查本地事务状态(步骤5),并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。


# 使用场景
用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、库存、余额进行回退。
如何保证数据的完整性?
可以使用RocketMQ 的分布式事务保证在下单失败后系统数据的完整性。
# 使用限制
- 事务消息不支持延时消息和批量消息。
- 事务回查的间隔时间:BrokerConfig. transactionCheckInterval 通过Broker 的配置文件设置好。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为15 次,但是用户可以通过Broker 配置文件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过N 次的话( N = transactionCheckMax ) 则Broker 将丢弃此消息, 并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener 类来修改这个行为。
- 事务消息将在Broker 配置文件中的参数transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时, 用户还可以通过设置用户属性CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于transactionMsgTimeout 参数。
- 事务性消息可能不止一次被检查或消费。
- 事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过RocketMQ 本身的高可用性机制来保证, 如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者ID 不能与其他类型消息的生产者ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ 服务器能通过它们的 生产者ID 查询到消费者。
# [3] 分布式事务

业务场景:用户A 转账100 元给用户B,这个业务比较简单,具体的步骤:
1、用户A 的账户先扣除100 元
2、再把用户B 的账户加100 元
如果在同一个数据库中进行,事务可以保证这两步操作,要么同时成功,要么同时不成功。这样就保证了转账的数据一致性。
但是在微服务架构中,因为各个服务都是独立的模块,都是远程调用,都没法在同一个事务中,都会遇到事务问题。
因为各个服务都是独立的模块,都是远程调用,都没法在同一个事务中,都会遇到事务问题。

消息中间件的方式,把扣款业务和加钱业务异步化,扣款成功后,发送“扣款成功消息”到消息中间件;加钱业务订阅“扣款成功消息”,
再对用户B 加钱(系统怎么知道给用户B 加钱呢?是消息体里面包含了源账户和目标账户信息,以及钱金额等信息)
场景一:先扣款后向MQ 发消息
先扣款再发送消息,万一发送消息失败了,那用户B 就没法加钱
场景二:先向MQ 发像消息,后扣款
扣款成功消息发送成功,但用户A 扣款失败,可加钱业务订阅到了消息,用户B 加了钱
问题所在,也就是没法保证扣款和发送消息,同时成功,或同时失败;导致数据不一致。
RocketMq 消息中间件把消息分为两个阶段:半事务阶段和确认事务阶段
- 半事务阶段: 该阶段主要发一个消息到rocketmq,但该消息只储存在commitlog 中,但consumeQueue 中不可见,也就是消费端(订阅端)无法看到此消息
- commit/rollback 阶段(确认事务阶段): 该阶段主要是把prepared 消息保存到consumeQueue 中,即让消费端可以看到此消息,也就是可以消费此消息。如果是rollback 就不保存。

整个流程:
1、A 在扣款之前,先发送半事务消息
2、发送预备消息成功后,执行本地扣款事务
3、扣款成功后,再发送确认消息
4、B 消息端(加钱业务)可以看到确认消息,消费此消息,进行加钱
注意: 上面的确认消息可以为commit 消息,可以被订阅者消费;也可以是Rollback 消息,即执行本地扣款事务失败后,提交rollback
消息,
即删除那个预备消息,订阅者无法消费。
异常1:如果发送半事务消息失败,下面的流程不会走下去;这个是正常的。
异常2:如果发送半事务消息成功,但执行本地事务失败;这个也没有问题,因为此预备消息不会被消费端订阅到,消费端不会执行业务。
异常3:如果发送半事务消息成功,执行本地事务成功,但发送确认消息失败;这个就有问题了,因为用户A 扣款成功了,
但加钱业务没有订阅到确认消息,无法加钱。这里出现了数据不一致。

RocketMq 如何解决上面的问题,核心思路就是【事务回查】,也就是RocketMq 会定时遍历commitlog 中的半事务消息。
异常3,发送半事务消息成功,本地扣款事务成功,但发送确认消息失败;因为RocketMq 会进行回查半事务消息,在回查后发现业务已经扣款成功了,
就补发“发送commit 确认消息”;这样加钱业务就可以订阅此消息了。
这个思路其实把异常2 也解决了,如果本地事务没有执行成功,RocketMQ 回查业务,发现没有执行成功,就会发送RollBack
确认消息,
把消息进行删除。
同时还要注意的点是,RocketMQ 不能保障消息的重复,所以在消费端一定要做幂等性处理。
除此之外,如果消费端发生消费失败,同时也需要做重试,如果重试多次,消息会进入死信队列,这个时候也需要进行特殊的处理。
(一般就是把A 已经处理完的业务进行回退)

另外,如果本地事务执行了很多张表,那是不是我们要把那些表都要进行判断是否执行成功呢?这样是不是太麻烦了,而且和业务很耦合。
好的方案是设计一张Transaction 表,将业务表和Transaction 绑定在同一个本地事务中,如果扣款本地事务成功时,
Transaction 中应当已经记录该TransactionId 的状态为「已完成」。当RocketMq 事务回查时,只需要检查对应的TransactionId
的状态是否是「已完成」就好,
而不用关心具体的业务数据。
如果是银行业务,对数据要求性极高,一般A 与B 需要进行手动对账,手动补偿。