SpringBoot+RabbitMQ实现延迟队列
发布时间
阅读量:
阅读量
本文目录
-
-
- 第一部分:延迟队列的功能及其实现机制
-
- 第二部分:基于SpringBoot框架与RabbitMQ技术实现的消息延迟传输机制
- 在项目pom文件中,请依次添加以下依赖项:
- 在生产者工程模块中,设计并实现交换机与 RabbitMQ 队列之间的绑定关系配置文件
- 开发测试用例时,向系统中的延迟队列发送测试消息(确保 RoutingKey与系统配置匹配)
- 登录 RabbitMQ 管理界面,查看当前系统的运行状态及消息传输情况
- 设计消费者逻辑时,从 RabbitMQ 的死信(持久化)队列中读取消息并进行处理
一、什么是延迟队列?
首先来了解什么是延迟队列?它指的是当消息进入队列后不会立即被处理,只有等到指定的时间点才会开始处理这些消息。

常见需求: 1、在网上商城下单后,30分钟未支付,取消订单,回滚库存;
2、新用户注册成功7天后,发送短信.
实现方法: 1、定时器 2、延迟队列
然而RabbitMQ并未内置延迟队列功能;不过可以通过将TTL(时延)机制与DLX(死信)队列特性相结合来达到类似效果。

二、SpringBoot+RabbitMQ实现延迟队列
在pom中导入以下依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在生产者工程中,创建一个配置类,声明交换机与队列的绑定关系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TTLDLXConfig {
//死信交换机的名称;
public static final String DLX_EXCHANGE_NAME = "dlx_boot_exchange";
//死信队列名称;
public static final String DLX_QUEUE_NAME = "dlx_boot_queue";
//普通交换机的名称;
public static final String TEST_EXCHANGE_NAME = "test_boot_exchange";
//普通队列的名称;
public static final String TEST_QUEUE_NAME = "test_boot_queue";
/** * 声明死信交换机,我们会定义多个交换机,
* 所以给这个注入的Bean起一个名字,同理在绑定的时候用@Qualifier注解;
* durablie:持久化
*/
@Bean("dlxExchange")
public Exchange dlxExchange(){
return ExchangeBuilder.topicExchange(DLX_EXCHANGE_NAME).durable(true).build();
}
//声明死信队列;
@Bean("dlxQueue")
public Queue dlxQueue(){
return QueueBuilder.durable(DLX_QUEUE_NAME).build();
}
//绑定死信交换机和死信队列;
@Bean
public Binding bindDLXExchangeQueue(@Qualifier("dlxQueue") Queue queue,
@Qualifier("dlxExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("text.#").noargs();
}
//声明普通交换机
@Bean("testExchange")
public Exchange testExchange(){
return ExchangeBuilder.topicExchange(TEST_EXCHANGE_NAME).durable(true).build();
}
/** * 声明延迟队列;
* ttl:设置消息过期的时间,单位为毫秒,方便测试效果,这里设置10秒过期,
* 也就是10秒后会发送到死信交换机再到死信队列;
* deadLetterExchange:指定死信交换机的名称;
* deadLetterRoutingKey:发送死信消息时指定的RoutingKey
* 上面我们死信队列和死信交换机绑定时已经指定了"text.#"为其绑定RoutingKey;
* */
@Bean("testQueue")
public Queue testQueue(){
return QueueBuilder.durable(TEST_QUEUE_NAME).ttl(10000).deadLetterExchange(DLX_EXCHANGE_NAME).
deadLetterRoutingKey("text.info").build();
}
//绑定普通交换机与延迟队列;
//这里的RoutingKey只是我们普通交换机与延迟队列绑定,并不会影响死信消息发送到死信交换机;
@Bean
public Binding bindTestExchangeQueue(@Qualifier("testQueue") Queue queue,
@Qualifier("testExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("demo.#").noargs();
}
}
创建一个测试类,发送消息到延迟队列(注意RoutingKey一定要对应)
import com.itlw.config.TTLDLXConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ProducedTest {
//从IOC容器中拿模板类;
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
//发送消息;
rabbitTemplate.convertAndSend(TTLDLXConfig.TEST_EXCHANGE_NAME,
"demo.error","这是一条死信消息....");
}
}
打开RabbitMQ后端页面进行查看
普通交换机和死信交换机已经实现,并且现在消息仍位于延迟队列中。我们需要等待10秒后查看结果。


可以看到消息已经到了死信队列当中,并且消息也是我们发送的消息.


创建一个消费者从死信队列中拿出消息进行消费
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyListener {
/** * 监听一个队列的消息;
* queues的value一定要和我们生产者里面的死信队列的queue一致;
* @param message 从队列中获取到的消息
*/
@RabbitListener(queues = "dlx_boot_queue")
public void myTestListener(String message){
System.out.println(message);
System.out.println("拿到消息后进行其他操作......");
}
}
启动消费者工程,可以看到控制台的输出:

当在实际生成环境中接收到一条不可行的通知后即可执行其余操作。例如,在未完成的订单情况下需对库存数量进行调整。
全部评论 (0)
还没有任何评论哟~
