Rabbitmq消息可靠性
在 Rabbitmq 环境下, 通过交换机制将 routingkey 传递至 queues, 再由消费者节点处理信息, 整个流程中存在多个环节可能导致因服务器故障而产生的网络中断问题. 如何确保这些信息传输的可靠性至关重要.
- exchange、queue和message具有持久化特性。
- 确认流程与返回机制。
- 自动acks确保消息被处理。
- 当消息堆积时, 归入死线队列。
详解和实战:
1、首先必须设置exchange和queue的属性durable为true:
@Bean
public TopicExchange topicExchange() {
TopicExchange exchange = ExchangeBuilder.topicExchange(TOPIC_PLACE_ORDER_EXCHANGE).durable(true).build();
//rabbitAdmin.declareExchange(exchange);
return exchange;
}
@Bean
public Queue userQueue() {
return QueueBuilder.durable(USER_QUEUE).build();
}
2、消息一定要送达exchange,开启confirm机制:
如何确保消息能够从生产者成功发送至exchange,请参阅中的《Publisher Confirm通讯模式的原生实现》,本节将为您介绍如何在Springboot环境中集成该通讯模式的具体方案和技术细节
(1)配置文件开启消息确认:
spring:
cloud:
nacos:
config:
server-addr: 127.0.0.1:8848
application:
name: example
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirm-type: correlated #开启消息确认
从源码中可以看出,消息确认类型有三种:
SIMPLE:代表单条或批量消息同步确认
CORRELATED: 与回调函数一起使用,实现异步消息确认
NONE:不确认

(2)编写回调函数:
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if(!ack){
System.out.println("====消息发送失败,重试!====");
}
});
在源码中查看setConfirmCallback函数时发现该函数必须传递一个ConfirmCallback对象实例,并建议采用lambda表达式来完成实现

开启return机制:消息从exchange通过routingkey被路由至queue时也可能会导致消息丢失[Rabbitmq]通过采用Return机制确保了消息能够从exchange正确地路由至queue:
(1)编写配置文件,开启return机制:
spring:
cloud:
nacos:
config:
server-addr: 127.0.0.1:8848
application:
name: example
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirm-type: correlated #开启消息确认
publisher-returns: on # 可以写on/off,或者true/false
(2)编写回调函数:
//只要调用了该方法,说明exchange在路由消息到queue时,失败了
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.out.println("===消息发送失败,进行补救====");
});
3、服务器重启,queue里面的消息不会丢失
特别注意,在channel.queueDeclear()方法中使用的durable参数的作用是什么?它指的是当RabbitMQ服务重启时整个队列不会丢失数据,并非是单个队列中的消息不会丢失的问题。我们特别关注的是队列中的消息能够安全传输至目的地而不发生丢失问题。
请详细说明如何配置发送的消息持久化?请问您能否提供进一步的操作步骤?请说明如何使用MessagePostProcessor实例来传递消息持久化的标识?
//配置消息持久化标识
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
};
//只会匹配第二个队列,#是通配符,可以有多个.
rabbitTemplate.convertAndSend(SpringbootRabbitmqConstant.TOPIC_EXCHANGE, "111.222.aaa.bbb.ccc", msg, messagePostProcessor);
4、消费者获取队列消息不会丢失(手动ack)
(1)编写配置文件
spring:
cloud:
nacos:
config:
server-addr: 127.0.0.1:8848
application:
name: example
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirm-type: correlated #开启消息确认
publisher-returns: on # 可以写on/off,或者true/false
listener:
direct:
acknowledge-mode: manual # 设置手动ack,none表示不ack,auto表示自动ack
(2)在消费者@RabbitListener中,执行完成后手动ack:
@RabbitListener(queues = SpringbootRabbitmqConstant.FANOUT_QUEUE + "1", ackMode = "AUTO")
public void recoverMessage1(String msg, Message message, Channel channel) throws IOException {
long deliveryTag = 0;
try {
byte[] body = message.getBody();
msg = new String(body, StandardCharsets.UTF_8);
deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("recoverMessage1============" + msg + "deliveryTag:" + deliveryTag);
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
//手动ack
channel.basicAck(deliveryTag, false);
}
}
5、死信队列:
当消息队列的大小达到预定上限时,后续发送的消息不再被接收;从而错过了这些信息.
引入死信队列,将消息发送到死信队列中,保证消息不丢失;
构建Rabbitmq的高可用架构,并部署镜像队列作为数据备份机制。只有当所有节点同时故障时才可能导致消息丢失。理论上无法实现完全无故障的状态, 但相较于未配置镜像队列的情况, 其可靠性较之高出显著比例。在大多数生产环境中建议采用此方案。
