RocketMQ实践(5.x)-顺序消费使用详解
RocketMQ实践(5.x)-顺序消费使用详解
RocketMQ顺序消费
在公司项目中存在一个典型应用场景:用于处理实时预警任务。在这一场景下,系统会同时处理某个设备处于正常状态或异常状态的事件。这些事件具有明确的时间序列性,在发布与消费流程上应遵循先进先出(FIFO)原则以确保正确顺序。 RocketMQ 的顺序消费模式脱颖而出,在提升吞吐量的同时也采用多线程的方式进行消息处理,并将其划分为多个独立的消息队列以保证性能需求。然而这种分割可能导致消息消费出现无序现象。为此 RocketMQ 5.x 在后续版本中对顺序消费机制进行了优化增添了消息组(MessageGroup)等新概念从而使得整体设计更加均衡地兼顾了顺序性和高性能特点

生产顺序性
Apache RocketMQ 5.x 生产顺序性由以下几点保证:
- 同一消息组(MessageGroup)
生产者在发送信息时可对每个信息设定所属群组。
仅在同属一个群组的信息才可确保传递的先后顺序。
不同群组的信息或未指定任何群组的信息无法确保传递的先后顺序。
- 单一生产者
消息产生的时间序列仅允许一个生产者参与;分布在各自独立的系统中;即便设置相同的消息组,在线服务依然无法确定各个生产者之间所生成的消息之间的先后顺序。
- 串行发送
云消息队列 RocketMQ 版生产者客户端具备多线程下的安全消息传输能力,在实际应用中需要注意的是:当生产者采用多线程并行发送消息时,在不同发信线程之间将无法明确判定其消息发送的时间先后顺序
符合上述条件的生产者,在云消息队列 RocketMQ 版本之后会发送到该版本中,并且确保了同一消息组的消息按照发送顺序存储在同一队列中。服务端的具体存储逻辑如下:

如上所述,在参考图中可以看到
消费顺序性
Apache RocketMQ 确保消息消费者能够按照消息存储的顺序正确处理。
- 消费的有序性
在确保生产顺序性这一前提下
消息在使用过程中出现故障或达到超时状态时,则会被服务端触发重试机制;每次重试都会发送新的请求指令;原有请求对应的新的请求将由服务端单独处理,并不再具有执行价值
在顺序消息消费出现故障后启动重试机制时为了确保消息的整体有序性 后续的信息无法立即被处理 只有等到前面所有信息已经被成功处理完毕之后 才能继续执行下一步操作
- 有限重试
在投递消费者服务过程中出现问题时会导致订单处理的连续性和效率受到影响。
当发生故障时会导致订单处理的连续性和效率受到影响。
对于采用Apache RocketMQ协议的消息系统而言
当发生故障时会导致订单处理的连续性和效率受到影响。
对于采用Apache RocketMQ协议的消息系统而言
当发生故障时会导致订单处理的连续性和效率受到影响。
对于采用Apache RocketMQ协议的消息系统而言
当发生故障时会导致订单处理的连续性和效率受到影响。
对于需要严格保证消费顺序的场景,请务设置合理的重试次数。
使用示例
环境准备
以windows docker环境为背景,搭建一个5.x版本。
- 准备docker compose脚本
version: '3.8'
services:
namesrv:
image: apache/rocketmq:5.3.0
container_name: rmqnamesrv
ports:
- 10908:9876
networks:
- rocketmq
command: sh mqnamesrv
broker:
image: apache/rocketmq:5.3.0
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
- 10912:10912
volumes:
- C:\Users\wayne\docker\RocketMQ\data:/home/rocketmq/store
environment:
- NAMESRV_ADDR=rmqnamesrv:9876
depends_on:
- namesrv
networks:
- rocketmq
command: sh mqbroker
proxy:
image: apache/rocketmq:5.3.0
container_name: rmqproxy
networks:
- rocketmq
depends_on:
- broker
- namesrv
ports:
- 8080:8080
- 8081:8081
restart: on-failure
environment:
- NAMESRV_ADDR=rmqnamesrv:9876
command: sh mqproxy
dashboard:
image: apacherocketmq/rocketmq-dashboard:latest
container_name: rocketmq-dashboard
networks:
- rocketmq
depends_on:
- broker
- namesrv
ports:
- 10907:8080
restart: unless-stopped
environment:
- JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876
networks:
rocketmq:
driver: bridge
yaml

- 启动项目
docker-compose -f XXXXX\XXXXX\rocketmq-5.3.0-docker-compose.yml -p rockermq_project up -d
text-plain
- 进入broker容器,执行一些初始化操作
$ docker exec -it rmqbroker bash
text-plain
- 创建FIFO主题
顺序消息仅能使用MessageType=FIFO的主题(即顺序消息只能发送至类型与自身一致的主题中),其传递的消息类型应与主题保持一致。
// -c 集群名称
// -t Topic名称
// -n Nameserver地址
// -o 创建顺序消息主题
sh mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -a +message.type=FIFO
text-plain
- 创建两个订阅消费组
// -c 集群名称
// -g ConsumerGroup名称
// -n Nameserver地址
// -o 创建顺序订阅消费组
sh mqadmin updateSubGroup -c DefaultCluster -g FIFOGroup -o true
sh mqadmin updateSubGroup -c DefaultCluster -g FIFOGroup01 -o true
text-plain
发送消息
相较于普通消息发送方式而言,在进行顺序消息发送时需要将应用逻辑配置为MessageGroup(而不是消费组)。建议根据具体业务场景进行划分,并尽量采用细粒度设计以实现业务拆分与并发扩展。
@Test
public void sendMessage() throws ClientException {
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081
String endpoint = "123.123.8.2:8080";
// 消息发送的目标Topic名称,需要提前创建。
String topic = "FIFOTopic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
// 初始化Producer时需要设置通信配置以及预绑定的Topic。
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
//顺序消息发送。
MessageBuilder messageBuilder = new MessageBuilderImpl();
Message message = messageBuilder.setTopic(topic)
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
//设置顺序消息的排序分组,该分组尽量保持离散,避免热点排序分组。
.setMessageGroup("fifoGroup001")
//消息体。
.setBody("messageBody".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
}
text-x-java

接收消息

根据如上图所示,在采用RocketMQ消费者架构的情况下进行的消息处理的主要流程包括:消息获取→进行处理→完成提交
针对这几个阶段,
RocketMQ 支持多种消费者类型:PushConsumer 和 SimpleConsumer。
这些类型的消费者采用各自独特的实现方式与接口来满足不同业务场景下的 consumes 需求。
具体差异如下:
| 对比项 | PushConsumer | SimpleConsumer |
| 接口方式 | 使用监听器回调接口返回消费结果,消费者仅允许在监听器范围内处理消费逻辑。 | 业务方自行实现消息处理,并主动调用接口返回消费结果。 |
| 消费并发度管理 | 由SDK管理消费并发度。所有消息必须以同步方式进行消费处理,并在监听器接口结束时返回调用结果,不允许再做异步化分发 | 由业务方消费逻辑自行管理消费线程。消费时可自定义消息的预估处理时长。 |
| 接口灵活度 | 高度封装,不够灵活。 | 原子接口,可灵活自定义。 |
| 适用场景 | 适用于无自定义流程的开发场景。 | 适用于需要高度自定义业务流程的开发场景。 |
| 消息超时时间 | 默认1分钟,不支持修改。 | 超时时间最大可设置12小时;最小设置为10秒。 |
PushConsumer
当消息消费类型配置为PushConsumer时, 云消息队列 RocketMQ 确保消息按照存储顺序依次发送给消费者.
@Test
public void receiveMessage01() throws ClientException, InterruptedException {
//消费示例:使用PushConsumer消费普通消息。
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "FIFOTopic";
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081
String endpoint = "123.123.8.2:8080";
String consumerGroup = "FIFOGroup";
FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
provider.newPushConsumerBuilder()
//设置消费者分组。
.setConsumerGroup(consumerGroup)
//设置接入点。
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints(endpoint).build())
//设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
//设置消费监听器。
.setMessageListener(messageView -> {
System.out.println(messageView);
//消费消息并返回处理结果。
return ConsumeResult.SUCCESS;
}).build();
Thread.sleep(1000000);
}
text-x-java

SimpleConsumer
当消费者类型被配置为SimpleConsumer时,该消费者可能一次性获取多条消息。此时需要确保消息处理的顺序性由业务方来完成。
@Test
public void receiveMessage02() throws ClientException {
//消费示例二:使用SimpleConsumer消费顺序消息,主动获取消息进行消费处理并提交消费结果。
//需要注意的是,同一个MessageGroup的消息,如果前序消息没有消费完成,再次调用Receive是获取不到后续消息的。
String topic = "FIFOTopic";
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081
String endpoint = "123.123.8.2:8080";
String consumerGroup = "FIFOGroup01";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
// 订阅消息的过滤规则,“*”表示订阅多有tag消息
FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
.setConsumerGroup(consumerGroup)// 设置消费者分组
.setClientConfiguration(configuration)
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 设置预绑定的订阅关系
.setAwaitDuration(Duration.ofSeconds(30))// 设置从服务端接收消息的最大等待时间
.build();
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。如果处理失败,如果处理失败不需要回复ACK响应,即可在定义的消费不可见时间到达后触发消费重试流程。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}
}
text-x-java

使用建议
- 串行消费,避免批量消费导致乱序
消息消费建议串行处理,避免一次消费多条消息,否则可能出现乱序情况。
例如,在传递流程中采用 1→2→3→4 的方式,在接收过程中实施批量接收。在接收过程中采用以下步骤:首先进行 1→23(批量处理)>;如果出现报错,则进行 > 23(重试处理)>;最后完成 > 4 的操作。此时可能由于信息条目三的报错而导致信息条目二被重复接收,并最终可能导致整个信息序列出现混乱。
- 消息组尽可能打散,避免集中导致热点
RocketMQ通过机制实现相同消息组的消息存储在同一个队列中;当不同业务场景的消息集中在少数或单一消息组时,则这些消息会导致服务端少数或单一队列承受较大的存储压力。这可能导致性能瓶颈问题出现;通常推荐的设计方案是基于订单ID和用户ID作为顺序参考依据。
因该方法基于此提出了一种优化方案
鸣谢
- 阿里云消息队列 RocketMQ 5.x系列——消息消费类型
- 阿里云消息队列 RocketMQ 5.x系列——有序性配置
修订记录
- 2024.10.09 - 针对5.x版本内容重新增补。
