RabbitMQ的简介和使用以及SpringBoot整合RabbitMQ
AMQP
该主流异步消息传递机制采用基于异步传输模型的应用层规范。位于线路层的该客户端可进行发送与接收操作;与诸如JMS这样的API不同的是;该客户端无需关心消息源即可发送或接收;在众多业务流程中发挥核心作用;其独特优势在于即便通过中间人传输信息而不受其影响。
RabbitMQ
RabbitMQ是由erlang开发的一个开源实现,主要用于在分布式系统中存储和转发消息
RabbitMQ核心概念
- Message:消息是由消息头和消息体构成的一种数据载体,在实际应用中具有重要价值。
- Publisher:发布者是指负责生成并发送特定类型信息的对象。
- Exchange:交换机是一种专门用于接收并根据预设规则将信息路由到目标队列的中间实体。
- Queue:队列是一种专门的数据存储容器,在指定时间向指定路径投放数据以等待处理的状态。
- Binding:绑定机制是指通过路由规则将特定队列与交换机关联起来的技术手段。
- Connection:网络连接是构建多信道通道的基础设施。
- Channel:信道是在TCP/IP协议下虚拟化的连接通道,在此之上可实现发布与订阅操作。
- Consumer:客户端应用程序通过此信道能够即时获取相关通知服务。
- Broker:的消息队列服务器实体包含多种功能模块以满足不同业务需求。
- Virtual Host(Vhost): 虚主机是指位于特定经纪节点下独立运行的小型虚拟服务器实体,在此之上可配置独立的队列、交换机及绑定机制以满足特定业务需求。

Exchange类型
- Direct::当信息中的Routing-key精确匹配Binding中的Binding-key时, 交换机会直接将该信息发送至对应的通道上,

任何一条被发送至fanout类型的交换器的消息都会被转发至所有绑定的队列上。 fanout类型的路由器不会处理路由键,并等同于进行广播操作。

- Topic:该交换器采用基于模式匹配的方法对请求进行路由分配操作。用于将请求路由键与特定模式进行匹配以确定相应的服务绑定位置系统会将请求的目标地址拆分成独立的单词序列各单词之间以点号分隔符分隔这种结构使得系统能够灵活地处理不同长度的服务绑定项其中通配符#对应匹配零个或多个前缀项而通配符*对应匹配单个前缀项在实际应用中例如当消息请求的目标地址为hello java时系统会发现该目标地址与hello # 模式部分相匹配从而会分别入队至hello java队列以及hi java队列

- headers:该类型在路由消息时不受routing key和binding key匹配规则的影响,而是依据发送的信息中的header字段来进行路由判断。
RabbitMQ安装
用docker进行安装
安装命令,安装带有management的版本,这样安装好自带管理页面
docker pull rabbitmq:management-alpine
代码解读
运行rabbitmq,5672端口是与客户端进行通信,15672是管理页面端口
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq rabbitmq
代码解读
然后通过Linux的ip地址加上rabbitmq的端口号15672来进行访问

通过默认的账号密码进行登录
默认账号:guest 密码:guest

然后创建交换器

创建三个交换器,对应了三种交换器类型,其他的是默认的交换器

创建四个队列

将队列与交换器进行绑定

为此将exchange.direct交换器进行绑定配置,并根据其为direct类型的属性,在配置时将该交换器的路由键设置与队列名称同步配置以实现便于后续管理维护的目的。
例如,只有当消息的路由键为hello.java的时候,才能进入hello.java队列

随后将exchange.fanout交换器注册到系统中,并注意到其fanout类型与广播模式相当类似。值得注意的是,在这种情况下由于路由键的影响较小此处的设置与队列名称保持一致

最终对exchange.topic主题交换器进行绑定;主题交换器用于pattern-based匹配;它的路由键如下图所示:

例如:假设消息的路由键为 Hello\#Java 的形式,则该路由键与 Hello\#- 模式匹配,并因此会分配到对应的Hello\#Java队列中;同时也会与 #Java 模式匹配并分配到相应的Hi\#Java队列中。
向direct交换器中发布消息,查看队列中存储的数据


在消息发布后, 因为direct交换器实现了完美的匹配, 所以唯有hello.java队列中存在消息。

我们可以通过Get Message来获取队列中的消息

向fanout交换器中发布消息

消息会被fanout分布到它所绑定的所有队列中,并非仅限于特定的路由键值。无论其路由键为何值时的信息都会被发送并重复广播给同一个收件人。

我们可以看到每个队列都收到了消息

将消息发送至topi交换器中进行处理,并非专门针对特定内容的消息;与上一次相同的消息会被系统识别并发送至相应的处理单元;该消息的路由键为hello.java,在当前配置下该路由键会与hello.# 进行匹配处理;因此会被分配至hello.java队列中;同时也会与#.java进行匹配处理,并被分配至hi.java队列中

浏览队列信息时,我们可以看到hello.java和hi.java队列的消息数量上升了1个,并表明有新消息到达。

我们查看hi.java队列中的内容

再次点击Get Message,从队列中获取消息

两次获取之后,消息队列为空

Spring-Boot整合RabbitMQ
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.3.RELEASE</version>
</dependency>
代码解读
添加配置
spring:
rabbitmq:
host: 192.168.163.128 #Linux主机ip地址,如果rabbitmq在本地,就是localhost,默认值为localhost
username: guest #rabbitmq登录的用户名
password: guest #rabbitmq登录的密码
port: 5672 #rabbitmq通信端口
virtual-host: / #rabbitmq连接虚拟主机的名称
代码解读
编写代码测试连接
@RestController
public class RabbitMQController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/direct")
public String direct(){
//message需要自己构造一个,定义消息体内容和消息头
//rabbitTemplate.send(exchange,routekey,message);
// 只需要传入发送的对象,自动序列化发送给rabbitmq
// rabbitTemplate.convertAndSend(exchange,routekey,object);
Map<String,String> map = new HashMap<>();
map.put("msg1","消息1");
map.put("msg2","消息2");
rabbitTemplate.convertAndSend("exchange.direct","hello.java",map);
return "success";
}
@GetMapping("/get")
public String getMessage(){
Object o = rabbitTemplate.receiveAndConvert("hello.java");
return o.getClass()+"======"+o;
}
}
代码解读
启动项目
访问localhost:8080/direct

通过rabbitmq管理页面看到消息提交成功

通过管理页面查看数据

访问localhost:8080/get

队列中消息被删除

RabbitMQ中查看到的消息是乱码的
我们审查了RabbitTemplate的代码库,并确定其默认使用的MessageConverter类为SimpleMessageConverter

SimpleMessageConverter中用的是Java的序列化方式

为了将数据以JSON格式表示并在RabbitMQ中存储它们,必须配置一个MessageConverter
我们用Jackson2JsonMessageConverter

编写代码向容器中添加一个MessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
代码解读
重新启动项目
访问localhost:8080/direct
RabbitMQ中信息以Json的形式保存

当前采用主动方式从消息队列中获取数据;那么如何实现一旦消息出现我们就能够从队列中自动获取消息呢?我们可以设置@RabbitListener注解来实现这一功能
@RabbitListener注解实现对消息队列的监听
编写一个Service
@Service
public class TestRabbitListener {
@RabbitListener(queues = "hello.java") //监听的队列
public void testListener(HashMap<String,String> map){ //方法中的参数是队列中存储数据的类型,我们存的是map
System.out.println("收到消息:"+map);
}
}
代码解读
启动类添加注解
@EnableRabbit
代码解读
启动项目
访问localhost:8080/direct 向队列中添加数据
我们每刷新一次,控制台就会自动输出添加的数据

AmqpAdmin管理组件
在开发过程中,在rabbitmq的管理页面中进行设置,并且在程序中使用AmqpAdmin则提供了简便的方法来进行配置。
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createExchange(){
//创建direct类型的交换器
amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange.direct"));
//创建队列,队列的名字,是否持久化
amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
//将交换器和队列进行绑定
amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange.direct"
,"amqp.hello",null));
System.out.println("创建成功");
}
代码解读
运行程序
通过管理页面可以看到新建的交换器和队列



