rabbitmq通过死信队列实现延迟任务——java
发布时间
阅读量:
阅读量
很多场景下都需要延迟任务,比如一笔订单50分钟后还没支付则过期,发出去的红包24小时后未领取则自动过期,合同第一次签署失败后20秒后再尝试签署一次。。。。
本博客介绍如何使用rabbitmq通过死信队列实现延迟任务。
首先是配置rabbitmq环境,关于如何配置与mq的各种参数详解,请移步Rabbit mq在linux系统上的安装与配置
当配置好以后,我们需要创建一个配置类,方便我们管理交换机与队列
@Configuration
public class RabbitmqConfig {
@Bean
public Queue contractQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("contractQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("contractQueue", true);
}
@Bean
//Direct 交 换 机 起 名 :ordersExchange
public DirectExchange ordersExchange() {
return new DirectExchange("ordersExchange", true, false);
}
//绑定 将队列和交换机绑定, 并设置用于匹配DirectRouting
@Bean
public Binding bindingDirect(Queue contractQueue, DirectExchange ordersExchange) {
return BindingBuilder.bind(contractQueue)
.to(ordersExchange)//交换机
.with("DirectRouting");//路由键
}
}
配置类起到注册交换机,队列与两者绑定的作用。
那为了方便管理,还可以新建一个类来管理路由键名、交换机名与队列名。
那实现延迟任务,需要准备什么呢?
1.正常交换机
2.正常队列
3.死信交换机
4.死信队列
5.绑定交换机与队列
名称管理RabbitmqConstant
// 延迟时间
public final static long CONTRACT_SIGN_DEAD_TIME = 1000 * 20;
// 正常路由键名
public final static String DELAY_QUEUE_CONTRACT_SIGN_ROUTING_KEY = "delay.queue.contract.sign.routingkey";
// 死信路由键名
public final static String DEAD_LETTER_QUEUE_CONTRACT_SIGN_ROUTING_KEY = "deadletter.queue.contract.sign.routingkey";
// 正常交换机
public final static String CONTRACT_SYNC_SIGN_EXCHANGE = "contract.sync.sign.exchange";
// 死信交换机
public final static String CONTRACT_SYNC_SIGN_DEAD_EXCHANGE = "contract.sync.sign.dead.exchange";
// 正常队列
public final static String CONTRACT_SYNC_SIGN_QUEUE = "contract.sync.sign.queue";
// 死信队列
public final static String CONTRACT_SYNC_SIGN_DEAD_QUEUE = "contract.sync.sign.dead.queue";
配置类配置类RabbitmqConfig
// 正常交换机
@Bean
public DirectExchange contractSyncSignExchange(){
return new DirectExchange(RabbitmqConstant.CONTRACT_SYNC_SIGN_EXCHANGE,true,false);
}
// 死信交换机
@Bean
public DirectExchange contractSyncSignDeadExchange(){
return new DirectExchange(RabbitmqConstant.CONTRACT_SYNC_SIGN_DEAD_EXCHANGE,true,false);
}
// 正常队列
@Bean
public Queue contractSyncSignQueue(){
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", RabbitmqConstant.CONTRACT_SYNC_SIGN_DEAD_EXCHANGE); // 绑定死信交换机
args.put("x-dead-letter-routing-key", RabbitmqConstant.DEAD_LETTER_QUEUE_CONTRACT_SIGN_ROUTING_KEY); // 设置死信routing-key
args.put("x-message-ttl", RabbitmqConstant.CONTRACT_SIGN_DEAD_TIME);// 过期时间
return new Queue(RabbitmqConstant.CONTRACT_SYNC_SIGN_QUEUE,true,false,false,args);
}
// 死信队列
@Bean
public Queue contractSyncSignDeadQueue(){
return new Queue(RabbitmqConstant.CONTRACT_SYNC_SIGN_DEAD_QUEUE,true);
}
// 正常交换机与正常队列绑定
@Bean
public Binding bindingContractSyncSignQueue(Queue contractSyncSignQueue,DirectExchange contractSyncSignExchange){
return BindingBuilder.bind(contractSyncSignQueue)
.to(contractSyncSignExchange)
.with(RabbitmqConstant.DELAY_QUEUE_CONTRACT_SIGN_ROUTING_KEY);
}
// 死信交换机与死信队列绑定
@Bean
public Binding bindingContractSyncSignDeadQueue(Queue contractSyncSignDeadQueue,DirectExchange contractSyncSignDeadExchange){
return BindingBuilder.bind(contractSyncSignDeadQueue)
.to(contractSyncSignDeadExchange)
.with(RabbitmqConstant.DEAD_LETTER_QUEUE_CONTRACT_SIGN_ROUTING_KEY);
}
到这里准备工作就完成了,结下来就是准备消费者了,关键就是消费者该监听哪个队列呢?就是死信队列。
@RabbitHandler
@RabbitListener(queues = RabbitmqConstant.CONTRACT_SYNC_SIGN_DEAD_QUEUE)
public void comsume(String json)
那生产者如何调用?我们要把消息发送到正常交换机上
JSONObject jsonObject = new JSONObject();
rabbitTemplate.convertAndSend(RabbitmqConstant.CONTRACT_SYNC_SIGN_EXCHANGE,RabbitmqConstant.DELAY_QUEUE_CONTRACT_SIGN_ROUTING_KEY,jsonObject.toJSONString());
到这里就完美结束了,如果想弄衰弱时间调用的那些呀,可以创多几个交换机与队列;如果想要失败重试的,可以设置一个最大调用上限,作为参数带过去,失败的话再次执行一次生产者的调用代码就ok了。
附上一个流程图加深理解:

全部评论 (0)
还没有任何评论哟~
