Advertisement

SpringBoot+RabbitMQ实现延迟队列

阅读量:

本文目录

      • 第一部分:延迟队列的功能及其实现机制
  • 第二部分:基于SpringBoot框架与RabbitMQ技术实现的消息延迟传输机制
  • 在项目pom文件中,请依次添加以下依赖项:
    • 在生产者工程模块中,设计并实现交换机与 RabbitMQ 队列之间的绑定关系配置文件
    • 开发测试用例时,向系统中的延迟队列发送测试消息(确保 RoutingKey与系统配置匹配)
    • 登录 RabbitMQ 管理界面,查看当前系统的运行状态及消息传输情况
    • 设计消费者逻辑时,从 RabbitMQ 的死信(持久化)队列中读取消息并进行处理

一、什么是延迟队列?

首先来了解什么是延迟队列?它指的是当消息进入队列后不会立即被处理,只有等到指定的时间点才会开始处理这些消息。

在这里插入图片描述

常见需求: 1、在网上商城下单后,30分钟未支付,取消订单,回滚库存;

​ 2、新用户注册成功7天后,发送短信.

实现方法: 1、定时器 2、延迟队列

然而RabbitMQ并未内置延迟队列功能;不过可以通过将TTL(时延)机制与DLX(死信)队列特性相结合来达到类似效果。

在这里插入图片描述

二、SpringBoot+RabbitMQ实现延迟队列

在pom中导入以下依赖
复制代码
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
在生产者工程中,创建一个配置类,声明交换机与队列的绑定关系
复制代码
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    
    @Configuration
    public class TTLDLXConfig {
    
    //死信交换机的名称;
    public static final String DLX_EXCHANGE_NAME = "dlx_boot_exchange";
    //死信队列名称;
    public static final String DLX_QUEUE_NAME = "dlx_boot_queue";
    
    //普通交换机的名称;
    public static final String TEST_EXCHANGE_NAME = "test_boot_exchange";
    //普通队列的名称;
    public static final String TEST_QUEUE_NAME = "test_boot_queue";
    
    
    /** * 声明死信交换机,我们会定义多个交换机,
     * 所以给这个注入的Bean起一个名字,同理在绑定的时候用@Qualifier注解;
     * durablie:持久化
     */
    
    @Bean("dlxExchange")
    public Exchange dlxExchange(){
        return ExchangeBuilder.topicExchange(DLX_EXCHANGE_NAME).durable(true).build();
    
    }
    
    //声明死信队列;
    @Bean("dlxQueue")
    public Queue dlxQueue(){
        return QueueBuilder.durable(DLX_QUEUE_NAME).build();
    
    }
    
    
    //绑定死信交换机和死信队列;
    @Bean
    public Binding bindDLXExchangeQueue(@Qualifier("dlxQueue") Queue queue,
                                           @Qualifier("dlxExchange") Exchange exchange){
    
        return BindingBuilder.bind(queue).to(exchange).with("text.#").noargs();
    
    }
    
    
    //声明普通交换机
    @Bean("testExchange")
    public Exchange testExchange(){
        return ExchangeBuilder.topicExchange(TEST_EXCHANGE_NAME).durable(true).build();
    
    }
    
    
    /** * 声明延迟队列;
     * ttl:设置消息过期的时间,单位为毫秒,方便测试效果,这里设置10秒过期,
     * 也就是10秒后会发送到死信交换机再到死信队列;
     * deadLetterExchange:指定死信交换机的名称;
     * deadLetterRoutingKey:发送死信消息时指定的RoutingKey
     * 上面我们死信队列和死信交换机绑定时已经指定了"text.#"为其绑定RoutingKey;
     * */
    @Bean("testQueue")
    public Queue testQueue(){
        return QueueBuilder.durable(TEST_QUEUE_NAME).ttl(10000).deadLetterExchange(DLX_EXCHANGE_NAME).
                deadLetterRoutingKey("text.info").build();
    
    }
    
    
    //绑定普通交换机与延迟队列;
    //这里的RoutingKey只是我们普通交换机与延迟队列绑定,并不会影响死信消息发送到死信交换机;
    @Bean
    public Binding bindTestExchangeQueue(@Qualifier("testQueue") Queue queue,
                                           @Qualifier("testExchange") Exchange exchange){
    
        return BindingBuilder.bind(queue).to(exchange).with("demo.#").noargs();
    
    }
    
    }
创建一个测试类,发送消息到延迟队列(注意RoutingKey一定要对应)
复制代码
    import com.itlw.config.TTLDLXConfig;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ProducedTest {
    
    //从IOC容器中拿模板类;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Test
    public void test(){
        //发送消息;
        rabbitTemplate.convertAndSend(TTLDLXConfig.TEST_EXCHANGE_NAME,
                "demo.error","这是一条死信消息....");
    
    }
    }
打开RabbitMQ后端页面进行查看

普通交换机和死信交换机已经实现,并且现在消息仍位于延迟队列中。我们需要等待10秒后查看结果。

在这里插入图片描述
在这里插入图片描述

可以看到消息已经到了死信队列当中,并且消息也是我们发送的消息.

在这里插入图片描述
在这里插入图片描述
创建一个消费者从死信队列中拿出消息进行消费
复制代码
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    
    @Component
    public class MyListener {
    
    /** * 监听一个队列的消息;
     * queues的value一定要和我们生产者里面的死信队列的queue一致;
     * @param message 从队列中获取到的消息
     */
    
    @RabbitListener(queues = "dlx_boot_queue")
    public void myTestListener(String message){
        System.out.println(message);
        System.out.println("拿到消息后进行其他操作......");
    
    }
    }

启动消费者工程,可以看到控制台的输出:

在这里插入图片描述

当在实际生成环境中接收到一条不可行的通知后即可执行其余操作。例如,在未完成的订单情况下需对库存数量进行调整。

全部评论 (0)

还没有任何评论哟~