Advertisement

124.RabbitMQ(二):springboot整合rabbitmq,mq的使用场景

阅读量:

目录

一、基础配置

1.新建项目,引入依赖

2.编辑配置文件

二、helloworld模式

三、workqueue模式公平消费

四、发布订阅(广播)模型

五、Direct直接路由模型

六、Topic动态路由模型

七、MQ应用场景说明

1.异步处理

2.应用解耦

3.流量削峰

八、打赏请求


声明:本系列代码均通过测试,可放心使用

一、基础配置

1.新建项目,引入依赖

创建好后,查看一下pom.xml:

按上述方式创建项目后,RabbitMQ 的依赖(spring-boot-starter-amqp)已经被引入到 pom.xml 中了。

复制代码
 <?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>

     <parent>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-parent</artifactId>
         <version>2.5.2</version>
         <relativePath/> <!-- lookup parent from repository -->
     </parent>

     <groupId>com.xupeng</groupId>
     <artifactId>rabbitmq</artifactId>
     <version>0.0.1-SNAPSHOT</version>
     <name>rabbitmq</name>
     <description>Demo project for Spring Boot</description>

     <properties>
         <java.version>1.8</java.version>
     </properties>

     <dependencies>
         <!-- RabbitMQ support is pulled in by this starter -->
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-amqp</artifactId>
         </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-web</artifactId>
         </dependency>

         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-test</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.springframework.amqp</groupId>
             <artifactId>spring-rabbit-test</artifactId>
             <scope>test</scope>
         </dependency>
         <!--
             The tests in this project are JUnit 4 tests (@RunWith(SpringRunner.class), org.junit.Test).
             Since Spring Boot 2.4, spring-boot-starter-test no longer includes the JUnit Vintage engine,
             so it is declared here. It brings in junit:junit transitively, with the version managed by
             the Spring Boot parent instead of a hand-pinned 4.12.
         -->
         <dependency>
             <groupId>org.junit.vintage</groupId>
             <artifactId>junit-vintage-engine</artifactId>
             <scope>test</scope>
             <exclusions>
                 <exclusion>
                     <groupId>org.hamcrest</groupId>
                     <artifactId>hamcrest-core</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
     </dependencies>

     <build>
         <plugins>
             <plugin>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-maven-plugin</artifactId>
             </plugin>
         </plugins>
     </build>

 </project>

2.编辑配置文件

复制代码
 # Spring Boot settings for the RabbitMQ demo application.
 spring:
   application:
     name: rabbitmq-springboot
   rabbitmq:
     # Broker address and port.
     host: 192.168.210.137
     port: 5672
     # Account and virtual host used for the connection.
     # NOTE(review): credentials are hard-coded here; consider externalizing them outside a demo.
     username: ems
     password: ems
     virtual-host: /ems

二、helloworld模式

复制代码
复制代码
 package com.xupeng.rabbitmq.hello;

    
  
    
 import org.springframework.amqp.rabbit.annotation.Queue;
    
 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
    
 import org.springframework.stereotype.Component;
    
  
    
 /**
  * Consumer for the hello-world model.
  *
  * <p>The class-level {@code @RabbitListener} declares the queue {@code "hello"} through
  * {@code queuesToDeclare}; the {@code @RabbitHandler} method prints every received
  * message to standard output. Created 2021/7/7 14:40.
  *
  * @author xupeng
  * @version v1.0
  */
 @Component
 // Durable, non-exclusive, non-auto-delete queue (noted by the original author as the defaults).
 @RabbitListener(queuesToDeclare = @Queue(value = "hello", durable = "true", autoDelete = "false"))
 public class HelloConsumer {

 	/**
 	 * Prints a message taken from the {@code "hello"} queue.
 	 *
 	 * <p>Declared {@code public}, like the other listener methods in this project.
 	 *
 	 * @param message the message body as text
 	 */
 	@RabbitHandler
 	public void receive(String message) {
 		System.out.println("message:" + message);
 	}
 }

RabbitmqApplicationTests:

复制代码
 package com.xupeng.rabbitmq;

    
  
    
 import org.junit.Test;
    
 import org.junit.runner.RunWith;
    
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
    
 import org.springframework.beans.factory.annotation.Autowired;
    
 import org.springframework.boot.test.context.SpringBootTest;
    
 import org.springframework.test.context.junit4.SpringRunner;
    
  
    
 /**
  * Producer side of the hello-world model: publishes a single text message
  * using the injected {@link RabbitTemplate}.
  */
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = RabbitmqApplication.class)
 public class RabbitmqApplicationTests {

 	/** First argument of the send call; the original notes describe it as the queue name. */
 	private static final String HELLO_QUEUE = "hello";

 	/** Second argument of the send call: the message content. */
 	private static final String HELLO_PAYLOAD = "hello world";

 	@Autowired
 	private RabbitTemplate rabbitTemplate;

 	/** helloworld: sends one message addressed with {@code "hello"}. */
 	@Test
 	public void test() {
 		rabbitTemplate.convertAndSend(HELLO_QUEUE, HELLO_PAYLOAD);
 	}
 }

三、workqueue模式公平消费

WorkConsumer:

复制代码
 package com.xupeng.rabbitmq.work;

    
  
    
 import org.springframework.amqp.rabbit.annotation.Queue;
    
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
    
 import org.springframework.stereotype.Component;
    
  
    
 /**
  * Consumers for the work-queue model.
  *
  * <p>Two listener methods are attached to the same queue {@code "work"}; each one
  * prints the messages it receives with its own prefix. Created 2021/7/7 15:08.
  *
  * <p>NOTE(review): the surrounding article calls this "fair consumption", but no
  * prefetch/acknowledge configuration is visible in this class - confirm where it is set.
  *
  * @author xupeng
  * @version v1.0
  */
 @Component
 public class WorkConsumer {

 	/** Name of the queue both listeners declare and consume from. */
 	private static final String WORK_QUEUE = "work";

 	/**
 	 * First consumer of the work queue.
 	 *
 	 * @param message the message body as text
 	 */
 	@RabbitListener(queuesToDeclare = @Queue(WORK_QUEUE))
 	public void receive1(String message) {
 		String line = "message1:" + message;
 		System.out.println(line);
 	}

 	/**
 	 * Second consumer of the work queue.
 	 *
 	 * @param message the message body as text
 	 */
 	@RabbitListener(queuesToDeclare = @Queue(WORK_QUEUE))
 	public void receive2(String message) {
 		String line = "message2:" + message;
 		System.out.println(line);
 	}
 }

WorkQueueTest:

复制代码
 package com.xupeng.rabbitmq.test;

    
  
    
 import com.xupeng.rabbitmq.RabbitmqApplication;
    
 import org.junit.Test;
    
 import org.junit.runner.RunWith;
    
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
    
 import org.springframework.beans.factory.annotation.Autowired;
    
 import org.springframework.boot.test.context.SpringBootTest;
    
 import org.springframework.test.context.junit4.SpringRunner;
    
  
    
 /**
  * Producer side of the work-queue model: publishes ten numbered messages
  * addressed with {@code "work"}. Created 2021/7/7 15:06.
  *
  * @author xupeng
  * @version v1.0
  */
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = RabbitmqApplication.class)
 public class WorkQueueTest {

 	/** How many messages a single test run publishes. */
 	private static final int MESSAGE_COUNT = 10;

 	@Autowired
 	private RabbitTemplate rabbitTemplate;

 	/** workqueue: sends messages numbered 0 through 9. */
 	@Test
 	public void test() {
 		int sequence = 0;
 		while (sequence < MESSAGE_COUNT) {
 			// Argument one: queue name; argument two: message content.
 			String payload = "work模型" + sequence;
 			rabbitTemplate.convertAndSend("work", payload);
 			sequence++;
 		}
 	}
 }

四、发布订阅(广播)模型

FanoutTest:

复制代码
 package com.xupeng.rabbitmq.test;

    
  
    
 import com.xupeng.rabbitmq.RabbitmqApplication;
    
 import org.junit.Test;
    
 import org.junit.runner.RunWith;
    
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
    
 import org.springframework.beans.factory.annotation.Autowired;
    
 import org.springframework.boot.test.context.SpringBootTest;
    
 import org.springframework.test.context.junit4.SpringRunner;
    
  
    
 /**
  * Producer side of the publish/subscribe (fanout) model: publishes one message
  * to the exchange {@code "logs"} with an empty routing key. Created 2021/7/7 15:06.
  *
  * @author xupeng
  * @version v1.0
  */
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = RabbitmqApplication.class)
 public class FanoutTest {

 	private static final String EXCHANGE = "logs";
 	private static final String ROUTING_KEY = "";
 	private static final String PAYLOAD = "fanout模型";

 	@Autowired
 	private RabbitTemplate rabbitTemplate;

 	/** Fanout: arguments are exchange name, routing key and message content, in that order. */
 	@Test
 	public void test() {
 		rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, PAYLOAD);
 	}
 }

FanoutConsumer:

复制代码
 package com.xupeng.rabbitmq.fanout;

    
  
    
 import org.springframework.amqp.rabbit.annotation.Exchange;
    
 import org.springframework.amqp.rabbit.annotation.Queue;
    
 import org.springframework.amqp.rabbit.annotation.QueueBinding;
    
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
    
 import org.springframework.stereotype.Component;
    
  
    
 /**
  * Consumers for the publish/subscribe (fanout) model.
  *
  * <p>Each listener binds its own unnamed (temporary) queue to the fanout exchange
  * {@code "logs"} and prints what it receives. Created 2021/7/7 15:16.
  *
  * @author xupeng
  * @version v1.0
  */
 @Component
 public class FanoutConsumer {

 	/** Exchange both listeners bind to. */
 	private static final String EXCHANGE = "logs";

 	/**
 	 * First subscriber.
 	 *
 	 * @param message the message body as text
 	 */
 	@RabbitListener(bindings = @QueueBinding(
 			exchange = @Exchange(value = EXCHANGE, type = "fanout"),
 			value = @Queue // temporary queue
 	))
 	public void receive1(String message) {
 		String line = "message1:" + message;
 		System.out.println(line);
 	}

 	/**
 	 * Second subscriber.
 	 *
 	 * @param message the message body as text
 	 */
 	@RabbitListener(bindings = @QueueBinding(
 			exchange = @Exchange(value = EXCHANGE, type = "fanout"),
 			value = @Queue // temporary queue
 	))
 	public void receive2(String message) {
 		String line = "message2:" + message;
 		System.out.println(line);
 	}
 }

五、Direct直接路由模型

DirectRoutingTest:

复制代码
 package com.xupeng.rabbitmq.test;

    
  
    
 import com.xupeng.rabbitmq.RabbitmqApplication;
    
 import org.junit.Test;
    
 import org.junit.runner.RunWith;
    
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
    
 import org.springframework.beans.factory.annotation.Autowired;
    
 import org.springframework.boot.test.context.SpringBootTest;
    
 import org.springframework.test.context.junit4.SpringRunner;
    
  
    
 /**
  * Producer side of the direct-routing model: publishes one message to the
  * exchange {@code "directs"} with routing key {@code "error"}. Created 2021/7/7 15:06.
  *
  * @author xupeng
  * @version v1.0
  */
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = RabbitmqApplication.class)
 public class DirectRoutingTest {

 	private static final String EXCHANGE = "directs";
 	private static final String ROUTING_KEY = "error";
 	private static final String PAYLOAD = "DirectRouting模型";

 	@Autowired
 	private RabbitTemplate rabbitTemplate;

 	/** DirectRouting: arguments are exchange name, routing key and message content, in that order. */
 	@Test
 	public void test() {
 		rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, PAYLOAD);
 	}
 }

DirectRoutingConsumer:

复制代码
 package com.xupeng.rabbitmq.directRouting;

    
  
    
 import org.springframework.amqp.rabbit.annotation.Exchange;
    
 import org.springframework.amqp.rabbit.annotation.Queue;
    
 import org.springframework.amqp.rabbit.annotation.QueueBinding;
    
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
    
 import org.springframework.stereotype.Component;
    
  
    
 /**
  * Consumers for the direct-routing model.
  *
  * <p>Each listener binds an unnamed (temporary) queue to the direct exchange
  * {@code "directs"} under its own set of routing keys and prints what it receives.
  * Created 2021/7/7 15:24.
  *
  * @author xupeng
  * @version v1.0
  */
 @Component
 public class DirectRoutingConsumer {

 	/** Custom exchange name shared by both bindings. */
 	private static final String EXCHANGE = "directs";

 	/**
 	 * Listener bound with the keys {@code info}, {@code error} and {@code warning}.
 	 *
 	 * @param message the message body as text
 	 */
 	@RabbitListener(bindings = @QueueBinding(
 			key = {"info", "error", "warning"},
 			exchange = @Exchange(value = EXCHANGE, type = "direct"), // custom exchange name and type
 			value = @Queue // temporary queue
 	))
 	public void receive1(String message) {
 		String line = "message1:" + message;
 		System.out.println(line);
 	}

 	/**
 	 * Listener bound with the key {@code info} only.
 	 *
 	 * @param message the message body as text
 	 */
 	@RabbitListener(bindings = @QueueBinding(
 			key = "info",
 			exchange = @Exchange(value = EXCHANGE, type = "direct"), // custom exchange name and type
 			value = @Queue // temporary queue
 	))
 	public void receive2(String message) {
 		String line = "message2:" + message;
 		System.out.println(line);
 	}
 }

六、Topic动态路由模型

TopicRoutingTest:

复制代码
 package com.xupeng.rabbitmq.test;

    
  
    
 import com.xupeng.rabbitmq.RabbitmqApplication;
    
 import org.junit.Test;
    
 import org.junit.runner.RunWith;
    
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
    
 import org.springframework.beans.factory.annotation.Autowired;
    
 import org.springframework.boot.test.context.SpringBootTest;
    
 import org.springframework.test.context.junit4.SpringRunner;
    
  
    
 /**
  * Producer side of the topic-routing model: publishes one message to the
  * exchange {@code "topics"} with routing key {@code "user.save.list"}. Created 2021/7/7 15:06.
  *
  * @author xupeng
  * @version v1.0
  */
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = RabbitmqApplication.class)
 public class TopicRoutingTest {

 	private static final String EXCHANGE = "topics";
 	private static final String ROUTING_KEY = "user.save.list";
 	private static final String PAYLOAD = "TopicRouting模型";

 	@Autowired
 	private RabbitTemplate rabbitTemplate;

 	/** TopicRouting: arguments are exchange name, routing key and message content, in that order. */
 	@Test
 	public void test() {
 		rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, PAYLOAD);
 	}
 }

TopicRoutingConsumer:

复制代码
 package com.xupeng.rabbitmq.topicRouting;

    
  
    
 import org.springframework.amqp.rabbit.annotation.Exchange;
    
 import org.springframework.amqp.rabbit.annotation.Queue;
    
 import org.springframework.amqp.rabbit.annotation.QueueBinding;
    
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
    
 import org.springframework.stereotype.Component;
    
  
    
 /**
  * Consumers for the topic-routing model.
  *
  * <p>Each listener binds an unnamed (temporary) queue to the topic exchange
  * {@code "topics"} under its own wildcard patterns and prints what it receives.
  * In RabbitMQ topic patterns {@code *} stands for exactly one word and {@code #}
  * for zero or more words. Created 2021/7/7 15:33.
  *
  * @author xupeng
  * @version v1.0
  */
 @Component
 public class TopicRoutingConsumer {

 	/** Custom exchange name shared by both bindings. */
 	private static final String EXCHANGE = "topics";

 	/**
 	 * Listener bound with the pattern {@code user.*}.
 	 *
 	 * @param message the message body as text
 	 */
 	@RabbitListener(bindings = @QueueBinding(
 			key = "user.*",
 			exchange = @Exchange(value = EXCHANGE, type = "topic"), // custom exchange name and type
 			value = @Queue // temporary queue
 	))
 	public void receive1(String message) {
 		String line = "message1:" + message;
 		System.out.println(line);
 	}

 	/**
 	 * Listener bound with the patterns {@code user.#} and {@code product.#}.
 	 *
 	 * @param message the message body as text
 	 */
 	@RabbitListener(bindings = @QueueBinding(
 			key = {"user.#", "product.#"},
 			exchange = @Exchange(value = EXCHANGE, type = "topic"), // custom exchange name and type
 			value = @Queue // temporary queue
 	))
 	public void receive2(String message) {
 		String line = "message2:" + message;
 		System.out.println(line);
 	}
 }

七、MQ应用场景说明

1.异步处理

用户注册成功后,需要向其发送短信和邮件。把发送短信和邮件这两项工作交给MQ异步处理,注册请求无需等待二者全部完成即可返回。

2.应用解耦

订单系统与库存系统之间通过消息队列(MQ)传递消息,而不是直接相互调用;其中任何一个系统发生故障,都不会影响另一个系统的正常运行。

3.流量削峰

在电商网站双十一零点秒杀促销期间,瞬时流量很大。用户请求先发送到消息队列,再由主业务系统从队列中取出处理。为了保护后端资源,可以在消息队列中设置请求数量上限(阈值),超出该阈值的请求将被丢弃。

八、打赏请求

如果本篇博客对您有所帮助,打赏一点呗,谢谢了呢~

全部评论 (0)

还没有任何评论哟~