Rabbitmq延迟队列实现定时任务
场景
开发过程中时常会遇到定时任务。
Rabbitmq延迟队列
Rabbitmq不具备延迟队列功能,在实际应用中无法直接使用该系统提供的标准队列来实现延迟特性;要要在Rabbitmq中完成延迟队列的功能实现,则必须借助其内置的死信交换机(Exchange)以及设置消息存活时间参数TTL(Time To Live)
死信交换机
当且仅当一条消息满足以下条件时,会被转发到死信交换机网络中。请注意这里指的是专指的交换机而非队列网络。每个交换机会支持多个队列。
该消息未成功接收,并且reject方法的参数设置为:requeue为false。即意味着队列中不会有重复放置的情况;供其他消费者处理。
上面的消息的TTL到了,消息过期了。
队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
死信类型的交换机其实就是普通的交换机。只不过是因为我们把过期的消息投入进去之后就被称为死信型的交换机而已。其实并不是说这种类型有特殊之处。
消息TTL(消息存活时间)
消息的时间限制(TTL)即代表其有效期限。通过RabbitMQ系统,我们可以分别对队列和单个消息设定时间限制。当对队列进行时间限制设置时,则是针对那些没有消费者持续连接的队列而言;同时还可以为每个独立的消息单独设定其时间限制。当超出该时间后,则认为该消息已失效并标记为死亡信息(dead message)。若同时在队列和消息上都设置了 TTL,则取较小值;因此,在同一路由策略下一条消息可能因分配到不同的目标队列而导致失效时间不同。这里单讲单个消息的时间限制(TTL),因为它才是实现延迟任务的关键。
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);
通过配置消息的expiration字段或X-MESSAGE-TTL属性来设定时间间隔,在功能上与前者无异;但需要注意的是expiration字段是一个字符串类型,在实际应用中需将其转换为整数值以使用。
处理流程图

创建交换机(Exchanges)和队列(Queues)
创建死信交换机

如图所示,则是配置一个普通类型的交换机节点,在此场景下为了便于区分类别,则将该交换机节点命名为delay
创建自动过期消息队列
该消息队列的主要功能是将信息定时失效。例如要取消订单需等待2小时后,在指定位置投放该条信息,并将其失效时间为2小时进行设置就可以实现

建立一个名为delay_queue1的自动过期队列,在当前配置下不会自动产生时间戳(x-message-ttl),因为未配置x-message-ttl参数。若所有消息内容相同,则可设置相关参数;鉴于此需求较为灵活,在本场景中选择不进行相关配置。另外两个参数x-dead-letter-exchange用于指定消息过期后进入的交换机,默认情况下配置为delay(即死信交换机)。其中x-dead-letter-routing-key用于指定消息进入死信交换机时使用的路由键值( routing-key),与发送方的消息路由键值具有相同原理,在该key值下完成相应的消息分组。
创建消息处理队列
唯有一个专门用于接收并管理消息流动的队列能够真正实现这一功能

消息队列的名字为delay_queue2
消息队列绑定到交换机
访问交换机详情页面后,在其上进行如下操作:创建并将其两个队列(delay_queue1和delay_queue2)配置到交换机上

自动过期消息队列的routing key 设置为delay
绑定delay_queue2

在delay_queue2中应该将键配置为创建自动过期队列所需的x-dead-letter-routing-key参数。这样当消息自动生成过期时,系统会将它们自动分配到delay_queue2这个队列中
绑定后的管理页面如下图:

当然这个绑定也可以通过代码实现。此外为了直观展示内容本文采用管理平台来进行操作
发送消息
String msg = "hello word";
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("6000");
messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("delay", "delay",message);
主要的代码就是
messageProperties.setExpiration("6000");
设置了让消息6秒后过期
特别提醒:由于需要确保消息自动过期,请避免开启delay_queue1的监听,并阻止该队列中的消息被接收以避免如果不采取上述措施进行管理,则可能导致无法实现预期目标。
接收消息
接收消息配置好delay_queue2的监听就好了
package wang.raye.rabbitmq.demo1;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DelayQueue {
/** 消息交换机的名字*/
public static final String EXCHANGE = "delay";
/** 队列key1*/
public static final String ROUTINGKEY1 = "delay";
/** 队列key2*/
public static final String ROUTINGKEY2 = "delay_key";
/** * 配置链接信息
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672);
connectionFactory.setUsername("kberp");
connectionFactory.setPassword("kberp");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true); // 必须要设置
return connectionFactory;
}
/** * 配置消息交换机
* 针对消费者配置
FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
HeadersExchange :通过添加属性key-value匹配
DirectExchange:按照routingkey分发到指定队列
TopicExchange:多关键字匹配
*/
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE, true, false);
}
/** * 配置消息队列2
* 针对消费者配置
* @return
*/
@Bean
public Queue queue() {
return new Queue("delay_queue2", true); //队列持久
}
/** * 将消息队列2与交换机绑定
* 针对消费者配置
* @return
*/
@Bean
@Autowired
public Binding binding() {
return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);
}
/** * 接受消息的监听,这个监听会接受消息队列1的消息
* 针对消费者配置
* @return
*/
@Bean
@Autowired
public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("delay_queue2 收到消息 : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
}
});
return container;
}
}
对于需要定时执行的任务,在消息监听机制中进行处理即可。由于Rabbitmq具备消息发布能力,可以通过将任务特征码发送出去来实现特定操作。例如,在关闭订单时发送其order_id,则无需逐一查询所有可能需要关闭的订单以减轻MySQL的压力。尤其是当orders数量变得很大时,这种方式能够有效降低对数据库的负载压力。
总结
利用RabbitMQ平台实现定时任务的方法在于将消息配置一个超时时间,并将其放置在一个未被读取的队列里。当这些消息超时后自动转移至另一个队列,在该队列的消息监听点通过特定的操作来执行定时任务的具体流程。
