Advertisement

学会RabbitMQ,看懂这篇文章就够了。

阅读量:

RabbitMQ 研究

1、什么是MQ以及为什么学习RabbitMQ
1.1简介

MessageQueue的缩略形式为MQ,并被称为消息队列。该系统由Erlang语言开发并基于AMQP(AdvancedMessage Queue高级消息队列协议)协议实现了对消息队列功能的支持。它充当了程序间通信的一种机制,在分布式系统开发中被广泛应用于提高系统的可扩展性和处理能力,并提供了一种高效的消息传递解决方案以支持大规模的应用架构设计与实现。 RabbitMQ官方地址: http://www.rabbitmq.com

开发中消息队列通常有如下应用场景:

操作异步处理。被消息队列通知消息接收方执行不需要同步处理且耗时较长的任务。显著提升了应用程序的响应时间

2、应用程序解耦 MQ充当了一个中介角色,在第三方服务提供者与消费者之间传递消息,并实现了应用程序间的解耦

目前市场中有哪些消息队列系统?该协议包括但不限于 ActiveMQ(已广泛应用于企业级应用环境)、RabbitMQ(一个基于P2P架构的开源消息队列)、ZeroMQ(一个高性能的消息队列库)、Kafka(分布式流处理框架)以及MetaMessage Quality(用于消息传输质量控制)、RocketMQ(阿里云提供的高性能消息队列)等技术方案。

选择RabbitMQ的原因是什么?它具有易用性和强大的功能特性,并且遵循AMQP协议标准。其高性能得益于所采用的语言——Erlang。

2、快速入门
2.1RabbitMQ的工作原理
在这里插入图片描述

该系统中的Broker角色扮演着消息队列服务的核心功能。其包含两大核心组件:Exchange负责信息路由分配功能及Queue存储机制。其中.Exchange模块通过预先设定好的策略与算法实现信息传输路径的选择,并将其发送至指定的Queue以供处理;而Message Queue 作为一个先进先出的信息存储机制,在其上完成着各种异步操作。每当一条新信息加入该Message Queue时,在经过预设好的过滤机制后会被分配给对应的消费者处理;而Producer模块则负责生成这些待传输的信息,并将其推送到Message Queue系统中;最后.Consumer模块则作为接收端,在其上完成着各种异步操作。

消息发布与接收流程:-----发送消息-----

  1. 生产者与Broker通过TCP协议建立连接。
  2. 通过与Broker的通信渠道建立连接。
  3. 当生产者借助通道将信息传递至broker节点时, 交换机负责数据的中转。
  4. 交换机将数据传输至预先配置好的队列位置。

----接收消息-----

  1. 通过TCP协议实现双方的连接
  2. 创建通信通道
  3. 设置接收口于指定队列
  4. 当消息到达指定队列时,默认地将信息传递给消费者。
  5. 接收方成功获取信息。
2.2 下载安装

RabbitMQ由Erlang语言开发,而OTP(Open TelecomPlatform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件及工具库,安装RabbitMQ需要安装Erlang/OTP,并保持版本匹配,如下:
1)下载erlang :地址如下:http://erlang.org/download/otp_win64_20.3.exe
安装完成后,将Erlang添加到环境变量中。
erlang安装完成需要配置erlang环境变量: ERLANG_HOME=D:\Program Files\erl9.3 在path中加%ERLANG_HOME%\bin;

2)安装 RabbitMQ :https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.3
安装完成后,在计算机上下载并解压 RabbitMQ 安装包。解压完成后,默认路径位于 sudo rabbitmq-server-3.7.3\lib\share\rabbitmq\sbin 目录下。请切换到命令行界面后执行 CD 命令进入该目录。随后运行兔元管理插件的安装脚本: sudo rabbitmq-plugins enable rabbitmq_management 完成插件的配置与注册。最后打开浏览器输入 http://localhost:15672 地址(需根据实际情况调整本地 RabbitMQ 服务端口),在网页中会显示相关操作成功的提示信息。

在这里插入图片描述

初始用户名和密码都是guest,登录即可。

3、工作模式
3.1、Work queues

应用场景

3.2、Publish/Subscribe
在这里插入图片描述

发布订阅模式:
1、每个消费者将个人消息订阅到自身的消息队列。
2、生产者向broker发送消息,并通过交换机将消息传递至所有绑定该交换机的消息队列。
应用场景:当用户的充值操作完成或转账交易完成时系统会向用户发送相应的通知信息。这些通知信息可以通过短信邮件或其他支持的服务形式进行接收

3.3、Routing
在这里插入图片描述

路由模式:

  1. 每个消费者注册并订阅自己的消息队列,并配置一个routinig key用于标识该队列。
  2. 生产者向交换机构送入消息,在收到后交换机依据接收的消息routinig key将其重定向至对应的消息队列中。
3.4、Topics
在这里插入图片描述

通配符模式:

  1. 每个消费者能够监控自己的消息队列,并通过配置带有通配符的.routingkey参数来实现自动路由功能。
  2. 生产者将消息发送至broker节点,并由交换机依据.routingkey参数将消息路由至指定的消息队列。
4、SpringBoot整合RabbitMQ
4.1、搭建SpringBoot环境

我们采用基于Spring-Rabbit的技术用于实现RabbitMQ功能。
https://github.com/spring-projects/spring-amqp
配置后会自动引入spring-rabbit依赖,请参考以下文档获取详细信息:
https://github.com/spring-projects/spring-amqp#关于该依赖包的详细信息

复制代码
     <dependency>
     <groupId>org.springframework.boot</groupId>
    <artifactId>spring‐boot‐starter‐amqp</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring‐boot‐starter‐test</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring‐boot‐starter‐logging</artifactId>
     </dependency>
4.2、配置

1、配置application.yml 配置连接rabbitmq的参数

复制代码
    server:
      port: 44000 
    spring:
      application:
    name: test‐rabbitmq‐producer   rabbitmq:
    host: 127.0.0.1 
    port: 5672 
    username: guest 
    password: guest 
    virtualHost: /

2、创建名为RabbitConfig的类,并对Exchange组件设置参数以实现消息中继功能;随后初始化Queue队列以处理消息队列;最后配置绑定交换机的属性以完成消息转发功能。其中以Topic交换机为例进行配置

复制代码
    @Configuration 
    public class RabbitmqConfig {
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
     
     
    /** * 交换机配置 
     * ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置 
     * @return the exchange 
     */
    @Bean(EXCHANGE_TOPICS_INFORM)
    public Exchange EXCHANGE_TOPICS_INFORM() {
        //durable(true)持久化,消息队列重启后交换机仍然存在
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();    }
    //声明队列
    @Bean(QUEUE_INFORM_SMS)
    public Queue QUEUE_INFORM_SMS() {
        Queue queue = new Queue(QUEUE_INFORM_SMS);
        return queue;
    }
    //声明队列
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue QUEUE_INFORM_EMAIL() {
        Queue queue = new Queue(QUEUE_INFORM_EMAIL);
        return queue;
    }
    /** channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#");
     * 绑定队列到交换机 .
     * * @param queue    the queue 
     * @param exchange the exchange 
     * @return the binding 
     */
    @Bean 
    public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, 
    @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs();
    }
    @Bean 
    public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, 
    @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs();
    }
    }
4.3、生产端

使用RarbbitTemplate发送消息

复制代码
    @SpringBootTest 
    @RunWith(SpringRunner.class)
    public class Producer05_topics_springboot {
       @Autowired 
    RabbitTemplate rabbitTemplate;
     
    @Test 
    public void testSendByTopics(){
        for (int i=0;i<5;i++){
            String message = "sms email inform to user"+i;
    			rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"inform.sms.email",message); 
            System.out.println("Send Message is:'" + message + "'");
        }
    }
    }
4.4、消费端

创建消费端工程,添加依赖:

复制代码
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring‐boot‐starter‐amqp</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring‐boot‐starter‐test</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring‐boot‐starter‐logging</artifactId>
     </dependency>

使用@RabbitListener注解监听队列。

复制代码
    @Component 
    public class ReceiveHandler {
     
    //监听email队列
    @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
    public void receive_email(String msg,Message message,Channel channel){        System.out.println(msg);
    }
     
    //监听sms队列
    @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
    public void receive_sms(String msg,Message message,Channel channel){
        System.out.println(msg);
    }
    }

本文全部测试代码均已在GitHub上发布,请参考链接地址: GitHub仓库地址

全部评论 (0)

还没有任何评论哟~