Advertisement

rabbitmq-01 rabbitmq概念介绍

阅读量:

1.RabbitMQ简介

RabbitMQ作为erlang开发的一种AMQP(Advanced Message Queue)开源实现出现于世。虽然在同步消息通讯领域内却有诸多公开的标准(如 COBAR 的 IIOP 或者是 SOAP 等),但在异步消息处理领域内却并非如此,在异步消息处理领域内只有少数大企业拥有商业实现(如微软的 MSMQ 或 IBM 的 Websphere MQ 等),因此,在 2006 年 6 月时 Cisco、Redhat、iMatix 等共同制定了 AMQP 的公开标准。

RabbitMQ是由RabbitMQ Technologies Ltd开发并为其提供商业支持的软件产品。该软件公司于2010年4月作为SpringSource旗下VMWare部门的一部分完成收购。随后于2013年5月正式成为Pivotal集团的一部分。值得注意的是,在合并过程中VMWare Pivotal与EMC本质上属于同一实体但两者在组织架构和发展方向上有所区别其中VMWare维持独立上市子公司地位而Pivotal则整合了EMC的部分资源目前并未实现上市。

特别注意:RabbitMQ基于Erlang语言开发,并因此需要一个Erlang环境才能运行

1.1 erlang语言简介

Erlang编程语言最初设计为解决大型电信交换设备的软件开发需求,在多核处理器环境下展现出极高的可靠性特点。随着多核处理器技术日益普及以及互联网和云计算等领域的快速发展,在这种高性能计算环境中展现出显著的应用潜力与扩展前景。

初衷理念实现抗高并发语言

2.几种常见消息中间件

在这里插入图片描述

3.RabbitMQ环境安装

3.1 windows下安装

  • 安装Erlang语言及其所需组件,下载地址:http://www.erlang.org/download
    • 设置系统环境变量

      • 添加系统环境变量ERLANG_HOME,并将其值设置为对应的Erlang安装路径
      • 设置系统环境变量PATH,并在其中添加%ERLANG_HOME%\bin
    • 下载并安装RabbitMQ,下载地址:http://www.rabbitmq.com/download.html

注意: RabbitMQ 它依赖于Erlang,需要先安装Erlang。

3.2 docker安装

  • 拉取镜像
复制代码
    docker pull rabbitmq:3-management
    
    
      
    
    AI写代码
  • 启动镜像(默认用户名密码),默认guest 用户,密码也是 guest
复制代码
    docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
    
    
      
    
    AI写代码
  • 启动镜像(设置用户名密码)
复制代码
    docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:3-management
    
    
      
    
    AI写代码

4.RabbitMQ管理平台

  • RabbitMQ 管理平台地址:http://127.0.0.1:15672
    • 默认设置:公共访问令牌:guest@public
      用户能够自行创建新的账户。

4.1 Virtual Hosts

类似于MySQL中的数据库体系结构,在RabbitMQ中也存在类似的权限管理机制。同样地,在RabbitMQ中可以虚拟消息队列(VirtualHost),每个 VirtualHost 相当于一个相对独立的 RabbitMQ 服务器,并且各个 VirtualHost 之间相互隔离。此外,在 RabbitMQ 中 exchange、queue 和 message 无法互通

在这里插入图片描述
在这里插入图片描述

5.RabbitMQ关键名词

该协议(Advanced Message Queue Protocol)作为异步通信机制的应用层规范而存在;其核心在于支持面向的消息代理机实现;基于该协议的客户端及代理机可无需关注数据来源即可传输数据;无需考虑客户端类型、代理机类型及不同编程语言环境的影响;

涉及概念解释:

  • Server(Broker):客户端连接建立后,负责处理AMQP协议中消息队列及路由功能的具体进程;
  • Virtual Host:类似于权限控制组的概念,允许多个Exchange及Queue共享使用;
  • Exchange:生产者发送的消息接收器,通过Routing Key将消息分配至指定队列Queue;
  • ExchangeType:决定了消息路由的行为模式,RabbitMQ提供三种类型:分发型(fanout)、直接型(direct)及主题型(topic);
  • Message Queue:用于临时存储尚未被消费者处理的消息;
  • Message:包含Headers字段和body字段,Headers记录生产者相关属性如持久化状态、优先级及所属Message Queue信息;
  • BindingKey:用于关联指定的一个Exchange实例与指定的一个Queue实例的关键字段组合。

5.1 RabbitMQ交换机的作用

在这里插入图片描述

生产者发送的消息并非传统途径直接投入队列,而是先进入交换机进行转发至具体队列,并经该队列通过推送或拉取的方式传递给消费者。这一流程与我们之前学习的Nginx功能颇为相似。

交换机按照特定的路由策略将信号转发至相应的队列中,并且交换机共有四种不同的类型。

  • Direct exchange(直连交换机)通过其携带的消息中的路由键将目标信息传递给相应的队列。
    • Fanout exchange(扇型交换机)将目标信息分配给所有与其相关联的队列。
    • Topic exchange(主题交换机)中各队列首先通过其携带的消息中的特定字段实现与交换机的有效连接;随后基于接收到的消息中所包含的关键字信息将其正确地分配给相关联的所有目标队列。
    • Headers exchange(头交换机)与主题交换机相似但使用多个消息属性代替路由键来建立更为复杂的 routing 规则;该类交换机制具体操作方式是:首先检查接收到的消息头中的各个字段值是否符合指定的 binding 条件;只有当所有匹配条件均满足时才会完成 routing 过程;由于这类机制应用较为复杂且效率不高故很少被实际采用。

5.2 消息队列RabbitMQ应答模式

为了避免消息丢失问题,RabbitMQ提供了一种名为消息确认机制的解决方案。当消费者向系统发送一条新消息时,它可以主动询问相关队列中是否存在对应的响应记录,从而确保数据完整性。如果在指定时间内未收到应答信息,系统会自动将该请求重投至指定队列以等待响应。这种设计确保即使客户端出现故障也不会导致数据丢失

当客户端关闭但未收到应答指令时,RabbitMQ系统会认为该消息尚未完成处理流程,从而将其重新分配给其他处理者继续处理这一请求。这种方式不仅能够保证所有信息都能被正确接收,还能有效防止因客户端故障导致的消息丢失问题

RabbitMQ取消了超时限制这一传统做法;只有在客户端发生故障后,系统才会自动将未处理的消息重新投递至指定队列供其他消费端重新处理这一操作无需依赖特定时间窗口来完成

默认情况下,RabbitMQ启用了此功能;然而我们可以通过配置将autoAsk参数设置为false关闭此功能以实现主动询问模式下的手动确认操作一旦任务完成并成功提交应答指令,相关消费者端能够立即收到系统的反馈并从内存空间中删除原始数据记录

5.2.1 案例

生产者端代码不变,消费者端代码这部分就是用于开启手动应答模式的。

复制代码
    channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    
    
      
    
    AI写代码

第二个参数值设为false用于关闭RabbitMQ的自动应答功能,并采用手动应答方式。完成消息处理后设置响应状态变量为true,则表示系统处于自动应答模式。

复制代码
    channel.basicAck(envelope.getDeliveryTag(), false);
    
    
      
    
    AI写代码

5.3 RabbitMQ的公平转发

目前消息转发机制是平均分配,这样就会出现俩个消费者,奇数的任务很耗时,偶数的任何工作量很小,造成的原因就是近当消息到达队列进行转发消息。并不在乎有多少任务消费者并未传递一个应答给RabbitMQ。仅仅盲目转发所有的奇数给一个消费者,偶数给另一个消费者。
为了解决这样的问题,我们可以使用basicQos方法,传递参数为prefetchCount= 1。这样告诉RabbitMQ不要在同一时间给一个消费者超过一条消息。
换句话说,只有在消费者空闲的时候会发送下一条信息。调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息,也就是等待消费者处理完毕并自己对刚刚处理的消息进行确认之后,才发送下一条消息,防止消费者太过于忙碌,也防止它太过去清闲。
通过设置channel.basicQos(1);

在这里插入图片描述

5.4 RabbitMQ消息确认机制

生产者发送消息出去后,默认无法确认是否成功到达RabbitMQ服务器。此外,在某些情况下,在消息被发送后出现逻辑错误时不再希望收到之前的消息,请问具体该如何操作?

5.4.1 解决方案

  • AMQP 事务机制
  • Confirm 模式

5.4.2 事务模式

  • txSelect 将current channel settings指定为transactional mode。
  • txCommit 该操作提交当前正在进行的事务。
  • txRollback 该操作使事务被回滚。
5.4.2.1 代码实现

5.4.3 Confirm 模式

全部评论 (0)

还没有任何评论哟~