《RabbitMQ系列教程-第十二章-SpringBoot整合RabbitMQ高级特性》_springboot整合rabbitmq的高级特性(2)
最后






由于篇幅原因,就不多做展示了
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void init() {
rabbitTemplate.setConfirmCallback(this);
}
/\*\*
- @param correlationData: 配置相关信息
- @param ack: 交换机是否成功收到消息
- @param cause: 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm executed...");
if (ack) {
System.out.println("success: " + cause);
}else{
System.out.println("error: " + cause);
}
}
}
##### 12.1.1.4 测试代码:
package com.lscl.rabbitmq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/\*\*
- 确认模式:
*/
@Test
public void testConfirm() {
// 发送消息
rabbitTemplate.convertAndSend(“test_exchange_confirm2”, “confirm”, “message confirm…”);
}
}
#### 12.1.2 回退模式
**概念:当Producer发送消息之后,队列无法接受到消息时,触发回调函数**
回顾Spring环境下如何开启回退模式:
1、在ConnectionFactory中开启回退模式`publisher-returns="true"`
2、开启消息回退(设置rabbitTempalte中的`setMandatory(true)`)
2、确认模式在`RabbitTemplate`对象的`setConfirmCallback`方法中设置一个回调函数
##### 12.1.2.1 application.yml:
spring:
rabbitmq:
publisher-returns: true # 开启回退模式
##### 12.1.2.2 配置类:
package com.lscl.rabbitmq.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
- 测试消息确认机制中的确认模式
*/
@Configuration
public class Config_02_ReturnCallback implements RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void init() {
// 开启消息回退
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(this);
}
/\*\*
- @param message 消息对象
- @param replyCode 错误码
- @param replyText 错误信息
- @param exchange 交换机
- @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println(“return executed…”);
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
}
}
##### 12.1.2.3 测试类:
/**
- 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败时 才会执行 ReturnCallBack
*/
@Test
public void testReturn() {
// 发送消息
rabbitTemplate.convertAndSend(“test_exchange_confirm”, “confirm”, “message confirm…”);
}
#### 12.1.3 事务模式
回顾Spring环境下如何开启事务模式:
1、事务模式不能与确认模式和回退模式共存(删除确认模式和回退模式的配置)
2、开启rabbitmq对事务的支持`setChannelTransacted(true)`、开启消息回退`setMandatory(true)`
3、注入`RabbitTransactionManager`事务管理器
##### 12.1.3.1 application.yml:
删除如下配置:
spring:
rabbitmq:
#publisher-confirms: true # 开启消息确认模式
#publisher-returns: true # 开启回退模式
##### 12.1.3.2 配置类:
package com.lscl.rabbitmq.config;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
- 测试消息确认机制中的事务
*/
@Configuration
public class Config_03_Transaction {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
// 开启rabbitmq对事物的支持
rabbitTemplate.setChannelTransacted(true);
// 开启消息回退
rabbitTemplate.setMandatory(true);
}
/\*\*
- 注入RabbitMQ事务管理器
- @param
- @return
*/
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
RabbitTransactionManager manager = new RabbitTransactionManager();
ConnectionFactory factory = rabbitTemplate.getConnectionFactory();
manager.setConnectionFactory(factory);
return manager;
}
}
##### 12.1.3.3 测试代码:
package com.lscl.rabbitmq.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/hello/{flag}")
@Transactional
public String hello(@PathVariable Integer flag) {
rabbitTemplate.convertAndSend("test\_exchange\_confirm", "confirm", "message confirm.....");
if (flag == 0) {
int i = 1 / 0;
}
rabbitTemplate.convertAndSend("test\_exchange\_confirm", "confirm", "message confirm.....");
return "ok";
}
}
#### 12.1.4 Conusmer Ack
回顾Spring环境下如何开启事务模式:
* 在监听容器中指定acknowledge参数,none(自动签收)、manual(手动签收)、auto(rabbitmq来决定签收)
搭建Consumer端工程:参考前面的SpringBoot整合RabbitMQ
##### 12.1.4.1 application.yml:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动签收
##### 12.1.4.2 配置类:
package com.lscl.rabbitmq;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class);
}
}
##### 12.1.4.3 监听类:
package com.lscl.rabbitmq.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class AckListener {
@RabbitListener(queues = "test\_queue\_confirm")
public void test\_queue\_confirm(Message message, Channel channel) throws Exception {
try {
//1.获取消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
int i = 1 / 0;
// 签收消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} catch (Exception e) {
e.printStackTrace();
// 拒绝签收
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
}
}
}
### 12.2 TTL队列
什么是TTL队列?
* 带有过期时间的队列,当过期时间到达后消息如果没有被消费,那么自动丢弃
* 当队列和消息同时设置有过期时间时,以最先过期的单位时间为准
* 在RabbitMQ中并不是轮询方式去判断消息是否过期,而是只判断在最顶部的消息,因此消息如果不是在最顶部技术到达了过期时间也不会被移除队列;
回顾Spring环境下如何配置TTL队列:
* 1)设置队列的`x-message-ttl`参数,单位:毫秒
#### 12.2.1 配置类:
package com.lscl.rabbitmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
- 测试TTL
*/
@Configuration
public class Config_04_TTL {
@Bean("testQueueTtl")
public Queue testQueueTtl() {
Map<String, Object> param = new HashMap<>();
param.put("x-message-ttl", 5000);
return QueueBuilder.durable("test\_queue\_ttl").withArguments(param).build();
}
}
#### 12.2.2 测试类:
/**
- TTL:过期时间
*/
@Test
public void testTtl() {
// rabbitTemplate.convertAndSend(“test_queue_ttl”, “ttl 队列统一过期…”);
MessageProperties properties=new MessageProperties();
properties.setExpiration("2000");
Message message=new Message("ttl 单独消息过期".getBytes(),properties);
rabbitTemplate.convertAndSend("test\_queue\_ttl", message);
}
### 12.3 死信队列
回顾什么是死信队列?
* 1)消息被拒绝签收(`channel.basicNack()、channel.basicReject()`)
* 2)消息达到过期时间没有被消费
* 3)消息超过了队列的最大长度
Spring环境下如何配置死信队列?
* 1)声明正常的队列,并且在队列参数中设置死信交换机(消息成为死信后再次被利用起来只能绑定交换机,不能直接绑定Queue)
* 2)声明死信交换机
* 3)声明死信队列
* 4)死信队列绑定死信交换机
#### 12.3.1 配置类
package com.lscl.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
- 测试死信队列
*/
@Configuration
public class Config_05_Dlx {
// 声明正常的队列,设置死信交换机参数
@Bean("testQueueDlx")
public Queue testQueueDlx() {
Map<String,Object> params=new HashMap();
// 设置死信交换机
params.put("x-dead-letter-exchange","exchange\_dlx");
// 转发给死信交换机的routingKey
params.put("x-dead-letter-routing-key","dlx");
// 队列的过期时间
// params.put(“x-message-ttl”,5000);
// 设置队列的最大长度限制
// params.put(“x-max-length”,10);
return QueueBuilder.durable("test\_queue\_dlx").withArguments(params).build();
}
// 声明死信队列
@Bean("queue\_dlx")
public Queue queueDlx() {
return QueueBuilder.durable("queue\_dlx").build();
}
// 声明死信交换机
@Bean("test\_exchange\_dlx")
public Exchange testExchangeDlx() {
return ExchangeBuilder.directExchange("exchange\_dlx").durable(true).build();
}
// 死信交换机绑定死信队列
@Bean
public Binding binding(){
Queue queue = queueDlx();
Exchange exchange = testExchangeDlx();
return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();
}
}
#### 12.3.2 测试类:
/**
- 发送测试死信消息:
-
- 过期时间
-
- 长度限制
-
- 消息拒收
*/
@Test
public void testDlx() {
- 消息拒收
MessageProperties properties = new MessageProperties();
properties.setExpiration("5000");
Message message = new Message("我是死信队列哦...".getBytes(), properties);
//1. 测试过期时间,死信消息
rabbitTemplate.convertAndSend("test\_queue\_dlx", message);
//2. 测试长度限制后,消息死信
/* for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(“test_queue_dlx”,“我是死信队列哦…”+i);
}*/
//3. 测试消息拒收
// rabbitTemplate.convertAndSend(“test_queue_dlx”,“我是死信队列哦…”+i);
}
惊喜
最后还准备了一套上面资料对应的面试题(有答案哦)和面试时的高频面试算法题(如果面试准备时间不够,那么集中把这些算法题做完即可,命中率高达85%+)


for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(“test_queue_dlx”,“我是死信队列哦…”+i);
}*/
//3. 测试消息拒收
// rabbitTemplate.convertAndSend(“test_queue_dlx”,“我是死信队列哦…”+i);
}
惊喜
最后还准备了一套上面资料对应的面试题(有答案哦)和面试时的高频面试算法题(如果面试准备时间不够,那么集中把这些算法题做完即可,命中率高达85%+)
[外链图片转存中…(img-Wvzo58JX-1715474088832)]
[外链图片转存中…(img-Ds6aGbUn-1715474088833)]
