Advertisement

rabbitmq通过死信队列实现延迟任务——java

阅读量:

很多场景下都需要延迟任务,比如一笔订单50分钟后还没支付则过期,发出去的红包24小时后未领取则自动过期,合同第一次签署失败后20秒后再尝试签署一次。。。。
本博客介绍如何使用rabbitmq通过死信队列实现延迟任务。
首先是配置rabbitmq环境,关于如何配置与mq的各种参数详解,请移步Rabbit mq在linux系统上的安装与配置
当配置好以后,我们需要创建一个配置类,方便我们管理交换机与队列

复制代码
    @Configuration
    public class RabbitmqConfig {
    @Bean
    public Queue contractQueue() {
    // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    // return new Queue("contractQueue",true,true,false);
    //一般设置一下队列的持久化就好,其余两个就是默认false 
    return new Queue("contractQueue", true);
    }
    @Bean
    //Direct 交 换 机 起 名 :ordersExchange 
    public DirectExchange ordersExchange() {
    return new DirectExchange("ordersExchange", true, false);
    }
    //绑定 将队列和交换机绑定, 并设置用于匹配DirectRouting 
    @Bean
    public Binding bindingDirect(Queue contractQueue, DirectExchange ordersExchange) { 
    return BindingBuilder.bind(contractQueue)
    .to(ordersExchange)//交换机
    .with("DirectRouting");//路由键
    }
    }

配置类起到注册交换机,队列与两者绑定的作用。
那为了方便管理,还可以新建一个类来管理路由键名、交换机名与队列名。
那实现延迟任务,需要准备什么呢?
1.正常交换机
2.正常队列
3.死信交换机
4.死信队列
5.绑定交换机与队列

名称管理RabbitmqConstant

复制代码
    // 延迟时间
    public final static long CONTRACT_SIGN_DEAD_TIME = 1000 * 20;
    // 正常路由键名
    public final static String DELAY_QUEUE_CONTRACT_SIGN_ROUTING_KEY = "delay.queue.contract.sign.routingkey"; 
    // 死信路由键名
    public final static String DEAD_LETTER_QUEUE_CONTRACT_SIGN_ROUTING_KEY = "deadletter.queue.contract.sign.routingkey";
    // 正常交换机
    public final static String CONTRACT_SYNC_SIGN_EXCHANGE = "contract.sync.sign.exchange"; 
    // 死信交换机
    public final static String CONTRACT_SYNC_SIGN_DEAD_EXCHANGE =  "contract.sync.sign.dead.exchange"; 
    // 正常队列
    public final static String CONTRACT_SYNC_SIGN_QUEUE = "contract.sync.sign.queue";
    // 死信队列
    public final static String CONTRACT_SYNC_SIGN_DEAD_QUEUE = "contract.sync.sign.dead.queue";

配置类配置类RabbitmqConfig

复制代码
    // 正常交换机
    @Bean
    public DirectExchange contractSyncSignExchange(){
        return new DirectExchange(RabbitmqConstant.CONTRACT_SYNC_SIGN_EXCHANGE,true,false);
    }
    // 死信交换机
    @Bean
    public DirectExchange contractSyncSignDeadExchange(){
        return new DirectExchange(RabbitmqConstant.CONTRACT_SYNC_SIGN_DEAD_EXCHANGE,true,false);
    }
    // 正常队列
    @Bean
    public Queue contractSyncSignQueue(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", RabbitmqConstant.CONTRACT_SYNC_SIGN_DEAD_EXCHANGE); // 绑定死信交换机
        args.put("x-dead-letter-routing-key", RabbitmqConstant.DEAD_LETTER_QUEUE_CONTRACT_SIGN_ROUTING_KEY); // 设置死信routing-key
        args.put("x-message-ttl", RabbitmqConstant.CONTRACT_SIGN_DEAD_TIME);// 过期时间
        return new Queue(RabbitmqConstant.CONTRACT_SYNC_SIGN_QUEUE,true,false,false,args);
    }
    // 死信队列
    @Bean
    public Queue contractSyncSignDeadQueue(){
        return new Queue(RabbitmqConstant.CONTRACT_SYNC_SIGN_DEAD_QUEUE,true);
    }
    // 正常交换机与正常队列绑定
    @Bean
    public Binding bindingContractSyncSignQueue(Queue contractSyncSignQueue,DirectExchange contractSyncSignExchange){
        return BindingBuilder.bind(contractSyncSignQueue)
                .to(contractSyncSignExchange)
                .with(RabbitmqConstant.DELAY_QUEUE_CONTRACT_SIGN_ROUTING_KEY);
    }
    // 死信交换机与死信队列绑定
    @Bean
    public Binding bindingContractSyncSignDeadQueue(Queue contractSyncSignDeadQueue,DirectExchange contractSyncSignDeadExchange){
        return BindingBuilder.bind(contractSyncSignDeadQueue)
                .to(contractSyncSignDeadExchange)
                .with(RabbitmqConstant.DEAD_LETTER_QUEUE_CONTRACT_SIGN_ROUTING_KEY);
    }

到这里准备工作就完成了,结下来就是准备消费者了,关键就是消费者该监听哪个队列呢?就是死信队列。

复制代码
    @RabbitHandler
    @RabbitListener(queues = RabbitmqConstant.CONTRACT_SYNC_SIGN_DEAD_QUEUE)
    public void comsume(String json)

那生产者如何调用?我们要把消息发送到正常交换机上

复制代码
    JSONObject jsonObject = new JSONObject();
    rabbitTemplate.convertAndSend(RabbitmqConstant.CONTRACT_SYNC_SIGN_EXCHANGE,RabbitmqConstant.DELAY_QUEUE_CONTRACT_SIGN_ROUTING_KEY,jsonObject.toJSONString());

到这里就完美结束了,如果想弄衰弱时间调用的那些呀,可以创多几个交换机与队列;如果想要失败重试的,可以设置一个最大调用上限,作为参数带过去,失败的话再次执行一次生产者的调用代码就ok了。
附上一个流程图加深理解:
在这里插入图片描述

全部评论 (0)

还没有任何评论哟~