RabbitMQ相关知识点梳理
目录
- 一、RabbitMQ入门指南
- 二、集群高可用架构设计
- 三、数据持久化策略及内存管理
- 四、消息可靠传输机制
一、RabbitMQ入门简介
RabbitMQ是一个开源的AMQP实现,服务器端使用Erlang语言编写,支持多种客户端。它用于分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。以下是RabbitMQ的安装过程,文档可百度云自取(当然也可以用Docker版安装): 链接: rabbitmq单机到集群完整搭建 提取码: r6a2
RabbitMQ会绑定一些端口,分别如下:
(1)4396 :Erlang的端口/结点映射程序,用来跟踪节点名称监听地址,在集群中类似DNS的作用;
(2)5672、5671 :AMQP客户端端口,没有使用SSL和使用SSL的端口;
(3)25672 :用于RabbitMQ节点间和Cli工具通信,配合4396使用;
(4)15672 :HTTP_API端口,管理员用户才能访问,是管理RabbitMQ的可视化界面,需要启用management插件;
(5)61613、61614 :当STOMP插件启用时打开,作为STOMP客户端端口(根据是否使用TLS选择);
(6)1883、8883 :当MQTT插件启用时打开,作为MQTT客户端端口;
(7)15674 :基于WebSocket的STOMP客户端端口;
(8)15675 :基于WebSocket的MQTT客户端。
在RabbitMQ中还有角色分类的概念,分别如下:
(1)none :不能访问management plugin;
(2)management :用户可以通过AMQP做的任何事并且加上:
<1>列出自己可以通过AMQP登入的virtualHosts;
<2>查看自己的virtualHosts中的queues、exchanges和bindings;
<3>查看和关闭自己的channels和connections;
<4>查看有关自己的virtualHosts和“全局”的统计信息,包含其他用户在这些virtualHosts中的活动;
(3)policymaker :management可以做的任何事并且加上查看、创建和删除自己的virtualHosts所处的policies和parameter;
(4)monitoring :management可以做的任何事并且加上:
<1>列出所有virtualHosts包括他们不能登陆的virtualHosts
<2>查看其他用户的channels和connections;
<3>查看节点级别的数据和clustering和memery的使用情况;
<4>查看真正的关于所有virtualHosts的全局统计信息。
(5)administrator :policymaker和management可以做的任何事并且加上:
<1>创建和删除virtualHosts;
<2>查看、创建和删除user;
<3>查看、创建和删除permission;
<4>关闭其他用户的connections。
安装完RabbitMQ后登陆Web管理端界面如下:

RabbitMQ架构图如下:

以下是RabbitMQ结合Java代码的使用示例:
(1)生产者 :
/** * Topic--生产者
* <p>
* 生产者将消息发送到topic类型的交换器上,和routing的用法类似,都是通过routingKey路由,但topic类型交换器的routingKey支持通配符
*/
public class Producer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setHost("192.168.70.128");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("123456");
Connection connection = null;
Channel channel = null;
try {
// 3、从连接工厂获取连接
connection = factory.newConnection("生产者");
// 4、从链接中创建通道
channel = connection.createChannel();
// 路由关系如下:com.# --> queue-1 *.order.* ---> queue-2
// 消息内容
String message = "Hello A";
// 发送消息到topic_test交换器上
channel.basicPublish("topic-exchange", "com.order.create", null, message.getBytes());
System.out.println("消息 " + message + " 已发送!");
// 消息内容
message = "Hello B";
// 发送消息到topic_test交换器上
channel.basicPublish("topic-exchange", "com.sms.create", null, message.getBytes());
System.out.println("消息 " + message + " 已发送!");
// 消息内容
message = "Hello C";
// 发送消息到topic_test交换器上
channel.basicPublish("topic-exchange", "cn.order.create", null, message.getBytes());
System.out.println("消息 " + message + " 已发送!");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 7、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
(2)消费者 :
/** * 路由--消费者
* <p>
* 消费者通过一个临时队列和交换器绑定,接收发送到交换器上的消息
*/
public class Consumer {
private static Runnable receive = new Runnable() {
@Override
public void run() {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setHost("192.168.70.128");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("123456");
Connection connection = null;
Channel channel = null;
final String queueName = Thread.currentThread().getName();
try {
// 3、从连接工厂获取连接
connection = factory.newConnection("消费者");
// 4、从链接中创建通道
channel = connection.createChannel();
// 定义消息接收回调对象
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(queueName + " 收到消息:" + new String(message.getBody(), "UTF-8"));
}
};
// 监听队列
channel.basicConsume(queueName, true, callback, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
System.out.println(queueName + " 开始接收消息");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 8、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 9、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
new Thread(receive, "queue-1").start();
new Thread(receive, "queue-2").start();
new Thread(receive, "queue-3").start();
}
}
二、RabbitMQ集群高可用方案
基于上述分析

该同步模式旨在协调各RabbitMQ节点间的元信息共享。
真实消息仍保留在原始的RabbitMQ节点中。
元数据构成如下:
- 队列元数据:记录队列名称及其属性;
- 交换器元数据:记录交换器名称及其属性;
- 约束关系元数据:描述队列与交换器之间的约束关系;
- vhost 元数据:实现vhost内部队列、交换器及约束关系的命名空间划分与安全属性管理。
在此配置下,消费者即使连接至RabbitMQ02节点也能够确定消息所处集群中的具体位置。
然而此模式虽然提升了资源利用效率但存在潜在风险一旦某集群成员失效则所有相关数据将面临丢失风险。
(2)Cluster镜像模式:

这种模式基于RabbitMQ节点间采用策略同步所有真实信息


三、RabbitMQ持久化机制与内存控制
RabbitMQ的持久化机制包括队列持久化、消息存储以及交换机式的存储方案等不同方式。值得注意的是无论选择何种存储策略,在磁盘上的数据都是可恢复的;然而,在系统重启后能否恢复则是关键区别之一。(1)具体而言:当选择进行数据保护时系统不仅会将信息备份到本地存储设备中还会将其同步至远程服务器上以确保数据的安全性;这与未进行保护的数据仅存在本地即可不同。
如图所示:

(2)临时消息:
非持久化消息不会被存储于磁盘上,在内存空间不足时才进行存储操作;然而,在系统重启后可能导致数据丢失的情况(如图所示):

持久化的Java示例如下:
(1)生产者 :
/** * 持久化示例
*/
public class Producer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setHost("192.168.70.128");
factory.setUsername("guest");
factory.setPassword("123456");
Connection connection = null;
Channel channel = null;
try {
// 3、从连接工厂获取连接
connection = factory.newConnection("生产者");
// 4、从链接中创建通道
channel = connection.createChannel();
// 定义一个持久化的,direct类型交换器
channel.exchangeDeclare("routing_test", "direct", true);
// 内存、磁盘预警时用
System.out.println("按回车继续");
System.in.read();
// 消息内容
String message = "Hello A";
// 发送持久化消息到routing_test交换器上
channel.basicPublish("routing_test", "c1", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("消息 " + message + " 已发送!");
// 消息内容
message = "Hello B";
// 发送持久化消息到routing_test交换器上
channel.basicPublish("routing_test", "c2", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("消息 " + message + " 已发送!");
// 内存、池畔预警时用
System.out.println("按回车结束");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 7、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
(2)消费者 :
/** * 持久化示例
*/
public class Consumer {
private static Runnable receive = new Runnable() {
public void run() {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setHost("192.168.70.128");
factory.setUsername("guest");
factory.setPassword("123456");
Connection connection = null;
Channel channel = null;
final String clientName = Thread.currentThread().getName();
String queueName = "routing_test_queue";
try {
// 3、从连接工厂获取连接
connection = factory.newConnection("消费者-" + clientName);
// 4、从链接中创建通道
channel = connection.createChannel();
// 定义一个持久化的,direct类型交换器
channel.exchangeDeclare("routing_test", "direct", true);
/** * 声明(创建)持久化队列
* 如果队列不存在,才会创建
* RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
* * queueDeclare参数说明:
* @param queue 队列名称
* @param durable 队列是否持久化
* @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,
* 并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制。
* 一般在队列和交换器绑定时使用
* @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
* @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
*/
channel.queueDeclare(queueName, true, false, false, null);
// 将队列和交换器绑定,第三个参数 routingKey是关键,通过此路由键决定接收谁的消息
channel.queueBind(queueName, "routing_test", clientName);
// 定义消息接收回调对象
DeliverCallback callback = new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(clientName + " 收到消息:" + new String(message.getBody(), "UTF-8"));
}
};
// 监听队列
channel.basicConsume(queueName, true, callback, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
}
});
System.out.println(clientName + " 开始接收消息");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 8、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 9、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
new Thread(receive, "c1").start();
new Thread(receive, "c2").start();
}
}
RabbitMQ提供了一系列内存管理参数以优化系统性能
(1)最大可用内存大小:一旦内存耗尽上限,则消息队列将暂停新消息的接收;其默认设置为40%的可用内存(即当系统内存超过40%时会发出警告并立即停止接收新消息)。
(2)动态内存管理切换门限比例:当前系统中积压的消息数量达到该比例后将触发磁盘换页操作;根据默认设置,在当前内存使用量达到最大可用内存大小的一半时会执行磁盘换页策略以释放存储空间(具体而言,默认门限设为40%,即当系统内存在超过40%未释放的消息时将执行磁盘换页操作)。
(3)最低可接受磁盘空间限制:若当前磁盘剩余空间低于该设定,则队列将暂停进一步操作;其默认配置规定当剩余可用空间降至50MB以下时会暂停新连接的创建并终止后续的动态内存管理功能
四、RabbitMQ消息可靠性
RabbitMQ的消息可靠性可以从以下几个方面进行评估:
(1)发送可靠性方面:确保生产者能够正确地将消息发送至 RabbitMQ(确认机制);
(2)存储可靠性方面:RabbitMQ能够有效地持久化消息以防止丢失(如前所述);
(3)消费可靠性方面:确保消费者能够正确接收并消耗掉消息(通过 ACK 机制)。若消费者未能正确接收,则有以下三种处理方案:
<1> RabbitMQ不会保存该消息直接将其丢弃;
<2> RabbitMQ会将该消息存储至普通队列中作为死信;
<3> RabbitMQ会将该消息重新重传至队列中以便后续处理。
一般而言,消息可靠性可划分为如下三个等级:
(1)最多一次:允许的消息最多传输一次,若发生丢失则不会重复传输;
(2)至少一次:保证消息至少传输一次,但若发生丢失则可能会出现重复传输;
(3)恰好一次:确保消息只传输一次,既不会丢失也不会出现重复传输的情况。
/** * 生产者确认机制
*/
public class Producer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setHost("192.168.70.128");
factory.setUsername("guest");
factory.setPassword("123456");
Connection connection = null;
Channel channel = null;
try {
// 3、从连接工厂获取连接
connection = factory.newConnection("生产者");
// 4、从链接中创建通道
channel = connection.createChannel();
// 进入confirm模式, 每次发送消息,rabbtiqm处理之后会返回一个对应的回执消息
AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
// 增加监听器,监听生产者是否已经把消息发送到RabbitMQ
ArrayList<String> queues = new ArrayList<>();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// deliveryTag 同一个channel中此条消息的编号 。
// 业务..
System.out.println("受理成功 " + queues.get((int) deliveryTag) + " " + multiple);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// 失败重发
// queues.get((int) deliveryTag)
System.out.println("受理失败 " + deliveryTag);
}
});
// 定义fanout类型的交换器
channel.exchangeDeclare("ps_test", "fanout");
for (int i = 0; i < 10; i++) {
// 消息内容
String message = "Hello Confirm " + i;
queues.add(message);
// 发送消息到ps_test交换器上
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties();
channel.basicPublish("ps_test", "", basicProperties, message.getBytes());
System.out.println("消息 " + message + " 已发送!");
}
// 等待20秒
Thread.sleep(20 * 1000L);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
(2)消费者:
/** * 消费者确认机制
*/
public class Consumer {
private static Runnable receive = new Runnable() {
public void run() {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setHost("192.168.70.128");
factory.setUsername("guest");
factory.setPassword("123456");
Connection connection = null;
Channel channel = null;
final String clientName = Thread.currentThread().getName();
try {
// 3、从连接工厂获取连接
connection = factory.newConnection("消费者");
// ###死信队列相关:专门用来存储 出错 出异常的数据
channel = connection.createChannel();
// 1、 创建一个exchange
channel.exchangeDeclare("dlq_exchange", "fanout");
// 2、 创建一个queue,和exchange绑定起来
channel.queueDeclare("dlq_queue1", false, false, false, null);
channel.queueBind("dlq_queue1", "dlq_exchange", "");
// ######死信队列结束
// 4、从链接中创建通道
channel = connection.createChannel();
// 代码定义交换器
channel.exchangeDeclare("ps_test", "fanout");
// 还可以定义一个临时队列,连接关闭后会自动删除,此队列是一个排他队列
String queueName = "queue1";
// 队列中有死信产生时,消息会转发到交换器 dlq_exchange。
Map<String, Object> args = new HashMap<String, Object>();
//当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", "dlq_exchange");//键值是固定的
channel.queueDeclare(queueName, false, false, false, args);//把死信队列参数放进去,当消费者未正常消费时放入这个死信队列中去
// 将队列和交换器绑定
channel.queueBind(queueName, "ps_test", "");
// 监听队列
Channel finalChannel = channel;
channel.basicConsume(queueName, false, "消费者-手动回执",
new DefaultConsumer(finalChannel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
try {
System.out.println("收到消息: " + new String(body));
// TODO 业务处理
long deliveryTag = envelope.getDeliveryTag();
// 模拟业务处理耗时
Thread.sleep(1000L);
// 正常消费
finalChannel.basicAck(deliveryTag, false);
// 异常消费
// finalChannel.basicNack(envelope.getDeliveryTag(), false, false);
} catch (InterruptedException e) {
// 异常消费, requeue参数 true重发,false不重发(丢弃或者移到DLQ死信队列)
// finalChannel.basicNack(envelope.getDeliveryTag(), false, false);
e.printStackTrace();
}
}
});
System.out.println(clientName + " 开始接收消息");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 8、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 9、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
new Thread(receive, "c1").start();
}
}
注
注
注
