Advertisement

《RabbitMQ系列教程-第十二章-SpringBoot整合RabbitMQ高级特性》_springboot整合rabbitmq的高级特性(2)

阅读量:

最后






由于篇幅原因,就不多做展示了

本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

需要这份系统化的资料的朋友,可以点击这里获取

复制代码
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    private void init() {
    rabbitTemplate.setConfirmCallback(this);
    }
    
    /\*\*
    
    
      
      
      
      
      
      
      
      
      
    
  • @param correlationData: 配置相关信息
  • @param ack: 交换机是否成功收到消息
  • @param cause: 失败原因
    */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
复制代码
    System.out.println("confirm executed...");
    
    if (ack) {
        System.out.println("success: " + cause);
    }else{
        System.out.println("error: " + cause);
    }
    }
    
    
      
      
      
      
      
      
      
      
    

}

复制代码
    ##### 12.1.1.4 测试代码:
    
    
    
    
    
      
      
      
      
      
    

package com.lscl.rabbitmq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class ProducerTest {

复制代码
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    
    /\*\*
    
    
      
      
      
      
      
    
  • 确认模式:
    */
    @Test
    public void testConfirm() {
    // 发送消息
    rabbitTemplate.convertAndSend(“test_exchange_confirm2”, “confirm”, “message confirm…”);
    }
    }
复制代码
    #### 12.1.2 回退模式
    
    
    **概念:当Producer发送消息之后,队列无法接受到消息时,触发回调函数**
    
    
    回顾Spring环境下如何开启回退模式:
    
    
    1、在ConnectionFactory中开启回退模式`publisher-returns="true"`
    
    
    2、开启消息回退(设置rabbitTempalte中的`setMandatory(true)`)
    
    
    2、确认模式在`RabbitTemplate`对象的`setConfirmCallback`方法中设置一个回调函数
    
    
    ##### 12.1.2.1 application.yml:
    
    
    
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

spring:
rabbitmq:
publisher-returns: true # 开启回退模式

复制代码
    ##### 12.1.2.2 配置类:
    
    
    
    
    
      
      
      
      
      
    

package com.lscl.rabbitmq.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**

  • 测试消息确认机制中的确认模式
    */
    @Configuration
    public class Config_02_ReturnCallback implements RabbitTemplate.ReturnCallback {
复制代码
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    private void init() {
    // 开启消息回退
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setReturnCallback(this);
    }
    
    /\*\*
    
    
      
      
      
      
      
      
      
      
      
      
      
    
  • @param message 消息对象
  • @param replyCode 错误码
  • @param replyText 错误信息
  • @param exchange 交换机
  • @param routingKey 路由键
    */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    System.out.println(“return executed…”);
复制代码
    System.out.println(message);
    System.out.println(replyCode);
    System.out.println(replyText);
    System.out.println(exchange);
    System.out.println(routingKey);
    }
    
    
      
      
      
      
      
      
    

}

复制代码
    ##### 12.1.2.3 测试类:
    
    
    
    
    
      
      
      
      
      
    

/**

  • 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败时 才会执行 ReturnCallBack
    */
    @Test
    public void testReturn() {
    // 发送消息
    rabbitTemplate.convertAndSend(“test_exchange_confirm”, “confirm”, “message confirm…”);
    }
复制代码
    #### 12.1.3 事务模式
    
    
    回顾Spring环境下如何开启事务模式:
    
    
    1、事务模式不能与确认模式和回退模式共存(删除确认模式和回退模式的配置)
    
    
    2、开启rabbitmq对事务的支持`setChannelTransacted(true)`、开启消息回退`setMandatory(true)`
    
    
    3、注入`RabbitTransactionManager`事务管理器
    
    
    ##### 12.1.3.1 application.yml:
    
    
    删除如下配置:
    
    
    
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

spring:
rabbitmq:
#publisher-confirms: true # 开启消息确认模式
#publisher-returns: true # 开启回退模式

复制代码
    ##### 12.1.3.2 配置类:
    
    
    
    
    
      
      
      
      
      
    

package com.lscl.rabbitmq.config;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**

  • 测试消息确认机制中的事务
    */
    @Configuration
    public class Config_03_Transaction {
复制代码
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init() {
    // 开启rabbitmq对事物的支持
    rabbitTemplate.setChannelTransacted(true);
    
    // 开启消息回退
    rabbitTemplate.setMandatory(true);
    }
    
    /\*\*
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
    
  • 注入RabbitMQ事务管理器
  • @param
  • @return
    */
    @Bean
    public RabbitTransactionManager rabbitTransactionManager() {
复制代码
    RabbitTransactionManager manager = new RabbitTransactionManager();
    ConnectionFactory factory = rabbitTemplate.getConnectionFactory();
    manager.setConnectionFactory(factory);
    
    return manager;
    }
    
    
      
      
      
      
      
      
    

}

复制代码
    ##### 12.1.3.3 测试代码:
    
    
    
    
    
      
      
      
      
      
    

package com.lscl.rabbitmq.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

复制代码
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @GetMapping("/hello/{flag}")
    @Transactional
    public String hello(@PathVariable Integer flag) {
    rabbitTemplate.convertAndSend("test\_exchange\_confirm", "confirm", "message confirm.....");
    
    if (flag == 0) {
        int i = 1 / 0;
    }
    rabbitTemplate.convertAndSend("test\_exchange\_confirm", "confirm", "message confirm.....");
    return "ok";
    }
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

}

复制代码
    #### 12.1.4 Conusmer Ack
    
    
    回顾Spring环境下如何开启事务模式:
    
    
    * 在监听容器中指定acknowledge参数,none(自动签收)、manual(手动签收)、auto(rabbitmq来决定签收)
    
    
    搭建Consumer端工程:参考前面的SpringBoot整合RabbitMQ
    
    
    ##### 12.1.4.1 application.yml:
    
    
    
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动签收

复制代码
    ##### 12.1.4.2 配置类:
    
    
    
    
    
      
      
      
      
      
    

package com.lscl.rabbitmq;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class);
}
}

复制代码
    ##### 12.1.4.3 监听类:
    
    
    
    
    
      
      
      
      
      
    

package com.lscl.rabbitmq.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class AckListener {

复制代码
    @RabbitListener(queues = "test\_queue\_confirm")
    public void test\_queue\_confirm(Message message, Channel channel) throws Exception {
    
    try {
        //1.获取消息
        System.out.println(new String(message.getBody()));
    
        //2. 处理业务逻辑
        System.out.println("处理业务逻辑...");
    
        int i = 1 / 0;
    
        // 签收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    } catch (Exception e) {
        e.printStackTrace();
    
        // 拒绝签收
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
    }
    }
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

}

复制代码
    ### 12.2 TTL队列
    
    
    什么是TTL队列?
    
    
    * 带有过期时间的队列,当过期时间到达后消息如果没有被消费,那么自动丢弃
    * 当队列和消息同时设置有过期时间时,以最先过期的单位时间为准
    * 在RabbitMQ中并不是轮询方式去判断消息是否过期,而是只判断在最顶部的消息,因此消息如果不是在最顶部技术到达了过期时间也不会被移除队列;
    
    
    回顾Spring环境下如何配置TTL队列:
    
    
    * 1)设置队列的`x-message-ttl`参数,单位:毫秒
    
    
    #### 12.2.1 配置类:
    
    
    
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

package com.lscl.rabbitmq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**

  • 测试TTL
    */
    @Configuration
    public class Config_04_TTL {
复制代码
    @Bean("testQueueTtl")
    public Queue testQueueTtl() {
    Map<String, Object> param = new HashMap<>();
    param.put("x-message-ttl", 5000);
    
    return QueueBuilder.durable("test\_queue\_ttl").withArguments(param).build();
    }
    
    
      
      
      
      
      
      
      
    

}

复制代码
    #### 12.2.2 测试类:
    
    
    
    
    
      
      
      
      
      
    

/**

  • TTL:过期时间
    */
    @Test
    public void testTtl() {

// rabbitTemplate.convertAndSend(“test_queue_ttl”, “ttl 队列统一过期…”);

复制代码
    MessageProperties properties=new MessageProperties();
    properties.setExpiration("2000");
    Message message=new Message("ttl 单独消息过期".getBytes(),properties);
    
    rabbitTemplate.convertAndSend("test\_queue\_ttl", message);
    
    
      
      
      
      
      
    

}

复制代码
    ### 12.3 死信队列
    
    
    回顾什么是死信队列?
    
    
    * 1)消息被拒绝签收(`channel.basicNack()、channel.basicReject()`)
    * 2)消息达到过期时间没有被消费
    * 3)消息超过了队列的最大长度
    
    
    Spring环境下如何配置死信队列?
    
    
    * 1)声明正常的队列,并且在队列参数中设置死信交换机(消息成为死信后再次被利用起来只能绑定交换机,不能直接绑定Queue)
    * 2)声明死信交换机
    * 3)声明死信队列
    * 4)死信队列绑定死信交换机
    
    
    #### 12.3.1 配置类
    
    
    
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

package com.lscl.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**

  • 测试死信队列
    */
    @Configuration
    public class Config_05_Dlx {
复制代码
    // 声明正常的队列,设置死信交换机参数
    @Bean("testQueueDlx")
    public Queue testQueueDlx() {
    
    Map<String,Object> params=new HashMap();
    // 设置死信交换机
    params.put("x-dead-letter-exchange","exchange\_dlx");
    
    // 转发给死信交换机的routingKey
    params.put("x-dead-letter-routing-key","dlx");
    
    // 队列的过期时间
    
    
      
      
      
      
      
      
      
      
      
      
      
      
    

// params.put(“x-message-ttl”,5000);

复制代码
    // 设置队列的最大长度限制
    
    
      
    

// params.put(“x-max-length”,10);

复制代码
    return QueueBuilder.durable("test\_queue\_dlx").withArguments(params).build();
    }
    
    // 声明死信队列
    @Bean("queue\_dlx")
    public Queue queueDlx() {
    return QueueBuilder.durable("queue\_dlx").build();
    }
    
    
    // 声明死信交换机
    @Bean("test\_exchange\_dlx")
    public Exchange testExchangeDlx() {
    return ExchangeBuilder.directExchange("exchange\_dlx").durable(true).build();
    }
    
    // 死信交换机绑定死信队列
    @Bean
    public Binding binding(){
    Queue queue = queueDlx();
    Exchange exchange = testExchangeDlx();
    
    return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();
    }
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

}

复制代码
    #### 12.3.2 测试类:
    
    
    
    
    
      
      
      
      
      
    

/**

  • 发送测试死信消息:
    1. 过期时间
    1. 长度限制
    1. 消息拒收
      */
      @Test
      public void testDlx() {
复制代码
    MessageProperties properties = new MessageProperties();
    properties.setExpiration("5000");
    Message message = new Message("我是死信队列哦...".getBytes(), properties);
    
    //1. 测试过期时间,死信消息
    rabbitTemplate.convertAndSend("test\_queue\_dlx", message);
    
    //2. 测试长度限制后,消息死信
    
    
      
      
      
      
      
      
      
      
    

/* for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(“test_queue_dlx”,“我是死信队列哦…”+i);
}*/

复制代码
    //3. 测试消息拒收
    
    
      
    

// rabbitTemplate.convertAndSend(“test_queue_dlx”,“我是死信队列哦…”+i);
}

惊喜

最后还准备了一套上面资料对应的面试题(有答案哦)和面试时的高频面试算法题(如果面试准备时间不够,那么集中把这些算法题做完即可,命中率高达85%+)
image.png
image.png

本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

需要这份系统化的资料的朋友,可以点击这里获取

for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(“test_queue_dlx”,“我是死信队列哦…”+i);
}*/

复制代码
    //3. 测试消息拒收
    
    
      
    

// rabbitTemplate.convertAndSend(“test_queue_dlx”,“我是死信队列哦…”+i);
}

惊喜

最后还准备了一套上面资料对应的面试题(有答案哦)和面试时的高频面试算法题(如果面试准备时间不够,那么集中把这些算法题做完即可,命中率高达85%+)

[外链图片转存中…(img-Wvzo58JX-1715474088832)]

[外链图片转存中…(img-Ds6aGbUn-1715474088833)]

本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

需要这份系统化的资料的朋友,可以点击这里获取

全部评论 (0)

还没有任何评论哟~