RabbitMQ实现延迟任务
目录
1.如何实现延迟队列?
2.实现延迟队列
2.1 安装并启动延迟队列
2.1.1 下载延迟插件
2.1.2 将插件放到插件目录
2.1.3 启动插件
2.1.4 重启RabbitMQ服务
2.1.5 验收结果
2.1.6 手动创建延迟交换器(可选)
2.2 编写延迟消息实现代码
2.2.1 配置交换器和队列
2.1.2 定义消息发送方法
2.1.3 发送延迟消息
2.1.4 接收延迟消息
小结
延迟操作即指在消息发送之后,并非会立即被执行,在经过特定时间间隔后才由消费者进行处理。延迟队列的应用场景包括但不限于以下几种:
未按时支付的订单,30 分钟过期之后取消订单。
给活跃度比较低的用户间隔 N 天之后推送消息,提高活跃度。
新注册会员的用户,等待几分钟之后发送欢迎邮件等。
1.如何实现延迟队列?
延迟队列有以下两种实现方式:
当消息过期时会被引导至死信交换器,并随后从该交换器转发至延迟消费队列以实现延迟效果。
使用官方提供的延迟插件实现延迟功能。
在早期阶段广泛采用第一种方法。但由于存在“队头阻塞”的问题,在生产环境中,默认情况下我们推荐使用RabbitMQ 3.5.7版本(发布于2015年底)及其延迟插件配置。其配置方案更为便捷且高效。本文主要介绍第二种实习模式。
在早期阶段广泛采用第一种方法。但由于存在"队头阻塞"的问题,在生产环境中,默认情况下我们推荐使用RabbitMQ 3.5.7版本(发布于2015年底)及其延迟插件配置。其配置方案更为便捷且高效。本文主要介绍第二种实习模式。
2.实现延迟队列
2.1 安装并启动延迟队列
2.1.1 下载延迟插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
请注意,请根据您的RabbitMQ服务器端版本配置相同版本的延迟插件,并通过RabbitMQ控制台进行设置


2.1.2 将插件放到插件目录
注
docker cp 宿主机文件 容器名称或ID:容器目录
如下图所示:

之后,进入 docker 容器,查看插件中是否包含延迟队列:
docker exec -it 容器名称或ID /bin/bash rabbitmq-plugins list
如下图所示:

2.1.3 启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
如下图所示:

2.1.4 重启RabbitMQ服务
安装完成后无需手动操作即可使RabbitMQ服务正常运转。在 Docker环境中只需确保容器重新启动即可完成配置更新:
docker restart 容器名称或ID
如下图所示:

2.1.5 验收结果
在 RabbitMQ 控制台查看界面中,默认情况下是否带有延迟消息功能选项?如果存在此选项,则表示延迟消息插件已正常配置完成,请参考下图进行确认:

2.1.6 手动创建延迟交换器(可选)
此步骤为可选操作(非必要)。由于某些版本的程序在创建延迟交换器时可能出现问题,在出现问题时,则可以直接手动构建延迟队列来解决。如图所示。

2.2 编写延迟消息实现代码
2.2.1 配置交换器和队列
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
/** * 延迟交换器和队列
*/
@Configuration
public class DelayedExchangeConfig {
public static final String EXCHANGE_NAME = "myDelayedExchange";
public static final String QUEUE_NAME = "delayed.queue";
public static final String ROUTING_KEY = "delayed.routing.key";
@Bean
public CustomExchange delayedExchange() {
return new CustomExchange(EXCHANGE_NAME,
"x-delayed-message", // 消息类型
true, // 是否持久化
false); // 是否自动删除
}
@Bean
public Queue delayedQueue() {
return QueueBuilder.durable(QUEUE_NAME)
.withArgument("x-delayed-type", "direct")
.build();
}
@Bean
public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
}
}

2.1.2 定义消息发送方法
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class DelayedMessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Scheduled(fixedDelay = 5000)
public void sendDelayedMessage(String message) {
rabbitTemplate.convertAndSend(DelayedExchangeConfig.EXCHANGE_NAME,
DelayedExchangeConfig.ROUTING_KEY,
message,
messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setDelay(10000); // 设置延迟时间,单位毫秒
return messagePostProcessor;
});
}
}

2.1.3 发送延迟消息
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/delayed")
public class DelayedMessageController {
@Autowired
private DelayedMessageProducer delayedMessageProducer;
@GetMapping("/send")
public String sendDirectMessage(@RequestParam String message) {
delayedMessageProducer.sendDelayedMessage(message);
return "Delayed message sent to Exchange: " + message;
}
}

2.1.4 接收延迟消息
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DelayedMessageConsumer {
@RabbitListener(queues = DelayedExchangeConfig.QUEUE_NAME)
public void receiveDelayedMessage(String message) {
System.out.println("Received delayed message: " + message);
}
}

小结
主要采用官方提供的延迟插件来完成RabbitMQ延迟队列的实现。具体来说,在使用延迟插件时需要完成以下步骤:首先是下载所需的插件包;接着是对服务器进行配置;最后是重新启动RabbitMQ服务。这样就使得开发者能够通过编写相应的代码来构建高效的延迟队列系统。
