RocketMQ - 什么是RocketMQ
RocketMQ是一款高性能的分布式消息中间件,默认由阿里巴巴开源并由其捐赠给Apache项目。它具有高吞吐量、低延迟的特点,并支持多种功能如顺序收发、分布式事务一致性等。 RocketMQ广泛应用于电商、金融等领域以解决流量激增等问题,并通过异步解耦提升系统性能。
安装RocketMQ(单机版)需要以下步骤:下载JDK→编译安装包→启动NameServer服务并分配内存→配置broker服务器参数→设置停止命令及broker.conf文件参数。
使用Spring Cloud Alibaba集成RocketMQ时需添加相关依赖项并配置application.properties文件中的端口和绑定信息;发送消息时需定义消息源类并在主程序中调用output().send()方法;消费消息时需定义接收器类并在主程序中使用@StreamListener注解监听消息输入。
实际项目中可自定义接口实现自定义的消息通道名称及绑定配置以适应不同场景的需求。
该分布式消息中间件具备低延迟(low latency)、高可靠性(high availability)、弹性扩展(scalability)及易用性(ease of use)特征,并已由阿里巴巴开源并作为Apache顶级项目投入运营。其特点包括高吞吐量(high throughput)、低延迟性能(low latency performance)以及处理海量消息的能力(handling massive message volumes)。该系统支持顺序消息处理(order-based messaging)、事务支持(transactional support)以及定时任务配置(timed tasks),同时具备消息重试与跟踪功能(message retry and tracking functionality)。特别适合电商和金融等多个领域的场景应用。
RocketMQ的应用场景
RocketMQ的应用场景如下:
- 流量高峰管理:诸如秒杀、抢红包等大型活动会引发系统流量的短期激增,在未采取相应的保护措施的情况下可能导致系统超负荷运行直至崩溃;此外,在限制机制过于严格的情况下也可能导致大量请求失败从而影响用户体验;RocketMQ提供了一套有效的流量高峰管理解决方案来应对这些问题。
- 异步通信与解耦机制:作为淘宝、天猫等平台的核心系统之一,交易系统的每一笔订单数据生成都会触发多个下游业务系统的关注包括物流、购物车状态、积分计算以及流计算分析等;鉴于系统的庞大复杂性 RocketMQ通过实现异步通信与应用解耦确保了主站业务的连续性不受影响。
- 消息顺序保证:无论是证券交易中的时间优先原则还是订单创建支付退款流程以及航班旅客登机消息处理等都需要遵循先进先入(FIFO)的原则;RocketMQ通过提供严格的消息顺序保证技术实现了这一核心需求。
- 分布式事务一致性管理:在交易系统及红包发放等场景中数据的一致性至关重要;引入RocketMQ的支持后可以通过分布式事务实现对业务系统的解耦同时又能保证最终数据的一致性水平。
- 实时数据分析能力提升:传统数据分析方法主要依赖于批量计算模型难以实现实时性需求;结合RocketMQ与流式计算引擎的优势能够方便地对业务数据进行实时分析以支持决策者快速响应市场变化。
- 分布式缓存同步机制:在电商促销活动中商品价格实时更新对页面性能有着重要影响;当商品数量巨大且并发访问 intense时集中式缓存因带宽限制可能导致访问延迟;而通过构建分布式缓存系统能够实时通知商品数据的变化从而提升用户体验。
安装RocketMQ(单机版)
- 安装JDK
- 从官网下载编译好的安装包

- 展开文件夹
- 配置并运行
namesrv服务。 - 配置并运行
Nameserver服务。 - 查看相关日志记录
tail -f ~/logs/rocketmqlogs/namesrv.log

- 配置消息服务器Broker运行时,请指定NameServer服务器的IP地址和端口号。系统默认加载配置文件位于$HOME/.config/rocketchat/broker.conf。
通过命令tail -f ~/logs/rocketmqlogs/broker.log 查看最新日志信息。
若尝试使用tail命令查看broker.log时发现文件不存在,则可执行ls /nohup*.out查找备份日志。
在查看相关日志记录时发现错误提示信息:系统内存不足。

内存不足问题主要是由于 bin 目录中运行 nameserv 和 broker 所需的 runbroker.sh 和 runserver.sh 脚本默认分配的内存过大。 RocketMQ 对内存消耗较高 ,因此默认配置时占用较多资源。 这种配置方式可能导致启动过程中出现资源不足的问题。 在类似环境中(如虚拟机部署 CentOS 服务器) ,由于系统总内存通常有限 ,建议适当降低初始配置值。
解决办法
修改runbroker.sh和runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512g"
Xms 是指设定程序启动时占用内存大小。一般来讲,大点,程序会启动的快一点,但是也可能会导致机器暂时
间变慢。
Xmx 是指设定程序运行期间最大可占用的内存大小。如果程序运行需要占用更多的内存,超出了这个设置值,
就会抛出OutOfMemory异常。
xmn 年轻代的heap大小,一般设置为Xmx的3、4分之一
sh mqshutdown broker
请执行以下操作
sh mqshutdown broker
请执行以下操作
sh mqshutdown broker
sh mqshutdown namesrv
在默认设置下,当启动broker服务时系统会自动加载conf/broker.conf文件。该配置文件主要包含一些基础设置参数。其中namesrvAddr变量标识的是本地主机上的nameserver地址。
Broker集群名称 //Cluster名称。当集群中的服务器数量较多时,在其基础上分割成多个子集群,并为每个子集群支持不同的业务场景(例如云原生应用或大数据处理)。
brokerName // 当采用主从模式时, master和slave需具有相同的名称以标识其角色关系
brokerId=0 // 在主从模式中, 一个master broker可管理多个slave, 其中master的id设为0
brokerRole=SYNC_MASTER/ASYNC_MASTER/SLAVE;此同步指示 slave 和 master 消息仅在完成传递后才会被更新
该信息将被发送给客户端
设置 autoCreateTopicEnable 为 true;当 topic 不存在时会自动生成新主题
RocketMQ如何发送消息
Spring Cloud Alibaba已整合RocketMQ技术,并支持通过Spring Cloud Stream实现对RocketMQ的消息发送与接收功能。
- 在pom.xml中引入jar包
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.3.0.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.1.1.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
- 配置application.properties
server.port=8080
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876
spring.cloud.stream.bindings.output.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
- 使用Binder发送消息
@SpringBootApplication
@EnableBinding({Source.class})
public class RocketmqDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RocketmqDemoApplication.class, args);
}
}
@RestController
public class SendController {
@Autowired
private Source source;
@GetMapping("/send")
public String send(String msg) {
MessageBuilder builder = MessageBuilder.withPayload(msg);
Message message = builder.build();
source.output().send(message);
return "Hello RocketMQ, Send " + msg;
}
}
使用@EnableBinding({Source.class})可以在配置文件中实现对名为output的消息通道进行绑定操作。其中指定消息通道的名称为output。通过发送RESTful HTTP请求至http://localhost:8080/send?msg=test的方式,将消息传输至RocketMQ服务。
在实际项目中具备多个发送消息通道,在每个通道上能够进行定制化设置以赋予其独特的标识码;借鉴Source类创建相应的接口并完成必要的配置参数设定;只需调整通道名称及相关配置就可完成整个操作流程的初始化设置工作。
public interface OrderSource {
String OUTPUT = "orderSourcec";
@Output(OrderSource.OUTPUT)
MessageChannel output();
}
@SpringBootApplication
@EnableBinding({Source.class, OrderSource.class})
public class RocketmqDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RocketmqDemoApplication.class, args);
}
}
server.port=8080
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876
spring.cloud.stream.bindings.output.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
spring.cloud.stream.bindings.orderOutput.destination=TopicOrder
spring.cloud.stream.rocketmq.bindings.orderOutput.producer.group=order-group
RocketMQ如何消费消息
- 引入相关依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
- 配置application.properties
server.port=8081
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876
spring.cloud.stream.bindings.output.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
- 定义消息监听
@EnableBinding({Sink.class})
@SpringBootApplication
public class App
{
public static void main( String[] args )
{
SpringApplication.run(App.class);
}
@StreamListener(Sink.INPUT)
public void receive(String msg) {
System.out.println("TopicTest receive: " + msg + ", receiveTime= " + System.currentTimeMillis());
}
}
@EnableBinding({Sink.class})表明了配置文件中名为input的消息通道被绑定。
在Sink类中定义的消息通道的名称被指定为input。
@StreamListener用于定义一个消息监听器来接收RocketMQ中的消息。
实际项目中可能存在多个接收消息的通信渠道,在设计阶段可以选择性地为每个渠道分配独特的标识符。依据 Sink 类型设计相应的接口功能后,在系统运行过程中只需修改对应通信渠道的名字及其相关的配置参数即可实现预期效果。
public interface InputChannel {
String USER_INPUT = "userInput";
String ORDER_INPUT = "orderInput";
@Input(InputChannel.USER_INPUT)
SubscribableChannel userInput();
@Input(InputChannel.ORDER_INPUT)
SubscribableChannel orderInput();
}
@EnableBinding({Sink.class, InputChannel.class})
@SpringBootApplication
public class App
{
public static void main( String[] args )
{
SpringApplication.run(App.class);
}
@StreamListener(Sink.INPUT)
public void receive(String msg) {
System.out.println("TopicTest receive: " + msg + ", receiveTime= " + System.currentTimeMillis());
}
@StreamListener(InputChannel.ORDER_INPUT)
public void receiveOrderInput(String msg) {
System.out.println(" receive: " + msg + ", receiveTime= " + System.currentTimeMillis());
}
}
server.port=8081
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876
spring.cloud.stream.bindings.input.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.input.producer.group=demo-group
spring.cloud.stream.bindings.orderInput.destination=TopicOrder
spring.cloud.stream.rocketmq.bindings.orderInput.producer.group=order-group
