RabbitMQ知识点梳理
1. 什么是MQ
MQ(Message Queue)也被称为消息队列或消息中间件等技术手段,在软件架构设计中占据重要地位。该技术采用典型的生产者-消费者模型进行设计与实现:其中生产者持续向队列中添加消息,消费者持续从队列中获取消息。由于这些操作过程均为异步进行,并且专注于完成特定的任务——即消息发送与接收——因此能够有效隔离业务逻辑的具体实现细节。这种架构设计使得系统间的解耦更为便捷,并且其核心优势在于利用高效可靠的传递机制实现平台间的数据交互;同时依赖数据通信技术实现分布式系统的集成与扩展能力。
2.RabbitMQ
官网下载地址: https://www.rabbitmq.com/download.html

2.1 web管理界面介绍

connections:无论是生产者还是消费者,都必须与RabbitMQ建立连接后才能完成消息的生产和消费,在此处分歧查看连接情况
channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
Exchanges:交换机,用来实现消息的路由
Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。
2.2 用户添加

上面的Tags选项,可以选择用户权限,权限选项如下:
超级管理员(administrator)
可以访问管理控制台并浏览各种信息的同时能够处理用户及策略(policy)
监控者(monitoring)
支持在线访问管理界面,并且能够访问rabbitmq节点的相关信息(运行状态、占用内存量、存储资源占用量等)
策略制定者(policymaker)
可访问管理控制台的同时,也支持policy配置功能的实现。然而,在上图中用红色方框标出的部分则无法查看相关信息
普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
2.3 创建虚拟主机
虚拟主机:为确保各用户之间互不干扰地运行,在RabbitMQ中引入了虚拟主机(Virtual Hosts)这一概念。即相当于一个独立的访问通道。为此,RabbitMQ引入了虚拟主机(Virtual Hosts)的概念。每个通道都拥有独立的队列和交换机,并且由于采用了专用通道进行通信机制的不同配置特点,在实际应用中实现了互相不影响的功能特性。

2.4 用户与虚拟主机绑定
① 新建虚拟主机:


② 新建用户


③ 绑定虚拟主机



3.RabbitMQ的消息模型
3.1 AMQP协议模型

3.2 RabbitMQ支持的消息模型


3.2.1 引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
3.2.2 “Hello World”模式

在上图的模型中,有以下概念:
- P:消息发送方/生成源
- C:接收方/终端
- queue:数据传输通道/通信管道
图中红色标记区域。类似于临时存储空间/缓冲区域。
生产者依次向队列投放信息,
消费者按顺序从队列提取数据。
1)消息生产者代码
public class Provider
{
//生产消息
@Test
public void testSendMessage() throws IOException, TimeoutException
{
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
//设置连接哪个虚拟主机
connectionFactory.setVirtualHost("lucky");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("luckyUser");
connectionFactory.setPassword("123");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中通道
Channel channel = connection.createChannel();
//通道绑定对应消息队列
//参数1:队列名称 如果队列中不存在会自动创建
//参数2:用来定义队列特性是否持久化 true持久化队列 false 不持久化 而非队列中消息
//参数3:exclusive 是否独占队列 true 独占队列
//参数4:autoDelete 是否在消费完成后自动删除队列 true 自动删除
//参数5: 附加参数
channel.queueDeclare("hello",true,false,false,null);
//发布消息
//参数1:交换机名称 参数2:队列名称 参数3:传递消息额外设置 MessageProperties.PERSISTENT_TEXT_PLAIN 消息也持久化 (需要保证生产者与消费者参数一致) 参数4:消息具体内容
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
channel.close();
connection.close();
}
}
2)消息消费者代码
public class Customer
{
public static void main(String[] args) throws IOException, TimeoutException
{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("lucky");
connectionFactory.setUsername("luckyUser");
connectionFactory.setPassword("123");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello",true,false,false,null);
//参数1:消费哪个队列
//参数2:开始消息的自动确认机制
//参数3:消费时的回调接口
channel.basicConsume("hello",true,new DefaultConsumer(channel){
//最后一个参数:消息队列中取出的消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println("新消息"+new String(body));
}
});
//消费者一般不会关闭通道和连接,这时会一直监听
// channel.close();
// connection.close();
}
}
3)启动消费者和生产者,生产者收到消息

3.2.3 RabbitMQUtils
通过查看上述代码可以看出,
消息生产者与消费者之间的代码存在较多的重复性,
因此我们可以对其中的冗余代码进行优化复用:
public class RabbitMQUtils
{
private static ConnectionFactory connectionFactory;
static {
connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
//设置连接哪个虚拟主机
connectionFactory.setVirtualHost("lucky");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("luckyUser");
connectionFactory.setPassword("123");
}
public static Connection getConnection()
{
try {
return connectionFactory.newConnection();
}
catch (IOException e) {
e.printStackTrace();
}
catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
public static void closeConnectionAndChanel(Channel channel,Connection connection)
{
//关闭通道和连接
try {
if (channel != null) channel.close();
if (connection != null) connection.close();
}
catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
3.2.4 “work quene”模式
在工作队列(Work Queues)任务模型下,在消息处理较为耗时的情况下可能会导致生产端的消息流速度远超消耗端的速度。随着时间推移这种状况会导致大量未被及时处理的消息不断积累最终形成堆积状态。此时可以采用工作队列模式(Work Queues Mode):通过将多个消费者绑定到同一队列中实现对该队列内所有消息的一致性消费操作。一旦某条消息被消费者完成处理则立即从该队列中消失从而确保每个任务仅会被执行一次。

在上图的模型中,有以下概念:
- P: 发布者: 任务的所有人
- C1: 消费者-1: 接受并完成分配的任务;
- C2: 消费者-2: 接受并完成分配的任务;
1)消息生产者代码
public class Provider
{
public static void main(String[] args) throws IOException
{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
for (int i = 0; i < 10; i++) {
channel.basicPublish("","work",null,("hello work queue "+i).getBytes());
}
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
}
}
2)消息消费者代码
public class consumer1
{
public static void main(String[] args) throws IOException
{
Connection connection = RabbitMQUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println("消费者1"+new String(body));
}
});
}
}
public class consumer2
{
public static void main(String[] args) throws IOException
{
Connection connection = RabbitMQUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
//参数2 true消息自动确认 消费者自动向rabbitmq确认消息消费
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println("消费者2"+new String(body));
}
});
}
}


如上图所示,在默认情况下,RabbitMQ会按照消息队列的顺序将每个消息依次发送给接收到该消息的消费者。从整体来看,在这种配置下每个消费者平均可获得相同数量的消息数量。
channel.basicQos(1);//一次只接受一条未确认的消息
//参数2:关闭自动确认消息
channel.basicConsume("hello",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1: "+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);//手动确认消息
}
});
为了使消费者1的行为被模仿,并且在消费能力上被削弱的情况下(即模拟消费能力不足的情形),观察结果是否会呈现均等的分配特征:
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1"+new String(body));
//手动确认 参数1:手动确认消息标识 参数2:false 每次确认一个
channel.basicAck(envelope.getDeliveryTag(),false);
}
});


由上图看出,消费者1因为消费速度慢导致获得了更少的消息。
3.2.5 “fanout”模式

在fanout模式下,消息发送流程如下:
- 支持存在多个消费者。
- 每个消费者均拥有独立的queue (队列)。
- 每个队列均需绑定于Exchange (交换机)。
- 消息一旦被发送至交换机后会触发其转发机制,在此过程中... 该机制会根据预先设定的绑定规则确定具体分配目标... 生产者则无法直接决定最终消息流向。
- 一旦消息被发送至交换机后... 该系统会自动将该消息转发给所有已绑定的队列。
- 队列中的消费者均能接收到来自不同源的消息流... 因此实现了一条消息能够被多个不同的消费方所利用。
1)消息生产者代码
public class Provider
{
public static void main(String[] args) throws IOException
{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//将通道声明指定交换机 参数1:交换机名称 参数2:交换机类型 fanout广播类型
channel.exchangeDeclare("fanout_message","fanout");
//发送消息
channel.basicPublish("fanout_message","",null,"fanout type".getBytes());
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
}
}
2)消息消费者代码(多个消费者代码相同)
public class customer
{
public static void main(String[] args) throws IOException
{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("fanout_message","fanout");
//l临时队列
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queue,"fanout_message","");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println(new String(body));
}
});
}
}



由上图看出,三个消费者均受到信息。
3.2.6 “Routing”模式

在扇出模式下的一个信息会被所有订阅的队列接收;然而,在一些情况下我们需要让同一信息通过不同路由机制分配给相应的处理节点
在Routing模型下:
- 队列与交换机之间的注册(binding)并非随意进行, 而是需要明确配置一个Route Key.
- 发送方在将数据传输至Exchange时, 也应确保传递相应的Route Key.
- Exchange不再将数据直接分配给所有注册过的队列, 而是依据数据携带的具体Route Key进行匹配, 只有当队列中的Route Key与数据上的Route Key完全一致时, 才会完成接收.
流程如下:
源节点P配置并发布消息到Exchange。
交换机X负责接收来自源节点的消息,并将这些消息路由至与特定routinig key匹配的队列。
错误处理队C1仅处理从routinig key为error的消息。
普通日志处理队C2负责处理从routinig key为info、error或warning的消息。
1)消息生产者代码
public class Provide
{
public static void main(String[] args) throws IOException
{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//交换机名称, direct:路由模式
channel.exchangeDeclare("provide_direct","direct");
//发送消息
String key = "error";//error信息
channel.basicPublish("provide_direct",key,null,("这是direct模型基于route key"+key).getBytes());
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
}
}
2)消费者1代码
public class customer
{
public static void main(String[] args) throws IOException
{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("provide_direct","direct");
//l临时队列
String queueName = channel.queueDeclare().getQueue();
//基于路由key绑定交换机和队列
channel.queueBind(queueName,"provide_direct","error");//接收error相关信息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println("error"+new String(body));
}
});
}
}
3)消费者2代码
public class customer2
{
public static void main(String[] args) throws IOException
{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("provide_direct","direct");
//l临时队列
String queueName = channel.queueDeclare().getQueue();
//基于路由key绑定交换机和队列 接收info、error和warning信息
channel.queueBind(queueName,"provide_direct","info");
channel.queueBind(queueName,"provide_direct","error");
channel.queueBind(queueName,"provide_direct","warning");
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println("info+error+warning"+new String(body));
}
});
}
}


从上述两张图表中可以看出, 因为两个消费者都拥有含有错误(error)的路由键(route key), 所以他们都接收到相关的错误消息. 为了进一步验证这一情况, 我们尝试发送info信息.



从上述两张图可以看出,在企业级应用中,默认情况下企业ID通常会被分配给同一个业务线下的不同子业务。
3.2.7 “Topics”模式

Topic类型的Exchange与Routing都允许队列在绑定Routing key时应用通配符功能。然而,在大多数情况下(除非特别说明),这种模型中的Routingkey通常由一个或多个单词组成,并通过点号分隔符连接起来(例如,在这种情况下表示为:lucky.dog)。
- 统配符
- 匹配不多不少恰好1个词
匹配一个或多个词
- 如:
lucky.# 匹配lucky.dog.cat或者 lucky.dog等
lucky.* 只能匹配 audit.dog
1)消息生产者代码
public class Provide
{
public static void main(String[] args) throws IOException
{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topic_message","topic");
String routekey = "lucky.dog";
channel.basicPublish("topic_message",routekey,null,"topic动态路由模型".getBytes());
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
}
}
2)消费者代码
public class customer1
{
public static void main(String[] args) throws IOException
{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topic_message","topic");
String queue = channel.queueDeclare().getQueue();
//*是一个单词 #是多个
channel.queueBind(queue,"topic_message","lucky.*");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println(new String(body));
}
});
}
}

4.SpringBoot中使用RabbitMQ
4.1 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.2 配置文件
spring:
application:
name: rabbitmq-springboot
rabbitmq:
host: 127.0.0.1
port: 5672
username: luckyUser
password: 123
virtual-host: lucky
4.3 “Hello World”模式
生产者:
@SpringBootTest(classes = DemoApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ
{
//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
//hello world
@Test
public void testHello()
{
rabbitTemplate.convertAndSend("hello","hello world");
}
}
消费者:
@Component //默认是持久化 非独占 不自动删除的队列
@RabbitListener(queuesToDeclare = @Queue(value = "hello",durable = "true",autoDelete = "false"))//代表消费者 queuesToDeclare创建一个队列
public class HelloCustomer
{
@RabbitHandler
public void receive(String message)
{
System.out.println("message = "+message);
}
}
4.4 “work quene”模式
生产者:
//work
@Test
public void testWork()
{
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work","work 模型");
}
}
消费者:
@Component
public class WorkCustomer
{
//放到方法上代表这个方法会处理接收到的消息,不需要handler
//一个消费者
@RabbitListener(queuesToDeclare = @Queue(value = "work"))
public void receive1(String message)
{
System.out.println(" work Message1"+message);
}
//放到方法上代表这个方法会处理接收到的消息,不需要handler
//一个消费者
@RabbitListener(queuesToDeclare = @Queue(value = "work"))
public void receive2(String message)
{
System.out.println(" work Message2"+message);
}
}
4.5 “fanout”模式
生产者:
//fanout广播
@Test
public void testFanout()
{
rabbitTemplate.convertAndSend("logs","","fanout模型");
}
消费者:
@Component
public class FanoutCustomer
{
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//不指定名字则是临时队列
exchange = @Exchange(value = "logs",type = "fanout") //绑定的交换机
)
})
public void receive1(String message)
{
System.out.println("message1 fanout " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//不指定名字则是临时队列
exchange = @Exchange(value = "logs",type = "fanout") //绑定的交换机
)
})
public void receive2(String message)
{
System.out.println("message2 fanout " + message);
}
}
4.6 “Routing”模式
生产者:
//route 路由模式
@Test
public void testRoute()
{
rabbitTemplate.convertAndSend("directs","error","route模型");
}
消费者:
@Component
public class RouteCustomer
{
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//临时队列
exchange = @Exchange(value = "directs",type = "direct"),
key = {"info","error","warn"}
)
})
public void receive1(String message)
{
System.out.println("message11" + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//临时队列
exchange = @Exchange(value = "directs",type = "direct"),
key = {"error"}
)
})
public void receive2(String message)
{
System.out.println("message22" + message);
}
}
4.7 “Topics”模式
生产者:
//topic 动态路由模式(订阅模式)
@Test
public void testTopic()
{
rabbitTemplate.convertAndSend("topics","order","order路由消息");
}
消费者:
@Component
public class TopicCustomer
{
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(type = "topic",name = "topics"),
key = {"user.*"}
)
})
public void receive(String message)
{
System.out.println("message1"+message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(type = "topic",name = "topics"),
key = {"order.#","produce.#","user.*"}
)
})
public void receive2(String message)
{
System.out.println("message2"+message);
}
}
5.RabbitMQ应用场景
- 异步处理是指在注册成功后自动发起发送短信或邮件操作的过程;例如,在MQ中可以通过异步机制实现这一功能。
- 应用解耦是指在不同业务系统之间建立独立的关系;例如,在消息队列架构下可以通过MQ实现库存与订单系统的解耦。
- 流量削峰是指在秒杀场景中通过限制高峰流量来保障系统稳定运行;例如,在高并发情况下采用削峰措施可有效防止系统崩溃。
