RabbitMQ 基本api使用 以及和spring、springboot整合
文章目录
-
-
-
1 RabbitMQ介绍
-
1.1 RabbitMQ简介及使用场景
-
- 1.2 其它相关知识
-
2 快速入门
-
- 2.1 RabbitMQ 的工作原理
-
-
2.2 下载安装
-
- 2.2.1 下载安装
- 2.2.2 启动
- 2.2.3 注意事项
-
2.3 快速开始体验
-
-
2.3.1 源码
-
2.3.2 流程总结
-
3、工作模式
-
- 3.1 Work queues
-
-
3.2 Publish/subscribe
-
- 3.2.1 模式简介
- 3.2.2 源码
- 3.2.3 问题分析
-
3.3 Routing
-
- 3.3.1 模式简介
- 3.3.2 源码
- 3.3.3 问题分析
-
3.4 Topics
-
- 3.4.1 模式简介
- 3.4.2 源码
- 4.3.3 问题分析
-
3.5 Header 模式
-
- 3.5.1 模式简介
- 3.5.2 源码
-
3.6 RPC
- 4 Springboot整合RabbitMQ
-
- 4.1 引入依赖
-
4.2 配置
-
4.3 生成者
-
4.4 消费者
- 附
-
1 RabbitMQ介绍
1.1 RabbitMQ简介及使用场景
MQ全称为Message Queue(消息队列),即Message Queue(消息队列),RabbitMQ是由Erlang语言开发的一种基于Advanced MessageQueue(高级消息队列)协议实现的消息队列应用。该技术实现了应用程序间的通信方法,并且由于其高效的性能特点,在分布式系统开发中得到了广泛应用。RabbitMQ官网
消息队列得到广泛应用的领域包括:企业级应用系统设计、高可用性服务构建以及异步计算任务处理等领域,并且支持包括Java、.NET、Python在内的多种编程语言的应用开发工作。支持基于异步模式的任务执行机制以及分布式架构特点使得其成为现代企业级应用系统的重要组成部分之一
*任务通过异步机制执行。由消息队列通知消息接收方执行的任务无需同步且耗时长。提升了系统整体响应效率。
-
应用程序解耦合
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
市场上还有哪些消息队列?
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。
为什么使用RabbitMQ? -
使得简单, 功能强大.
- 基于AMQP协议.
- 社区活跃, 文档完善.
- 高并发性能好, 这主要得益于Erlang语言.
- Spring Boot默认已集成RabbitMQ
1.2 其它相关知识
AMQP是什么 ?
AMQP是一套公开的消息队列协议,最早在2003年被提出,它旨在从协议层定义消息通信数据的标准格式,为的就是解决MQ市场上协议不统一的问题。RabbitMQ就是遵循AMQP标准协议开发的MQ服务。AMQP官网
JMS是什么 ?
JMS是java提供的一套消息服务API标准,其目的是为所有的java应用程序提供统一的消息通信的标准,类似java的jdbc,只要遵循jms标准的应用程序之间都可以进行消息通信。jms是java语言专属的消息服务标准,它是在api层定义标准,并且只能用于java应用;而AMQP是在协议层定义的标准,是跨语言的 。
2 快速入门
2.1 RabbitMQ 的工作原理
下图是RabbitMQ的基本结构:

说明:
- Broker :消息队列服务进程,此进程包括两个部分:Exchange和Queue。
- Exchange :消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
- Queue :消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
- Producer :消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
- Consumer :消息消费者,即消费方客户端,接收MQ转发的消息。
发送消息流程
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
接收消息流程
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
2.2 下载安装
2.2.1 下载安装
RabbitMQ是由Erlang语言构建的开源消息队列系统,在并发处理和分布式计算方面具有显著优势,在电信行业有着广泛的应用。作为Erlang语言的一个重要组成部分,OTP(OpenTelecom Platform)整合了大量基于Erlang开发的应用组件与工具包。为了顺利部署RabbitMQ服务系统,在安装过程中必须确保遵循严格的版本兼容性原则,并参考附图所示的系统架构图。

RabbitMQ的下载地址:http://www.rabbitmq.com/download.html
1)首先下载erlang Otp_win64_20.3.exe并以管理员权限运行该文件完成安装操作。
完成安装后需配置erlang环境变量:将ERLANG_HOME设置为D:\software\erl9.3,并将其附加到系统路径变量中;
2)其次获取RabbitMQ rabbitmq-server-3.7.3.exe同样以管理员权限运行该文件完成安装操作。
2.2.2 启动
安装成功后会自动生成并运行RabbitMQ服务。
1)启动RabbitMQ时,在开始菜单中定位到其位置;
完成于在开始菜单中找到该应用程序的位置。

说明:
- RabbitMQ Service-deployment: 部署服务
- RabbitMQ Service-removal: 移除服务
- RabbitMQ Service-booting: 启动
- RabbitMQ Service-shutdown: 关闭
2)如果没有开始菜单则进入安装目录下sbin目录手动启动:
a.安装并运行服务
rabbitmq-service.bat install 安装服务 rabbitmq-service.bat stop 停止服务 rabbitmq-service.bat start 启动服务
b.安装管理插件
安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ
管理员身份运行 rabbitmq-plugins.bat enable rabbitmq_management
3)启动成功 登录RabbitMQ
进入浏览器,输入:http://127.0.0.1:15672,初始行号密码guest/guest


2.2.3 注意事项
需要改写的内容如下
改写内容
2.3 快速开始体验
2.3.1 源码
引入相关依赖
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qqxhb</groupId>
<artifactId>rabbitmq-api</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.14.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
生产者
package com.qqxhb.rabbitmq.quickstart;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/** * Producing means nothing more than sending. A program that sends messages is a
* producer。
* */
public class Producer {
public static void main(String[] args) throws Exception {
/* * 1、创建一个ConnectionFactory,设置主机及端口
* 2、通过工厂创建连接
* 3、通过连接 创建通道Channel
* 4、通过通道发送消息
* 5、关闭资源
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String message = "Hello world.";
for (int i = 0; i < 3; i++) {
channel.basicPublish("", "quickstart", null, message.getBytes());
}
channel.close();
connection.close();
}
}
消费者
package com.qqxhb.rabbitmq.quickstart;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
package com.qqxhb.rabbitmq.quickstart;
import com.qqxhb.rabbitmq.api.consumer.MyConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Consumer {
public static void main(String[] args) throws Exception {
/* * 1、创建一个ConnectionFactory,设置主机及端口
* 2、通过工厂创建连接
* 3、通过连接 创建通道Channel
* 4、通过通道声明队列
* 5、创建消费者
* 6、给通道设置消费者及队列
* 7、获取消息
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = "quickstart";
channel.queueDeclare(queueName, true, false, false, null);
channel.basicConsume(queueName, true,new MyConsumer(channel));
}
}
2.3.2 流程总结
发送端
- 建立一个ConnectionFactory并配置主机IP地址和端口号;
- 利用量词工厂生成连接实例;
- 利用量词建立通道Channel;
- 利用量词的通道传递消息;
- 最后释放所有资源以避免泄漏。
消费端
- 建立一个ConnectionFactory,并指定其主机和端口参数;
- 利用工厂生成相应的连接对象;
- 基于该连接创建对应的Channel;
- 对该通道进行声明以配置队列;
- 建立消费者实例;
- 对指定的通道配置绑定消费者和队列;
- 接收消息并进行响应处理。
3、工作模式
RabbitMQ有6种工作模式:作业队列;发布/订阅;路由;主题;头信息;远程过程调用
3.1 Work queues

compared to the entry program, work queues feature an additional consumer layer, with both layers consuming messages from the same queue. application scenarios: when task loads are heavy or numerous, utilizing work queues can enhance processing efficiency. implementation approach: 1) initiate multiple consumers via the entry program; 2) enqueue multiple messages from producers. conclusions: 1) each message is exclusively handled by a single consumer; 2) rabbit employs a polling mechanism to distribute messages evenly among consumers; 3) once a consumer completes processing a message, it will subsequently receive the next one.
3.2 Publish/subscribe
3.2.1 模式简介

订阅模式的发布流程如下所述:
- 各个消费者都会关注自己的订阅队列。
- 生产者会将消息发送至broker,并且交换机负责将这些消息转发至所有与之绑定的队列。这些队列都能成功接收 incoming messages。
3.2.2 源码
场景:系统通知信息中指出,在用户的账户资金成功充值或完成转账操作后会触发系统通知机制。该机制支持通过短信、邮件等多种通讯方式向用户发送通知信息
在配置环节中,请您按照以下步骤操作:首先,在系统管理界面中找到并点击"服务参数"选项卡;接着,在右侧的设置模块中选择"消息服务"子模块;随后,在该子模块中注册并绑定至该交换机;最后,在绑定配置参数时请确保无需指定routingkey配置参数
public class Producer02_publish {
//队列名称
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//创建一个与MQ的连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
factory.setVirtualHost("/");
//创建一个连接
connection = factory.newConnection();
//创建与交换机的通道,每个通道代表一个会话
channel = connection.createChannel();
//声明交换机 String exchange, BuiltinExchangeType type
/** * 参数明细
* 1、交换机名称
* 2、交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
//声明队列
// (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,
Object> arguments)
/** * 参数明细:
* 1、队列名称
* 2、是否持久化
* 3、是否独占此队列
* 4、队列不用是否自动删除
* 5、参数
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
//交换机和队列绑定String queue, String exchange, String routingKey
/** * 参数明细
* 1、队列名称
* 2、交换机名称
* 3、路由key
*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
//发送消息
for (int i=0;i<10;i++){
String message = "inform to user"+i;
//向交换机发送消息 String exchange, String routingKey, BasicProperties props,
byte[] body
/** * 参数明细
* 1、交换机名称,不指令使用默认交换机名称 Default Exchange
* 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队 列名称表示消 息将发到此队列
* 3、消息属性
* 4、消息内容
*/
channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally{
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
邮件发送消费者
public class Consumer02_subscribe_email {
//队列名称
private static final String QUEUE_INFORM_EMAIL = "inform_queue_email";
private static final String EXCHANGE_FANOUT_INFORM="inform_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个与MQ的连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
//创建一个连接
Connection connection = factory.newConnection();
//创建与交换机的通道,每个通道代表一个会话
Channel channel = connection.createChannel();
//声明交换机 String exchange, BuiltinExchangeType type
/** * 参数明细
* 1、交换机名称
* 2、交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
//声明队列
// channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean
autoDelete, Map<String, Object> arguments)
/** * 参数明细:
* 1、队列名称
* 2、是否持久化
* 3、是否独占此队列
* 4、队列不用是否自动删除
* 5、参数
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
//交换机和队列绑定String queue, String exchange, String routingKey
/** * 参数明细
* 1、队列名称
* 2、交换机名称
* 3、路由key
*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
//定义消费方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String exchange = envelope.getExchange();
//消息内容
String message = new String(body, "utf‐8");
System.out.println(message);
}
};
/** * 监听队列String queue, boolean autoAck,Consumer callback
* 参数明细
* 1、队列名称
* 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
为false则需要手动回复
* 3、消费消息的方法,消费者接收到消息后调用此方法
*/
channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
}
}
通过RabbitMQ管理界面设置并运行参数配置后

3.2.3 问题分析
1、publish/subscribe与work queues有什么区别与联系?
区别:
1)work queues不用定义交换机,而publish/subscribe需要定义交换机。
2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认交换机)。
3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实质上work queues会将队列绑定到默认的交换机 。
相同点:
所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
2、实际工作用什么? publish/subscribe还是work queues。
建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大,并且发布订阅模式可以指定自己专用的交换机。
3.3 Routing
3.3.1 模式简介

路由模式的机制为:
- 消费者端设备会自动生成一个独立的队列,并设置一个唯一的routingkey用于标识该队列。
- 消息发送到交换机后,交换机会根据接收到的消息中的routingkey字段自动确定目标队列,并将消息转发至相应的通道。
3.3.2 源码
消费者 配置交换机exchange-routing_inform,并注册并绑定两个队列至该交换机;在绑定过程中,请确保指定routing key作为键值。
public class Producer03_routing {
//队列名称
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//创建一个与MQ的连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务
器
//创建一个连接
connection = factory.newConnection();
//创建与交换机的通道,每个通道代表一个会话
channel = connection.createChannel();
//声明交换机 String exchange, BuiltinExchangeType type
/** * 参数明细
* 1、交换机名称
* 2、交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//声明队列
// channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean
autoDelete, Map<String, Object> arguments)
/** * 参数明细:
* 1、队列名称
* 2、是否持久化
* 3、是否独占此队列
* 4、队列不用是否自动删除
* 5、参数
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
//交换机和队列绑定String queue, String exchange, String routingKey
/** * 参数明细
* 1、队列名称
* 2、交换机名称
* 3、路由key
*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS);
//发送邮件消息
for (int i=0;i<10;i++){
String message = "email inform to user"+i;
//向交换机发送消息 String exchange, String routingKey, BasicProperties props,
byte[] body
/** * 参数明细
* 1、交换机名称,不指令使用默认交换机名称 Default Exchange
* 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消息将发到此队列
* 3、消息属性
* 4、消息内容
*/
channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL, null,
message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
//发送短信消息
for (int i=0;i<10;i++){
String message = "sms inform to user"+i;
//向交换机发送消息 String exchange, String routingKey, BasicProperties props,
byte[] body
channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null,
message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally{
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
邮件通知消费者
public class Consumer03_routing_email {
//队列名称
private static final String QUEUE_INFORM_EMAIL = "inform_queue_email";
private static final String EXCHANGE_ROUTING_INFORM="inform_exchange_routing";
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个与MQ的连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
//创建一个连接
Connection connection = factory.newConnection();
//创建与交换机的通道,每个通道代表一个会话
Channel channel = connection.createChannel();
//声明交换机 String exchange, BuiltinExchangeType type
/** * 参数明细
* 1、交换机名称
* 2、交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//声明队列
// channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean
autoDelete, Map<String, Object> arguments)
/** * 参数明细:
* 1、队列名称
* 2、是否持久化
* 3、是否独占此队列
* 4、队列不用是否自动删除
* 5、参数
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
//交换机和队列绑定String queue, String exchange, String routingKey
/** * 参数明细
* 1、队列名称
* 2、交换机名称
* 3、路由key
*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL);
//定义消费方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String exchange = envelope.getExchange();
//消息内容
String message = new String(body, "utf‐8");
System.out.println(message);
}
};
/** * 监听队列String queue, boolean autoAck,Consumer callback
* 参数明细
* 1、队列名称
* 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置
为false则需要手动回复
* 3、消费消息的方法,消费者接收到消息后调用此方法
*/
channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
}
}
访问RabbitMQ管理界面并监控交换机的绑定状态:通过生产者向系统发送一批消息(...),消息将被路由键引导至预先指定的队列。

3.3.3 问题分析
Routing模式与Publish/subscribe有什么不同之处? Routing模式在绑定交换机时必须指定routingkey,并且只有当消息到达该routingkey时才会被转发至相应的队列。
3.4 Topics
3.4.1 模式简介

路由模式:
- 每个消费者关注自己的队列,并配置带有统一前缀的routingkey。
- 生产者将消息发送至broker,则交换机依据.routingkey将该消息路由至指定的队列上。
3.4.2 源码
场景分析:按照用户的指定配置进行通知设置,在此过程中可分别针对不同类型的终端设备选择其对应的通讯方式。具体而言,在配置时可以选择以下几种方式:将仅接受Email的通知类型设定为只会发送邮件;将仅接受短信的通知类型设定为只会发送短信;而对于同时配置两种不同的通知类型,则系统会同时发送这两种类型的提醒信息。
public class Producer04_topics {
//队列名称
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//创建一个与MQ的连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务
器
//创建一个连接
connection = factory.newConnection();
//创建与交换机的通道,每个通道代表一个会话
channel = connection.createChannel();
//声明交换机 String exchange, BuiltinExchangeType type
/** * 参数明细
* 1、交换机名称
* 2、交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//声明队列
/** * 参数明细:
* 1、队列名称
* 2、是否持久化
* 3、是否独占此队列
* 4、队列不用是否自动删除
* 5、参数
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
//发送邮件消息
for (int i=0;i<10;i++){
String message = "email inform to user"+i;
//向交换机发送消息 String exchange, String routingKey, BasicProperties props,
byte[] body
/** * 参数明细
* 1、交换机名称,不指令使用默认交换机名称 Default Exchange
* 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消息将发到此队列
* 3、消息属性
* 4、消息内容
*/
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null,
message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
//发送短信消息
for (int i=0;i<10;i++){
String message = "sms inform to user"+i;
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null,
message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
//发送短信和邮件消息
for (int i=0;i<10;i++){
String message = "sms and email inform to user"+i;
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", null,
message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally{
if(channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
消费端
队列绑定交换机指定通配符:
中间以.分隔开,在这里符号#被用来匹配多个词汇项,并且符号*则被用来表示单个词汇项的位置。
/声明队列
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
//声明交换机
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//绑定email通知队列
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,"inform.#.email.#");
//绑定sms通知队列
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,"inform.#.sms.#");
调用RabbitMQ管理控制台并查看交换机的绑定配置:通过生产者向系统发送多条测试消息,并观察交换机依据路由键统一配对机制将消息路由至目标队列的过程

4.3.3 问题分析
本案例的需求是否可以通过 Routing 工作模式来实现?
通过 Routing 模式同样可以实现本案例功能。具体来说,需要配置三个 routingkeys 分别是 email、sms 和 all。将 email 队列与 email 和 all 配置关联起来;将 sms 队列与 sms 和 all 配置关联起来;这样就能达到上文所述的效果;而该方案的操作步骤比 topics 方案更为复杂一些。
Topic 机制则更为强大一些,在其框架下不仅可以实现实例中的 Routing 功能还可以整合 publish/subscribe 功能。
3.5 Header 模式
3.5.1 模式简介
在 header 模式与 routing 方面的主要区别在于, header 模式不再使用 routing key, 而是采用 header 中的 key/value(键值对)来进行队列匹配。
3.5.2 源码
在用户的通知系统中进行配置,在配置中可以分别对接收Email和sms的用户进行独立的设置:将接收Email的类型仅配置为Email;将接收sms的类型仅配置为短信;当选择同时接受两种类型的通知时,则表示两种通知都会被有效接收。
生产者部分代码
Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_type", "email");
Map<String, Object> headers_sms = new Hashtable<String, Object>();
headers_sms.put("inform_type", "sms");
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);
String message = "email inform to user"+i;
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("inform_type", "email");//匹配email通知消费者绑定的header
//headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
properties.headers(headers);
//Email通知
channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());
邮件消费者部分代码
channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS);
Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_email", "email");
//交换机和队列绑定
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
//指定消费队列
channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);
3.6 RPC

RPC即为客户端通过远程调用服务端执行的方法。通过MQ可以实现RPC的非阻塞调用,基于Direct交换机完成,具体流程如下:
1、客户端既作为生产者也作为消费者,将RPC请求消息发送至请求队列,同时持续监控响应队列的状态;
2、服务端持续接收并处理来自请求队列的消息,一旦接收到有效任务后便启动相关操作;
3、服务端在完成任务后,将计算所得的结果反馈至响应队列;
4、客户端(调用方)则持续接收至响应队列中的 RPC 结果信息。
其他API操作请查看文末源码
4 Springboot整合RabbitMQ
4.1 引入依赖
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qqxhb</groupId>
<artifactId>springboot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>springboot</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
4.2 配置
application.properties
//共用配置
spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
//生产者配置
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
//消费者配置
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*
server.servlet.context-path=/
server.port=8001
4.3 生成者
package com.qqxhb.springboot.producer;
import java.util.Map;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class RabbitSender {
//自动注入RabbitTemplate模板类
@Autowired
private RabbitTemplate rabbitTemplate;
//回调函数: confirm确认
final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: " + correlationData);
System.err.println("ack: " + ack);
if(!ack){
System.err.println("异常处理....");
}
}
};
//回调函数: return返回
final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
String exchange, String routingKey) {
System.err.println("return exchange: " + exchange + ", routingKey: "
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
}
};
//发送消息方法调用: 构建Message消息
public void send(Object message, Map<String, Object> properties) throws Exception {
MessageHeaders mhs = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, mhs);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id + 时间戳 全局唯一
CorrelationData correlationData = new CorrelationData("1234567890");
rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
}
}
4.4 消费者
package com.qqxhb.springboot.consumer;
import java.util.Map;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
public class RabbitReceiver {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue-1",
durable="true"),
exchange = @Exchange(value = "exchange-1",
durable="true",
type= "topic",
ignoreDeclarationExceptions = "true"),
key = "springboot.*"
)
)
@RabbitHandler
public void onMessage(Message<Object> message, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消费端Payload: " + message.getPayload());
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
/** * * @param order
* @param channel
* @param headers
* @throws Exception
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
durable="${spring.rabbitmq.listener.order.queue.durable}"),
exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
durable="${spring.rabbitmq.listener.order.exchange.durable}",
type= "${spring.rabbitmq.listener.order.exchange.type}",
ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
key = "${spring.rabbitmq.listener.order.key}"
)
)
@RabbitHandler
public void onOrderMessage(@Payload com.qqxhb.springboot.entity.Order order,
Channel channel,
@Headers Map<String, Object> headers) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消费端order: " + order.getId());
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
}
以上仅展示了部分代码片段。整体架构旨在确保系统的可靠性和可传递性,请参阅文末附录中的完整源码。
附
源码地址:https://github.com/qqxhb/rabbitmq
源码介绍:
- rabbitmq-api的核心功能包括实现消息队列的交换机功能以及acks机制的应用。
- rabbitmq-spring实现了内置Spring框架的功能。
- springboot提供了完整的Spring Boot组件集合。
- 通过Spring Cloud Stream实现了流数据处理能力的集成。
