Advertisement

RabbitMQ简介及简单使用

阅读量:

文章目录

  • 对RabbitMQ的基本情况进行概述

  • 简述使用 RabbitMQ 的常见应用场景

  • 阐述如何在 RabbitMQ 平台上建立队列

    • 阐述如何在 RabbitMQ 平台下制定队列策略
    • 描述生产者端的代码实现细节
    • 说明消费者端的代码实现步骤
  • 3.RabbitMQ如何实现消息的可靠传输

  • 4.RabbitMQ的工作队列机制

  • 5.RabbitMQ的交换功能(exchange)

    • 5.1 RabbitMQ Fanout的消息分发机制

    • 5.2 直接式交换机(direct)

    • 5.3 主题模式(topic)

    • 6.MQ如何获取消费者消费结果

    • 7.rabbitmq死信队列

      • 7.1原理
      • 7.2演示
      • 7.3应用场景
    • 8.RabbitMQ重试策略与幂等性问题

      • 8.1重试
      • 8.2幂等性问题

参考蚂蚁课堂

1.RabbitMQ基本介绍

RabbitMQ遵循了高级消息队列协议(AMQP)并提供了开源的消息代理功能(也被称为基于消息的中间件)。 RabbitMQ服务器采用Erlang语言编写而成。 安装完成后就可以直接使用兔兔MQ服务。 访问localhost:15672就能进入兔兔MQ管理台。 输入用户名和密码就能成功登录兔兔MQ系统。

在这里插入图片描述

这个系统中包含了一个称为Virutal Hosts的核心概念,在 RabbitMQ 中对应的虚拟消息交换机 VirtualHost 系统。每一个 Virutal Host 等价于一个功能独立且自成体系的 RabbitMQ 服务实例,并且彼此之间互不干扰地运行着各自的 exchange、queue 和 message 机制。

然后我们看一下RabbitMQ常见的端口号

15672 — RabbitMQ管理平台的端口号

25672 — 集群通信端口号

Amqp 5672 — RabbitMQ内部通信的一个端口号

2.RabbitMQ简单使用案例

RabbitMQ使用的一般步骤包括:第一步需要在管理平台上建立一个队列;随后开发生产者相关的代码;接着开发消费者相关的代码。

2.1在RabbitMQ平台上创建一个队列

随后建立一个虚拟主机,并将权限配置为个人账户guest

在这里插入图片描述
在这里插入图片描述

在该VirtualHost内建立一个名为xxx的Queue实例,在为该队列指定所属的VirtualHost时,请注意设置其名字以及选择是否可以选择将该队列的持久化特性进行设置。完成上述配置后即可完成操作

2.2编写生产者代码

这个发送者的职责就是将消息投送到消息队列中,首先他必须与RabbitMQ进行连接。

复制代码
    public class RabbitMQConnection {
    public static Connection getConnection() throws IOException, TimeoutException {
        //1.创建connectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.配置Host
        connectionFactory.setHost("127.0.0.1");
        //3.设置Port
        connectionFactory.setPort(5672);
        //4.设置账户和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //5.设置VirtualHost
        connectionFactory.setVirtualHost("/wjzVirtualHost");
        return connectionFactory.newConnection();
    }
    }

配置好我们的IP地址端口号和VirtualHost。然后我们编写生产者代码

复制代码
    public class Producer {
    private static final String QUEUE_NAME = "wjz-queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建一个新连接
        Connection connection = RabbitMQConnection.getConnection();
        //2.设置channel
        Channel channel = connection.createChannel();
        //3.发送消息
        String msg = "wjz,nb!!!";
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        System.out.println("消息投递成功");
        channel.close();
        connection.close();
    }
    }

生产者需要与RabbitMQ服务器建立连接,并随后将消息发送到消息队列中。之后请关闭该连接。基本_publish函数的第一个参数是一个交换机,在本案例中较为简单无需配置即可使用。接下来的第二个参数为目标消息队列名称,请根据实际需求填写相应的名称。第三个参数涉及一些配置信息如过期时间、优先级以及投递模式等。第四个参数则是待发送的消息。

然后我们运行一下看看这条消息有没有被投递到MQ服务器。

在这里插入图片描述
在这里插入图片描述

如图所示,在队列中的变量Ready初始化为1;接下来我们的步骤是将消息从队列中取出。

2.3编写消费者代码

复制代码
    public class Consumer {
    private static final String QUEUE_NAME = "wjz-queue";
    
    public static void main(String[] args) throws IOException, TimeoutException, IOException, TimeoutException {
        // 1.创建连接
        Connection connection = RabbitMQConnection.getConnection();
        // 2.设置通道
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msg);
                // 消费者完成 消费该消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 3.监听队列
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    
    }
    }

我们的消费者首先建立了这样一个连接体,在其内部配置了一个通道口,在此通道口上一旦收到消信息便会用于消费这一流程就完成了。最后,在这个监听队列中设置了第二个参数为true时会实现自动接收功能——当消信息被成功获取后即刻从消信队列中移除了该条信息;但若在此过程中发生处理异常,则会导致无法及时完成消信——这是我们无法接受的情况——因此一般会选择手动接收并将该参数设为false状态;在手动模式下(即当消费者收到一条有效且已成功的消信息后),系统会向MQ发送确认通知以确认消信息已正确处理这一流程就完成了

好了解释完了运行一下看看结果。

在这里插入图片描述
在这里插入图片描述

3.RabbitMQ如何保证消息不丢失

MQ服务器端在默认模式下都会完成队列中消息的存储操作。因此即使MQ发生故障内部的消息也不会被丢失。

在这里插入图片描述

如图所示RabbitMQ设计了一个可靠的消息传输机制以防止数据丢失。具体而言当生产者将消息投射至队列中时系统会自动将确认反馈返回给生产者从而确保消息传输的成功性。这种机制通过提供双重验证保证了数据完整性在后续处理过程中不会出现丢失的情况。此外该系统允许通过同步或异步的方式处理确认反馈以适应不同的性能需求。当采用同步机制时如果系统未能成功发送确认信息则会导致生产者等待并阻塞;而当采用异步机制时则会部署一个监控器来检查消息投射是否成功从而避免潜在的数据丢失风险

消费者必须完成消息消费后会向MQ发送一个通知让MQ删除消息

4.RabbitMQ工作队列

默认的传统队列是基于平均分配消费的原则设计的,在这种情况下存在明显的不公平性问题;当各个消费者的处理速度不一致时,默认均摊消费将无法实现公正合理的资源分配。为了实现强者多拿、弱者少拿资源分配策略的目标(即能者多劳),我们需要配置工作队列参数。具体来说,在配置工作队列时,请将消息拉取数量设置为channel.basicQos(int);这个函数来控制消费者一次获取的消息数量,并发送Ack确认消息已被处理以避免重复发送问题。这样MQ系统会自动删除已处理的消息条目。

下面我们可以通过代码来演示一下这个效果

producer

复制代码
    public class Producer {
    private static final String QUEUE_NAME = "wjz-queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建一个新连接
        Connection connection = RabbitMQConnection.getConnection();
        //2.设置channel
        Channel channel = connection.createChannel();
        //3.发送消息
        for (int i = 0; i < 10; i++) {
            String msg = "wjz nb:i" + i;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }
        System.out.println("消息投递成功");
        channel.close();
        connection.close();
    }
    }

首先这个生产者会和MQ服务器建立一个连接,然后向MQ中投放10条消息。

consumer1

复制代码
    public class Consumer1 {
    private static final String QUEUE_NAME = "wjz-queue";
    
    public static void main(String[] args) throws IOException, TimeoutException, IOException, TimeoutException {
        // 1.创建连接
        Connection connection = RabbitMQConnection.getConnection();
        // 2.设置通道
        Channel channel = connection.createChannel();
        //指定我们消费者每次批量获取消息
        channel.basicQos(2);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                String msg = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msg);
                try {
    // 消费者完成 删除该消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }catch (Exception e){
    
                }
    //
            }
        };
        // 3.监听队列
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    
    }
    }

消费者也需要与MQ服务器进行连接,并从中获取消息。其中channel.basicQos(2)这一配置参数设置为2意味着每次最多可获取两条消息。随后当这些信息处理完毕后会发送确认指令至MQ服务器;此时系统会自动将这两条信息从队列中移除以实现数据的有效管理

复制代码
    public class Consumer2 {
    private static final String QUEUE_NAME = "wjz-queue";
    
    public static void main(String[] args) throws IOException, TimeoutException, IOException, TimeoutException {
        // 1.创建连接
        Connection connection = RabbitMQConnection.getConnection();
        // 2.设置通道
        Channel channel = connection.createChannel();
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
    
                }
                String msg = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msg);
                // 消费者完成 删除该消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 3.监听队列
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    
    }
    }

主要区别在于他处理消息的时间存在1秒的延迟现象,并且由此反映出他的处理效率较低

然后我们看一下结果

在这里插入图片描述
在这里插入图片描述

这表明高能力消费者成功接收到9条信息,而低能力消费者仅成功接收到1条信息,这种分配结果正是工作队列设计的目标.

5.RabbitMQ交换机(exchange)

在这里插入图片描述

假设存在多个消息队列的情况下,在线工作中的生产者将消息发送至这些队列,并由各自对应的信息接收方进行处理。随着系统规模的增长以及复杂性提升,在线工作中的代码实现会变得繁琐冗长且难以维护。因此我们引入了一个的消息路由中间件以便优化流程该中间件会根据预设规则自动分配 incoming messages to appropriate queues从而减少人工干预并提高系统效率其主要功能是根据预设规则将message分配至相应的存储位置以实现高效的资源利用

5.1RabbitMQ Fanout 发布订阅

在这里插入图片描述

fanout发布订阅原理

1.需要创建两个队列,每个队列对应一个消费者;

2.队列需要绑定我们的交换机

3.生产者投递消息到交换机中,通过交换机分发给这两个队列存放起来。

4.消费者从队列中提取这个消息

然后我们通过代码演示一下这个效果。

生产者代码

复制代码
    public class ProducerFanout {
    
    /** * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "fanout_exchange";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        //  创建Connection
        Connection connection = RabbitMQConnection.getConnection();
        // 创建Channel
        Channel channel = connection.createChannel();
        // 通道关联交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
        String msg = "wjz, nb!!";
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
        channel.close();
        connection.close();
    }
    
    }

我们可以看到他不仅会为交换机命名而且还会将其与该交换机建立连接。,随后将向该交换机内发送信息。

邮件消费者

复制代码
    public class MailConsumer {
    /** * 定义邮件队列
     */
    private static final String QUEUE_NAME = "fanout_email_queue";
    /** * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "fanout_exchange";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("邮件消费者...");
        // 创建我们的连接
        Connection connection = RabbitMQConnection.getConnection();
        // 创建我们通道
        final Channel channel = connection.createChannel();
        // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    
    }
    }

邮件消费者关联到我们的邮件队列。同理短信消费者也是一样。

复制代码
    public class SmsConsumer {
    /** * 定义短信队列
     */
    private static final String QUEUE_NAME = "fanout_email_sms";
    /** * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "fanout_exchange";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("短信消费者...");
        // 创建我们的连接
        Connection connection = RabbitMQConnection.getConnection();
        // 创建我们通道
        final Channel channel = connection.createChannel();
        // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("短信消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    
    }
    }

然后我们在RabbitMQ控制台创建这些交换机和队列。

在这里插入图片描述

rabbitMQ创建交换机

在这里插入图片描述

请确保我们选择的是fanout类型的交换机类型。然后我们查看启动后的运行结果。

随后开始启动两个消费者。他们等待中接收数据。然后生产者将数据发送至交换机中。最终观察消费者是否能够成功获取到数据

在这里插入图片描述
在这里插入图片描述

结果显示,在将消息发送给fanout类型的交换机后发现,在这些队列中所有队列都成功地被绑定到该交换机上。

5.2 Direct交换机

如果交换机的类型被配置为direct类型,则可以通过指定一个路由键来实现将消息分配至特定队列中。

在这里插入图片描述

RabbitMQ DirectExchange交换机被配置后,在生产者发布一条消息时会携带一个路由键。该系统通过该路由键确定向哪个队列发送这条消息。为了实现这一功能,在RabbitMQ控制台先配置好所需的DirectExchange交换机及其对应的队列。

在这里插入图片描述

然后添加相应的队列。

在这里插入图片描述

然后我们来看一下direct交换机的演示代码

复制代码
    public class ProducerDirect {
    
    /** * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "direct_exchange";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        //  创建Connection
        Connection connection = RabbitMQConnection.getConnection();
        // 创建Channel
        Channel channel = connection.createChannel();
        // 通道关联交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        String msg = "wjz,nb!!!";
        channel.basicPublish(EXCHANGE_NAME, "email", null, msg.getBytes());
        channel.close();
        connection.close();
    }
    
    }

在当前系统中,在我们设置了该生产者端点的路由键为 email 地址后,其消息将被投递至邮件处理通道。

邮件消费者

复制代码
    public class MailConsumer {
    /** * 定义邮件队列
     */
    private static final String QUEUE_NAME = "direct_email_queue";
    /** * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "direct_exchange";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("邮件消费者...");
        // 创建我们的连接
        Connection connection = RabbitMQConnection.getConnection();
        // 创建我们通道
        final Channel channel = connection.createChannel();
        // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    
    }

该邮件消费者采用 queueBind 方法将其绑定到队列与交换机,并将路由键设置为 email

短信消费者

复制代码
    public class SmsConsumer {
    /** * 定义短信队列
     */
    private static final String QUEUE_NAME = "direct_sms_queue";
    /** * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "direct_exchange";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("短信消费者...");
        // 创建我们的连接
        Connection connection = RabbitMQConnection.getConnection();
        // 创建我们通道
        final Channel channel = connection.createChannel();
        // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("短信消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    
    }
    }

从代码中可以看出短信消费者的相关配置情况。具体来说,在该配置中通过调用queueBind方法实现了队列与交换机之间的绑定关系。同时设置路由键为sms以便后续进行相关操作。接下来的操作步骤包括运行该程序以验证其功能是否达到预期效果。

在这里插入图片描述
在这里插入图片描述

基于上述分析可以看出,在实际运行过程中我们观察到以下现象:首先,在实际运行过程中我们观察到,在实际运行过程中我们观察到,在实际运行过程中我们观察到,在实际运行过程中我们观察到,在实际运行过程中我们观察到,在实际运行过程中我们观察到,在实际运行过程中我们观察到,在实际运行过程中

5.3 Topic主题模式

若交换机类型配置为Topic型时,则基于队列绑定的路由键实施模糊转发机制至对应入站队列。

#表示支持匹配多个词,*表示只能匹配一个词。

在这里插入图片描述

如图所示,在路由中生产者会推送消息key为a.sms表明该主题即为a其余的队列需要注册到该主题其余的队列在路由key上需配置绑定该主题并在主题名称后添加模糊匹配关键字如图所示在此情况下生产者所推送的消息将会被放置于短信队列中最后这些短信会被短信消费者处理

(在RabbitMQ添加队列和交换机的步骤略)

生产者

复制代码
    public class ProducerTopic {
    
    /** * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "topic_exchange";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        //  创建Connection
        Connection connection = RabbitMQConnection.getConnection();
        // 创建Channel
        Channel channel = connection.createChannel();
        // 通道关联交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
        String msg = "wjz,nb";
        channel.basicPublish(EXCHANGE_NAME, "wjz.sms", null, msg.getBytes());
        channel.close();
        connection.close();
    }
    
    }

邮件消费者

复制代码
    public class MailConsumer {
    /** * 定义邮件队列
     */
    private static final String QUEUE_NAME = "topic_email_queue";
    /** * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "topic_exchange";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("邮件消费者...");
        // 创建我们的连接
        Connection connection = RabbitMQConnection.getConnection();
        // 创建我们通道
        final Channel channel = connection.createChannel();
        // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "xxx.*");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    
    }
    }

短信消费者

复制代码
    public class SmsConsumer {
    /** * 定义短信队列
     */
    private static final String QUEUE_NAME = "topic_sms_queue";
    /** * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "topic_exchange";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("短信消费者...");
        // 创建我们的连接
        Connection connection = RabbitMQConnection.getConnection();
        // 创建我们通道
        final Channel channel = connection.createChannel();
        // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "wjz.*");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("短信消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    
    }
    }

然后我们看一下结果

在这里插入图片描述
在这里插入图片描述

消费者订阅了wjz这个主题所以他会收到生产者投递来的消息。

6.MQ如何获取消费者消费结果

在这里插入图片描述

如图所示是一个订单服务流程图首先生产者会将消息投递至交换机这条消息如同下单指令随后交换机会将该消息发送至订单队列接下来由订单消费者从队列中获取该条信息并将其插入至数据库系统之中然而我的生产者无法确认该条信息是否已成功完成插入操作

这个解决方案如下图所示

在这里插入图片描述

生产者将消息投递给交换机。
随后MQ服务器端返回了一个全局事务ID给生产者。
接着在rabbitmq系统上该流程未实现,在rocketmq系统上得到了支持。

7.rabbitmq死信队列

备用通道通常被称为‘死信队列’(dead message queue),当某个消息中间件因故未能接收特定的消息时,则能够将该消息转移至备用通道进行存储。备用通道通常还具备交换机制以及路由键等辅助功能以确保信息的安全传输。

7.1原理

在这里插入图片描述

当以下几种情况出现时订单队列中的msg会向死信队列中转移。

  • 生产者将消息发送至订单交换机后,在随后的消息被分配至订单队列中,在此期间消费者未能及时获取相关信息。
  • 当团队面临资源限制时,在这种情况下一旦出现超负荷运转的情况,则会触发自动响应机制。
  • 由于代码存在潜在缺陷导致系统异常运行的可能性提高,在这种情况下即使如此,在尝试多次后仍无法解决问题的情况下最终将这些未处理的信息存入备用的死信区域。

7.2演示

在我们的演示中,在线演示效果表明:该系统中有一个生产者,在启动时会将消息投递到订单交换机上。随后,订单交换机会将这些消息转发至对应的订单队列中。然而我们特意关闭了所有订单消费者的配置项——这样就不会有任何其他系统去订阅这些未被订阅的消息了。随后——即当所有订阅中断后—— orders queue 就会被直接转接到 dead_letter_queue 中作为备用存储空间;而 dead_letter 消费器则会在适当的时候处理这些未被订阅的消息。

config

复制代码
    @Component
    public class DeadLetterMQConfig {
    /** * 订单交换机
     */
    @Value("${wjz.order.exchange}")
    private String orderExchange;
    
    /** * 订单队列
     */
    @Value("${wjz.order.queue}")
    private String orderQueue;
    
    /** * 订单路由key
     */
    @Value("${wjz.order.routingKey}")
    private String orderRoutingKey;
    /** * 死信交换机
     */
    @Value("${wjz.dlx.exchange}")
    private String dlxExchange;
    
    /** * 死信队列
     */
    @Value("${wjz.dlx.queue}")
    private String dlxQueue;
    /** * 死信路由
     */
    @Value("${wjz.dlx.routingKey}")
    private String dlxRoutingKey;
    
    /** * 声明死信交换机
     * * @return DirectExchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchange);
    }
    
    /** * 声明死信队列
     * * @return Queue
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(dlxQueue);
    }
    
    /** * 声明订单业务交换机
     * * @return DirectExchange
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(orderExchange);
    }
    
    /** * 声明订单队列
     * * @return Queue
     */
    @Bean
    public Queue orderQueue() {
        // 订单队列绑定我们的死信交换机
        Map<String, Object> arguments = new HashMap<>(2);
        arguments.put("x-dead-letter-exchange", dlxExchange);
        arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
        return new Queue(orderQueue, true, false, false, arguments);
    }
    
    /** * 绑定死信队列到死信交换机
     * * @return Binding
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with(dlxRoutingKey);
    }
    
    
    /** * 绑定订单队列到订单交换机
     * * @return Binding
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with(orderRoutingKey);
    }
    }

producer

复制代码
    @RestController
    public class OrderProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /** * 订单交换机
     */
    @Value("${wjz.order.exchange}")
    private String orderExchange;
    /** * 订单路由key
     */
    @Value("${wjz.order.routingKey}")
    private String orderRoutingKey;
    
    @RequestMapping("/sendOrder")
    public String sendOrder() {
        String msg = "wjz,nb";
        rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey, msg, message -> {
            // 设置消息过期时间 10秒过期
            message.getMessageProperties().setExpiration("10000");
            return message;
        });
        return "success";
    }
    }

dead-consumer

复制代码
    @Slf4j
    @Component
    public class OrderDlxConsumer {
    
    /** * 死信队列监听队列回调的方法
     * * @param msg
     */
    @RabbitListener(queues = "wjz_order_queue")
    public void orderConsumer(String msg) {
        log.info(">死信队列消费订单消息:msg{}<<", msg);
    }
    }

order-onsumer

复制代码
    @Component
    @Slf4j
    public class OrderConsumer {
    
    /** * 监听队列回调的方法
     * * @param msg
     */
    @RabbitListener(queues = "wjz_order_queue")
    public void orderConsumer(String msg) {
        log.info(">>正常订单消费者消息MSG:{}<<", msg);
    }
    }
在这里插入图片描述

如图所示我不注释订单消费者订单消费者消费。

在这里插入图片描述

注释掉订单消费者后消息会转移到死信队列当中让死信队列去消费。

7.3应用场景

订单超时回滚的设计:我们可以采用Redis来实现这一功能。具体而言,在Redis中以订单号作为键值进行配置,在设置好超时时间后一旦超出该时间就会触发通知客户端进行数据库回滚操作。此外我们还可以借助普通队列和死信备胎队列的机制来实现这一功能即建立一个未绑定消费者的消息源当消息到达指定过期时间后会将其重排至死信备胎队列随后在死信消费者中查询对应订单号是否已成功支付若查询结果未显示成功则执行数据库回滚操作。

8.RabbitMQ重试策略与幂等性问题

在这样的情况下, 我们需要通过异步方式下单, 消费者在进行消费时, 但其代码出现异常, 表明消息处理出现问题, 因此消费者会不断重试, 最终导致数据库中有大量重复的订单记录.

8.1重试

controller

复制代码
    @RequestMapping("/sendOrder")
    public String sendOrder() {
        // 生成全局id
        String orderId = System.currentTimeMillis() + "";
        log.info("orderId:{}", orderId);
        String orderName = "每特教育svip课程报名";
        orderProducer.sendMsg(orderName, orderId);
        return orderId;
    }

producer

复制代码
    public void sendMsg(String orderName, String orderId) {
        OrderEntity orderEntity = new OrderEntity(orderName, orderId);
        rabbitTemplate.convertAndSend("/wjz_order", "", orderEntity, message -> {
            return message;
        });
    }

consumer

复制代码
    @Slf4j
    @Component
    @RabbitListener(queues = "fanout_order_queue")
    public class FanoutOrderConsumer {
    
    @Autowired
    private OrderManager orderManager;
    @Autowired
    private OrderMapper orderMapper;
    
    @RabbitHandler
    public void process(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
        log.info(">>orderEntity:{}<<", orderEntity.toString());
        String orderId = orderEntity.getOrderId();
        if (StringUtils.isEmpty(orderId)) {
            return;
        }
        int result = orderManager.addOrder(orderEntity);
        int i = 1 / 0;
        log.info(">>插入数据库中数据成功<<");
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
    }

为了限制时间跨度内不进行过多的重试操作,在YAML文件中明确设置相关的规则以避免长时间反复请求

复制代码
    spring:
      rabbitmq:
    ####连接地址
    host: 127.0.0.1
    ####端口号
    port: 5672
    ####账号
    username: guest
    ####密码
    password: guest
    ### 地址
    virtual-host: /wjzVirtualHost
    listener:
      simple:
        retry:
          ####开启消费者(程序出现异常的情况下会)进行重试
          enabled: true
          ####最大重试次数
          max-attempts: 5
          ####重试间隔时间
          initial-interval: 3000
      datasource:
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8
    username: root
    password: root
    driver-class-name: com.mysql.jdbc.Driver

随后, 我们运行该服务流程, 并通过调用sendOrder接口监控数据库的状态.

在这里插入图片描述

我们能够观察到订单出现重复的情况。这通常是由于我们的消费者在执行业务代码时触发异常处理流程,在此情况下,默认情况下,默认重试次数设置为无限。因此需要我们手动指定重试次数来实现有限次的重试机制。

通常情况下,在信息接收过程中

8.2幂等性问题

当消费者在执行业务代码时,
如果发生异常,在此期间消息队列mq会自动触发重试机制。
然而,在重试过程中,
这可能导致消费者重复消费的问题。
这就是所谓的消息幂等性问题。
针对这一问题,
我们提出了解决方案。

生产者在发送消息时会自动生成一个全局唯一标识符,并将其嵌入到消息中。消费者接收该消息后可以通过该唯一标识符实现去重功能。对于insert操作类型,则需要先检查是否存在已有记录;若不存在则进行插入操作;若有则不进行插入。而对于update操作类型,则采用乐观锁机制以提高事务处理效率。

我们看一下全局唯一Id的效果。

复制代码
    @Slf4j
    @Component
    @RabbitListener(queues = "fanout_order_queue")
    public class FanoutOrderConsumer {
    
    @Autowired
    private OrderManager orderManager;
    @Autowired
    private OrderMapper orderMapper;
    
    @RabbitHandler
    public void process(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
        log.info(">>orderEntity:{}<<", orderEntity.toString());
        String orderId = orderEntity.getOrderId();
        if (StringUtils.isEmpty(orderId)) {
            return;
        }
        OrderEntity dbOrderEntity = orderMapper.getOrder(orderId);
        if (dbOrderEntity != null) {
            log.info("另外消费者已经处理过该业务逻辑");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            return;
        }
        int result = orderManager.addOrder(orderEntity);
        int i = 1 / 0;
        log.info(">>插入数据库中数据成功<<");
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

我们可以使用订单Id在数据库中进行检索核查是否有对应的记录;如果未找到则执行插入操作并完成任务。然后运行程序以观察结果如何

在这里插入图片描述

这样就算报错了重试也不会有重复的数据了。

全部评论 (0)

还没有任何评论哟~