Advertisement

SpringBoot+RabbitMq延迟插件实现延时队列

阅读量:

SpringBoot+RabbitMq延迟插件实现延时队列

  • 一.安装RabbitMq
  • 二.话不多说直接上代码
    • 1.导入maven依赖
    • 2.在application.yml配置文件中引入RabbitMq配置信息
    • 3.rabbitMq配置类
    • 4.消息发送确认
    • 5.创建控制器并向队列发送消息
    • 6.发送队列消息的service
    • 7.发送队列消息的service实现类
    • 8.配置队列的监听者(完成延时操作)

一.安装RabbitMq

复制代码
    链接地址:

二.话不多说直接上代码

1.导入maven依赖

复制代码
     <!-- rabbitMq -->
     <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
     </dependency>

2.在application.yml配置文件中引入RabbitMq配置信息

复制代码
    spring: 
      rabbitmq:
    host: 127.0.0.1(你的RabbitMq配置地址)
    port: 5672(端口)
    username: super(用户名)
    password: Jmy2019.(密码)
    virtual-host: ecosphere(主机)

3.rabbitMq配置类

复制代码
    package com.nuvole.merchant.conf.mq;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /** * rabbit配置类(声明交换机、队列以及他们的绑定关系)
     * * @author lc
     * @date 2020/7/1 11:15
     */
    @Configuration
    public class AmqpConfig {
    
    // 交换机名称
    public static final String TEST_EXCHANGE_KEY = "exchange.pay";
    
    // 队列名称(测试)
    public static final String TEST_QUEUE_KEY = "test.pay";
    
    // 队列路线/绑定关系(测试)
    public static final String TEST_ROUTK = "test.pay";
    
    
    @Bean
    public Queue testQueue() {
    
        return new Queue(TEST_QUEUE_KEY, true);
    }
    
    
    /** * 延时队列交换器
     * * @author lc
     * @date 2020/6/30 15:06
     */
    @Bean
    public CustomExchange testExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(TEST_EXCHANGE_KEY, "x-delayed-message", true, false, args);
    }
    
    
    @Bean
    public Binding testBinding(CustomExchange testExchange, Queue testQueue) {
        Binding binding = BindingBuilder.bind(testQueue).to(testExchange).with(TEST_ROUTK).noargs();
        return binding;
    }
    }

4.消息发送确认

复制代码
    package com.nuvole.merchant.conf.mq;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    
    /** * 消息发送确认
     * * @author lc
     * @date 2020/7/1 11:14
     */
    @Component
    public class AmqpAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }
    
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("return--message:" + new String(message.getBody()) + ",replyCode:" + replyCode
                + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
    }
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
        } else {
            System.out.println("消息发送确认失败:" + cause);
        }
    }
    }

5.创建控制器并向队列发送消息

复制代码
    package com.nuvole.merchant.controller.v1;
    
    import com.alibaba.fastjson.JSON;
    import com.nuvole.merchant.conf.mq.AmqpConfig;
    import com.nuvole.merchant.service.mq.QueueMessageService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    import java.util.Date;
    import java.util.HashMap;
    
    @Slf4j
    @RestController
    @RequestMapping("/test")
    public class TestController {
    
    @Resource
    private QueueMessageService queueMessageService;
    
    @GetMapping("/do")
    public void dos(String msg, int msec) {
        log.info("开始发送延时队列。。。。。。。。。。。。。");
    
        log.info(new Date().toString() + "----------当前时间");
        HashMap<String, Object> param = new HashMap<>();
        param.put("params", 1);
        param.put("order", 2);
    
        //queueMessageService.send(AmqpConfig.IMPORT_EXCHANGE_KEY, AmqpConfig.TEST_1_QUEUE_KEY, JSON.toJSONString(param));
    
        queueMessageService.delayedSend(AmqpConfig.TEST_EXCHANGE_KEY, AmqpConfig.TEST_QUEUE_KEY, msg, msec);
    }
    }

6.发送队列消息的service

复制代码
    package com.nuvole.merchant.service.mq;
    
    /** * 发送消息
     * * @author lc
     * @date 2020/7/1 11:26
     */
    public interface QueueMessageService {
    
    /** * 发送正常队列消息
     * * @author lc
     * @date 2020/7/1 11:26
     */
    void send(String exchangeKey, String routingKey, Object message);
    
    
    /** * 发送延时队列消息
     * * @param
     * @author lc
     * @date 2020/6/30 11:47
     */
    void delayedSend(String exchangeKey, String routingKey, Object message, int msec);
    
    }

7.发送队列消息的service实现类

复制代码
    package com.nuvole.merchant.service.mq;
    
    import com.nuvole.util.IdGenerator;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    @Service
    public class QueueMessageServiceImpl implements QueueMessageService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Override
    public void send(String exchangeKey, String routingKey, Object message) {
        CorrelationData correlationData = new CorrelationData(IdGenerator.getUUID());
        rabbitTemplate.convertAndSend(exchangeKey, routingKey, message, correlationData);
    }
    
    
    @Override
    public void delayedSend(String exchangeKey, String routingKey, Object msg,final int xdelay) {
    
        rabbitTemplate.convertAndSend(exchangeKey, routingKey, msg, message -> {
            // 设置延迟时间
            message.getMessageProperties().setDelay(xdelay);
            return message;
        });
    }
    
    }

8.配置队列的监听者(完成延时操作)

复制代码
    package com.nuvole.merchant.conf.mq;
    
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.Date;
    
    /** * 监听队列消息
     * * @Author: lc
     * @Date: 2020/5/20 16:09
     */
    @Slf4j
    @Component
    @EnableRabbit
    public class TestReceiver {
    
    
    @RabbitListener(queues = AmqpConfig.TEST_QUEUE_KEY)
    public void process(String msg, Channel channel, Message message) {
        log.info("======================延时队列开始执行。。。。。。。。。。。。。。");
        log.info(new Date().toString() + ",延时收到了信息 message = " + msg);
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    }
  • 至此一切准备完毕,启动项目来看一下效果。 依次访问接口:
复制代码
      http://localhost:9097/test/do?msg=1&msec=10000
      http://localhost:9097/test/do?msg=2&msec=5000
复制代码
    2020/07/01-14:11:58 [http-nio-9097-exec-5] INFO  com.nuvole.merchant.controller.v1.TestController- 开始发送延时队列。。。。。。。。。。。。。
    2020/07/01-14:11:58 [http-nio-9097-exec-5] INFO  com.nuvole.merchant.controller.v1.TestController- Wed Jul 01 14:11:58 CST 2020----------当前时间
    return--message:1,replyCode:312,replyText:NO_ROUTE,exchange:exchange.pay,routingKey:test.pay
    2020/07/01-14:12:00 [http-nio-9097-exec-7] INFO  com.nuvole.merchant.controller.v1.TestController- 开始发送延时队列。。。。。。。。。。。。。
    2020/07/01-14:12:00 [http-nio-9097-exec-7] INFO  com.nuvole.merchant.controller.v1.TestController- Wed Jul 01 14:12:00 CST 2020----------当前时间
    return--message:2,replyCode:312,replyText:NO_ROUTE,exchange:exchange.pay,routingKey:test.pay
    2020/07/01-14:12:05 [SimpleAsyncTaskExecutor-1] INFO  com.nuvole.merchant.conf.mq.TestReceiver- ======================延时队列开始执行。。。。。。。。。。。。。。
    2020/07/01-14:12:05 [SimpleAsyncTaskExecutor-1] INFO  com.nuvole.merchant.conf.mq.TestReceiver- Wed Jul 01 14:12:05 CST 2020,延时收到了信息 message = 2
    2020/07/01-14:12:08 [SimpleAsyncTaskExecutor-1] INFO  com.nuvole.merchant.conf.mq.TestReceiver- ======================延时队列开始执行。。。。。。。。。。。。。。
    2020/07/01-14:12:08 [SimpleAsyncTaskExecutor-1] INFO  com.nuvole.merchant.conf.mq.TestReceiver- Wed Jul 01 14:12:08 CST 2020,延时收到了信息 message = 1
  • 可以看到第一个消息10秒后才被消费,第二个消息5秒消费,符合预期结果。

**下一篇:Linux下安装RabbitMq以及延时插件**

全部评论 (0)

还没有任何评论哟~