springboot 整合rabbitmq 以及介绍
rabbitmq在微服务架构中可以充当什么角色?
在请求一个微服务返回响应时通常需要1秒,在采用同步方式时,则意味着每个微服务依次执行任务流程所需的时间总和即为N秒,在这种情况下整个操作流程通常需要N秒来完成
那么消息中间件就可以:
在异构化的生态系统中,各独立的微服务将预设的消息转发给消息转达节点.其他各独立的微服务持续监控这一特定的消息.接收到该消息后,这些独立的组件将同步各自执行相应的操作步骤.
2,解除耦合
3,秒杀抢购的时候做限流和流量削峰
4,延时队列+websocket做订单超时通知
5,队列防止超卖问题
等等
MQ是消息通信协议的模型,实现MQ的两种主流方式是:AMQP和JMS
AMQP是高级消息队列协议,是一个进程间传递异步消息的网络协议,。
JMS是一种基于Java平台的消息传输服务 API,并且作为应用程序接口存在;同时它也是一种面向消息中间件的开发工具。
安装
官方网站:全栈消息中间人解决方案 — 免费试用版RabbitMQ
下载:https://www.rabbitmq.com/download.html
rabbitmq由erlang语言编写,所以安装之前需要安装erlang的环境
在Windows环境下执行指定命令以安装RabbitMQ插件程序。
在F盘路径下 rabbitmq_server-3.8.22\sbin目录中执行 rabbitmq-plugins.bat 启动 rebbitmq_management

http://localhost:15672/ 访问可视化界面
用户名密码:guest
rebbitmq的角色:
生产者:生产消息
mq服务端broker:存储消息的服务器
消费者:接收或者监听消息
虚拟机 virtual hosts:虚拟主机...是真正存储消息的组件,在服务端用于标识不同的业务类型
虚拟机机内部:
1,有一个queue队列,是真正存储消息的地方。
2,交换机exchage
起步依赖
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
代码解释
配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
代码解释
测试
//创建交换机 主题模式
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topic_exchange_springcloud1");
}
//创建队列
@Bean
public Queue queue(){
return new Queue("queue_springcloud1");
}
//创建绑定
@Bean
public Binding binding(){
//将指定的队列绑定给指定的交换机 ,
return BindingBuilder.bind(queue()).to(topicExchange()).with("item.*");
}
代码解释
声明多个队列
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// 创建第一个交换机
@Bean
public TopicExchange topicExchange1() {
return new TopicExchange("topic_exchange_springcloud1");
}
// 创建第二个交换机
@Bean
public TopicExchange topicExchange2() {
return new TopicExchange("topic_exchange_springcloud2");
}
// 创建第一个队列
@Bean
public Queue queue1() {
return new Queue("queue_springcloud1");
}
// 创建第二个队列
@Bean
public Queue queue2() {
return new Queue("queue_springcloud2");
}
// 绑定第一个队列到第一个交换机
@Bean
public Binding binding1() {
return BindingBuilder.bind(queue1()).to(topicExchange1()).with("item.*");
}
// 绑定第二个队列到第一个交换机,使用不同的路由键
@Bean
public Binding binding2() {
return BindingBuilder.bind(queue2()).to(topicExchange1()).with("order.*");
}
// 绑定第一个队列到第二个交换机
@Bean
public Binding binding3() {
return BindingBuilder.bind(queue1()).to(topicExchange2()).with("product.*");
}
// 绑定第二个队列到第二个交换机
@Bean
public Binding binding4() {
return BindingBuilder.bind(queue2()).to(topicExchange2()).with("customer.*");
}
}
代码解释
第二种方式声明多个队列
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean
public Declarables queueAndExchangeDeclarations() {
// 声明队列
Queue queue1 = new Queue("queue1");
Queue queue2 = new Queue("queue2");
// 声明fanout交换机
FanoutExchange fanoutExchange = new FanoutExchange("fanoutExchange");
// 返回包含所有声明的Declarables对象
return new Declarables(queue1, queue2, fanoutExchange,
BindingBuilder.bind(queue1).to(fanoutExchange),
BindingBuilder.bind(queue2).to(fanoutExchange));
}
}
代码解释
这种方式的主要优势是可以在一个同一个地方集中管理所有的AMQP声明,并使配置更加集中和清晰。另一方面,在需要对每个队列或交换机实施更细致的配置需求以及希望明确区分关注点的情况下,则可能更适合采用之前提到的多@Bean方法的方式。
package com.example;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class Day45SpringcloudRabbitmqApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void sendMesssage() {
rabbitTemplate.convertAndSend("topic_exchange_springcloud1","item.insert","哈哈哈");
//测试
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
代码解释
消费者
package com.example.listenner;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
//@RabbitListener 监听某个队列
@RabbitListener(queues = "queue_springcloud1")
public void msg(String msg){
System.out.println(msg);
}
}
代码解释
