SpringBoot+RabbitMq延迟插件实现延时队列
发布时间
阅读量:
阅读量
SpringBoot+RabbitMq延迟插件实现延时队列
- 一.安装RabbitMq
- 二.话不多说直接上代码
-
- 1.导入maven依赖
- 2.在application.yml配置文件中引入RabbitMq配置信息
- 3.rabbitMq配置类
- 4.消息发送确认
- 5.创建控制器并向队列发送消息
- 6.发送队列消息的service
- 7.发送队列消息的service实现类
- 8.配置队列的监听者(完成延时操作)
一.安装RabbitMq
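链接地址:(原文此处的链接已缺失)安装 RabbitMQ 之后,还需要安装并启用延迟插件 rabbitmq_delayed_message_exchange(执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange),否则无法声明下文用到的 x-delayed-message 类型交换机。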
二.话不多说直接上代码
1.导入maven依赖
<!-- rabbitMq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.在application.yml配置文件中引入RabbitMq配置信息
spring:
  rabbitmq:
    host: 127.0.0.1          # 你的RabbitMq配置地址
    port: 5672               # 端口
    username: super          # 用户名
    password: Jmy2019.       # 密码
    virtual-host: ecosphere  # 虚拟主机
3.rabbitMq配置类
package com.nuvole.merchant.conf.mq;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * RabbitMQ topology for the delayed-message demo: declares the exchange, the
 * queue and the binding that connects them.
 *
 * @author lc
 * @date 2020/7/1 11:15
 */
@Configuration
public class AmqpConfig {

    /** Exchange name. */
    public static final String TEST_EXCHANGE_KEY = "exchange.pay";

    /** Queue name (test). */
    public static final String TEST_QUEUE_KEY = "test.pay";

    /** Routing key that binds the test queue to the exchange. */
    public static final String TEST_ROUTK = "test.pay";

    /**
     * Declares the durable test queue.
     *
     * @return the queue named {@link #TEST_QUEUE_KEY}
     */
    @Bean
    public Queue testQueue() {
        final boolean durable = true;
        return new Queue(TEST_QUEUE_KEY, durable);
    }

    /**
     * Declares the delayed-message exchange.
     *
     * <p>The exchange type is {@code x-delayed-message}; the
     * {@code x-delayed-type} argument asks for {@code direct} routing.
     *
     * @return a durable, non-auto-delete custom exchange named {@link #TEST_EXCHANGE_KEY}
     * @author lc
     * @date 2020/6/30 15:06
     */
    @Bean
    public CustomExchange testExchange() {
        final Map<String, Object> exchangeArguments = new HashMap<>();
        exchangeArguments.put("x-delayed-type", "direct");

        final boolean durable = true;
        final boolean autoDelete = false;
        return new CustomExchange(TEST_EXCHANGE_KEY, "x-delayed-message", durable, autoDelete, exchangeArguments);
    }

    /**
     * Binds the test queue to the delayed exchange under {@link #TEST_ROUTK}.
     *
     * @param testExchange the delayed exchange bean
     * @param testQueue    the test queue bean
     * @return the binding, without extra arguments
     */
    @Bean
    public Binding testBinding(CustomExchange testExchange, Queue testQueue) {
        return BindingBuilder
                .bind(testQueue)
                .to(testExchange)
                .with(TEST_ROUTK)
                .noargs();
    }
}
4.消息发送确认
package com.nuvole.merchant.conf.mq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * Publisher confirm / return handling.
 *
 * <p>Registers itself on the {@link RabbitTemplate} as both the confirm
 * callback and the return callback, and prints failures to standard output.
 *
 * @author lc
 * @date 2020/7/1 11:14
 */
@Component
public class AmqpAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Installs this bean as the template's confirm and return callback. */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * Called when the broker returns a published message.
     *
     * @param message    the returned message
     * @param replyCode  broker reply code
     * @param replyText  broker reply text
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // A delayed exchange holds the message instead of routing it at publish
        // time, so a message published with an x-delay header comes straight back
        // as NO_ROUTE even though it is delivered later. Skip those returns.
        // NOTE(review): this also hides a genuinely unroutable delayed message.
        if (message.getMessageProperties().getReceivedDelay() != null) {
            return;
        }
        // Decode explicitly instead of relying on the platform default charset.
        // NOTE(review): assumes the body was encoded as UTF-8 - confirm against the converter in use.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("return--message:" + body + ",replyCode:" + replyCode
                + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
    }

    /**
     * Called with the broker's publisher confirm; only negative confirms are reported.
     *
     * @param correlationData correlation data supplied at publish time (unused here)
     * @param ack             {@code true} for an ack, {@code false} for a nack
     * @param cause           failure cause reported with a nack
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            System.out.println("消息发送确认失败:" + cause);
        }
    }
}
5.创建控制器并向队列发送消息
package com.nuvole.merchant.controller.v1;
import com.alibaba.fastjson.JSON;
import com.nuvole.merchant.conf.mq.AmqpConfig;
import com.nuvole.merchant.service.mq.QueueMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Date;
import java.util.HashMap;
/**
 * Demo endpoint that publishes a message to the delayed exchange.
 */
@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {

    @Resource
    private QueueMessageService queueMessageService;

    /**
     * Publishes {@code msg} to the delayed exchange.
     *
     * @param msg  message payload
     * @param msec delay passed on to {@link QueueMessageService#delayedSend};
     *             the sample requests use milliseconds (10000 for ten seconds)
     */
    @GetMapping("/do")
    public void dos(String msg, int msec) {
        log.info("开始发送延时队列。。。。。。。。。。。。。");
        log.info("{}----------当前时间", new Date());
        // Route with the routing-key constant rather than the queue-name constant:
        // both currently hold the same value, but only TEST_ROUTK is what the
        // binding in AmqpConfig is declared with.
        queueMessageService.delayedSend(AmqpConfig.TEST_EXCHANGE_KEY, AmqpConfig.TEST_ROUTK, msg, msec);
    }
}
6.发送队列消息的service
package com.nuvole.merchant.service.mq;
/**
 * Message publishing operations.
 *
 * @author lc
 * @date 2020/7/1 11:26
 */
public interface QueueMessageService {

    /**
     * Sends a regular (non-delayed) queue message.
     *
     * @param exchangeKey name of the target exchange
     * @param routingKey  routing key to publish with
     * @param message     payload to send
     * @author lc
     * @date 2020/7/1 11:26
     */
    void send(String exchangeKey, String routingKey, Object message);

    /**
     * Sends a delayed queue message.
     *
     * @param exchangeKey name of the target exchange
     * @param routingKey  routing key to publish with
     * @param message     payload to send
     * @param msec        delay to apply before delivery
     * @author lc
     * @date 2020/6/30 11:47
     */
    void delayedSend(String exchangeKey, String routingKey, Object message, int msec);
}
7.发送队列消息的service实现类
package com.nuvole.merchant.service.mq;
import com.nuvole.util.IdGenerator;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * {@link QueueMessageService} implementation backed by {@link RabbitTemplate}.
 */
@Service
public class QueueMessageServiceImpl implements QueueMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Converts and publishes {@code message} with a freshly generated correlation id.
     *
     * @param exchangeKey name of the target exchange
     * @param routingKey  routing key to publish with
     * @param message     payload to convert and send
     */
    @Override
    public void send(String exchangeKey, String routingKey, Object message) {
        rabbitTemplate.convertAndSend(exchangeKey, routingKey, message, newCorrelationData());
    }

    /**
     * Converts and publishes {@code msg} with a delay set on its message properties.
     *
     * <p>A correlation id is attached here as well, so that a publisher confirm
     * for a delayed message can be tied back to it exactly as for {@link #send}.
     *
     * @param exchangeKey name of the target exchange
     * @param routingKey  routing key to publish with
     * @param msg         payload to convert and send
     * @param xdelay      delay value set on the outgoing message
     */
    @Override
    public void delayedSend(String exchangeKey, String routingKey, Object msg, final int xdelay) {
        rabbitTemplate.convertAndSend(exchangeKey, routingKey, msg, message -> {
            // Set the delay on the outgoing message.
            message.getMessageProperties().setDelay(xdelay);
            return message;
        }, newCorrelationData());
    }

    /** Creates correlation data carrying a newly generated unique id. */
    private CorrelationData newCorrelationData() {
        return new CorrelationData(IdGenerator.getUUID());
    }
}
8.配置队列的监听者(完成延时操作)
package com.nuvole.merchant.conf.mq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
/** * 监听队列消息
* * @Author: lc
* @Date: 2020/5/20 16:09
*/
@Slf4j
@Component
@EnableRabbit
public class TestReceiver {
@RabbitListener(queues = AmqpConfig.TEST_QUEUE_KEY)
public void process(String msg, Channel channel, Message message) {
log.info("======================延时队列开始执行。。。。。。。。。。。。。。");
log.info(new Date().toString() + ",延时收到了信息 message = " + msg);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
- 至此一切准备完毕,启动项目来看一下效果。 依次访问接口:
http://localhost:9097/test/do?msg=1&msec=10000
http://localhost:9097/test/do?msg=2&msec=5000
2020/07/01-14:11:58 [http-nio-9097-exec-5] INFO com.nuvole.merchant.controller.v1.TestController- 开始发送延时队列。。。。。。。。。。。。。
2020/07/01-14:11:58 [http-nio-9097-exec-5] INFO com.nuvole.merchant.controller.v1.TestController- Wed Jul 01 14:11:58 CST 2020----------当前时间
return--message:1,replyCode:312,replyText:NO_ROUTE,exchange:exchange.pay,routingKey:test.pay
2020/07/01-14:12:00 [http-nio-9097-exec-7] INFO com.nuvole.merchant.controller.v1.TestController- 开始发送延时队列。。。。。。。。。。。。。
2020/07/01-14:12:00 [http-nio-9097-exec-7] INFO com.nuvole.merchant.controller.v1.TestController- Wed Jul 01 14:12:00 CST 2020----------当前时间
return--message:2,replyCode:312,replyText:NO_ROUTE,exchange:exchange.pay,routingKey:test.pay
2020/07/01-14:12:05 [SimpleAsyncTaskExecutor-1] INFO com.nuvole.merchant.conf.mq.TestReceiver- ======================延时队列开始执行。。。。。。。。。。。。。。
2020/07/01-14:12:05 [SimpleAsyncTaskExecutor-1] INFO com.nuvole.merchant.conf.mq.TestReceiver- Wed Jul 01 14:12:05 CST 2020,延时收到了信息 message = 2
2020/07/01-14:12:08 [SimpleAsyncTaskExecutor-1] INFO com.nuvole.merchant.conf.mq.TestReceiver- ======================延时队列开始执行。。。。。。。。。。。。。。
2020/07/01-14:12:08 [SimpleAsyncTaskExecutor-1] INFO com.nuvole.merchant.conf.mq.TestReceiver- Wed Jul 01 14:12:08 CST 2020,延时收到了信息 message = 1
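- 可以看到第一个消息10秒后才被消费,第二个消息5秒后被消费,符合预期结果。另外,日志中出现的 return--message ... replyText:NO_ROUTE 并不代表消息丢失:延迟交换机在发布时不会立即路由消息(延迟插件不支持 mandatory 标志),因此开启 publisher-returns 后每条延迟消息都会先触发一次 NO_ROUTE 回调,到期后消息仍会正常投递到队列。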
**下一篇:Linux下安装RabbitMq以及延时插件**
全部评论 (0)
还没有任何评论哟~
