SpringBoot实现MQTT消息发送和接收
发布时间
阅读量:
阅读量
目录
1、maven依赖
2.yml配置
3.配置获取yml信息
4.发送
4.1 相关问题:
4.2 只发不接收可使用Netty-client 具体看Netty那章,等我空了写
5.接收信息
6.监听
7.总结
Spring integration交互逻辑
对于发布者:
消息通过消息网关发送出去,由 MessageChannel 的实例 DirectChannel 处理发送的细节。
DirectChannel 收到消息后,内部通过 MessageHandler 的实例 MqttPahoMessageHandler 发送到指定的 Topic。
对于订阅者:
通过注入 MessageProducerSupport 的实例 MqttPahoMessageDrivenChannelAdapter,实现订阅 Topic 和绑定消息消费的 MessageChannel。
同样由 MessageChannel 的实例 DirectChannel 处理消费细节。Channel 收到消息后,会转发给我们自定义的消息处理器(MqttInboundMessageHandler,下文代码中对应 MqttReceiveHandle)进行消费。
1、maven依赖
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.3.1.RELEASE</version>
</dependency>
2.yml配置
# MQTT settings. Every key below is bound through an @Value("${mqtt.*}") placeholder
# without a fallback value, so each key must be present under the `mqtt` node.
mqtt:
  # Client-id prefix; the Java configuration appends a producer/consumer suffix to it
  appid: mqttx_${random.value}
  # Topics to subscribe to; separate multiple topics with commas
  inputTopic: test009
  # MQTT broker URI(s)
  services: tcp://ip:1883
  # MQTT user name (leave empty when the broker requires no authentication)
  user:
  # MQTT password (empty by default)
  password:
  # Keep-alive (heartbeat) interval, passed to MqttConnectOptions.setKeepAliveInterval (seconds)
  KeepAliveInterval: 30
  # Whether to start with a clean session; false keeps the session
  CleanSession: false
  # Whether to reconnect automatically after the connection is lost
  AutomaticReconnect: true
  # Completion timeout in milliseconds
  CompletionTimeout: 30000
  # Quality-of-service level
  Qos: 1
3.配置获取yml信息
// Connection settings injected from the `mqtt.*` properties. None of the placeholders
// declares a fallback, so every property has to be defined.
// Client-id prefix.
@Value("${mqtt.appid}")
private String appid;
@Value("${mqtt.inputTopic}")
private String[] inputTopic;// topics to subscribe to
// @Value("${mqtt.outTopic}")
// private String[] outTopic;// topics to publish to (disabled)
@Value("${mqtt.services}")
private String[] mqttServices;// broker address(es) including the port
@Value("${mqtt.user}")
private String user;// user name
@Value("${mqtt.password}")
private String password;// password
@Value("${mqtt.KeepAliveInterval}")
private Integer KeepAliveInterval;// keep-alive (heartbeat) interval
@Value("${mqtt.CleanSession}")
private Boolean CleanSession;// clean-session flag; the original note says the session is kept by default
@Value("${mqtt.AutomaticReconnect}")
private Boolean AutomaticReconnect;// automatic reconnect flag; the original note says it is enabled by default
@Value("${mqtt.CompletionTimeout}")
private Long CompletionTimeout;// described by the author as a connection timeout (30 s); not used by mqttClientFactory() in this fragment
@Value("${mqtt.Qos}")
private Integer Qos;// quality-of-service level; not used by mqttClientFactory() in this fragment
/**
 * Builds the Paho client factory shared by the MQTT inbound and outbound endpoints.
 *
 * <p>The connect options carry the broker URIs, the keep-alive interval, the automatic
 * reconnect flag and the clean-session flag taken from the injected {@code mqtt.*}
 * properties. Credentials are applied only when they contain text.
 *
 * @return a factory configured with the connect options described above
 */
@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(mqttServices);
    // An empty `mqtt.user:` / `mqtt.password:` entry is typically injected as "" rather
    // than null, so a plain null check would still push blank credentials to the broker.
    if (user != null && !user.trim().isEmpty()) {
        options.setUserName(user);
    }
    if (password != null && !password.isEmpty()) {
        options.setPassword(password.toCharArray());
    }
    options.setKeepAliveInterval(KeepAliveInterval);
    options.setAutomaticReconnect(AutomaticReconnect);
    options.setCleanSession(CleanSession);
    factory.setConnectionOptions(options);
    return factory;
}
4.发送
/**
 * Outbound (publishing) side of the MQTT integration: declares the channel that
 * application code sends to and the handler that is subscribed to it.
 */
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttOutboundConfiguration {

    @Autowired
    private MqttMessageService mqttMessageService;

    /**
     * Creates the MQTT message handler subscribed to {@code mqttOutboundChannel}.
     *
     * <p>The client id is the configured app id plus {@code "_producer"}; the first
     * configured input topic is used as the default topic.
     *
     * @return the asynchronous outbound handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        String producerClientId = mqttMessageService.getAppid() + "_producer";
        MqttPahoMessageHandler publisher =
                new MqttPahoMessageHandler(producerClientId, mqttMessageService.mqttClientFactory());
        // With async enabled, sending a message does not block the caller.
        publisher.setAsync(true);
        String defaultTopic = mqttMessageService.getInputTopic()[0];
        publisher.setDefaultTopic(defaultTopic);
        return publisher;
    }

    /**
     * Channel that carries messages to be published.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        MessageChannel outboundChannel = new DirectChannel();
        return outboundChannel;
    }
}
4.1 相关问题:
可以在发送时接收信息,代码如下:
它会接收到你发送的,以及接收的数据,但在input正常接收的频道它就会失效了
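而且发送的消息在MQTT客户端没有显示出来。从下面的代码看,原因是 mqttOutbound() 上没有 @ServiceActivator 注解,订阅 mqttOutboundChannel 的只有打印日志的 handler(),消息没有交给 MqttPahoMessageHandler 发布到 Broker,所以尽量避免这种写法。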
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttOutboundConfiguration {
@Autowired
private MqttMessageService mqttMessageService;
@Bean
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttMessageService.getAppid()+ "_producer", mqttMessageService.mqttClientFactory());
// 如果设置成true,发送消息时将不会阻塞。
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttMessageService.getInputTopic()[0]);
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
// DirectChannel dc =new DirectChannel();
// dc.subscribe(mqttOutbound());
return new DirectChannel();
}
/** * mqtt发布者信道名称 mqttOutboundChannel
* @return
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
try {
MessageHeaders headers = message.getHeaders();
//获取消息Topic
String topic = (String) headers.get(MqttHeaders.TOPIC);
String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC);
log.info("[获取到的消息的topic-out :]{} ", topic);
log.info("[获取到的消息的receivedTopic :]{} ", receivedTopic);
//获取消息体
String payload = (String) message.getPayload();
log.info("[获取到的消息的payload :]{} ", payload);
} catch (Exception e) {
e.printStackTrace();
}
}
};
}
}
4.2 只发不接收可使用Netty-client 具体看Netty那章,等我空了写
<dependency>
<groupId>org.jetlinks</groupId>
<artifactId>netty-mqtt-client</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.43.Final</version>
</dependency>
5.接收信息
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttInboundConfiguration {
@Autowired
private MqttMessageService mqttMessageService;
@Autowired
private MqttReceiveHandle mqttReceiveHandle;
@Bean
public MessageProducerSupport mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttMessageService.getAppid()+ "_consumer",
mqttMessageService.mqttClientFactory(), mqttMessageService.getInputTopic());
adapter.setCompletionTimeout(60000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setRecoveryInterval(10000);
adapter.setQos(0);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/** * mqtt订阅者使用信道名称 mqttInboundChannel
* @return
*/
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
mqttReceiveHandle.handle(message);
}
};
}
所以 input 和 out 最好放在同一个配置类里:上面的发送配置类(4.1 的写法)和接收配置类都声明了名为 handler 的 @Bean 方法,分开写时 Bean 名称相同,发送接口就会出现 Bean 重复注入,导致失败;放在一起时给每个 Bean 方法起不同的名字即可。
贴个完整的代码
package com.example.trainserver.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
 * Combined MQTT configuration: connection factory, inbound (subscribe) flow and
 * outbound (publish) flow in a single class.
 *
 * <p>All settings come from the {@code mqtt.*} properties; none of the placeholders has a
 * fallback value, so every property must be defined.
 */
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttConfiguration {

    /** Client-id prefix; a consumer/producer suffix is appended per client. */
    @Value("${mqtt.appid}")
    public String appid;

    /** Topics to subscribe to; the first entry is also the default publish topic. */
    @Value("${mqtt.inputTopic}")
    public String[] inputTopic;

    /** Broker address(es) including the port. */
    @Value("${mqtt.services}")
    public String[] mqttServices;

    /** User name; may be empty. */
    @Value("${mqtt.user}")
    public String user;

    /** Password; may be empty. */
    @Value("${mqtt.password}")
    public String password;

    /** Keep-alive (heartbeat) interval. */
    @Value("${mqtt.KeepAliveInterval}")
    public Integer KeepAliveInterval;

    /** Clean-session flag passed to the connect options. */
    @Value("${mqtt.CleanSession}")
    public Boolean CleanSession;

    /** Automatic-reconnect flag passed to the connect options. */
    @Value("${mqtt.AutomaticReconnect}")
    public Boolean AutomaticReconnect;

    /** Completion timeout applied to the inbound adapter. */
    @Value("${mqtt.CompletionTimeout}")
    public Long CompletionTimeout;

    /** Quality-of-service level applied to the inbound subscription. */
    @Value("${mqtt.Qos}")
    public Integer Qos;

    @Autowired
    private MqttReceiveHandle mqttReceiveHandle;

    /**
     * Builds the Paho client factory used by both the inbound adapter and the outbound
     * handler. Credentials are applied only when they contain text.
     *
     * @return the configured client factory
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttServices);
        // An empty `mqtt.user:` / `mqtt.password:` entry is typically injected as "" rather
        // than null, so a plain null check would still push blank credentials to the broker.
        if (hasText(user)) {
            options.setUserName(user);
        }
        if (password != null && !password.isEmpty()) {
            options.setPassword(password.toCharArray());
        }
        options.setKeepAliveInterval(KeepAliveInterval);
        options.setAutomaticReconnect(AutomaticReconnect);
        options.setCleanSession(CleanSession);
        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * Creates the message-driven adapter that subscribes to {@link #inputTopic} and
     * forwards received messages to {@link #mqttInputChannel()}.
     *
     * @return the inbound channel adapter
     */
    @Bean
    public MessageProducerSupport mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(appid + "__consumer",
                mqttClientFactory(), inputTopic);
        // Use the configured values instead of hard-coded ones, so that mqtt.CompletionTimeout
        // and mqtt.Qos actually take effect. An int argument matches the literal used before.
        adapter.setCompletionTimeout(CompletionTimeout.intValue());
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setRecoveryInterval(10000);
        adapter.setQos(Qos.intValue());
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * Channel that receives the messages produced by {@link #mqttInbound()}.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Subscriber of {@code mqttInputChannel}: passes every message to
     * {@code mqttReceiveHandle}.
     *
     * @return the consuming handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                mqttReceiveHandle.handle(message);
            }
        };
    }

    /**
     * Creates the MQTT message handler subscribed to {@code mqttOutboundChannel}; the
     * first input topic is used as the default topic.
     *
     * @return the asynchronous outbound handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(appid + "__producer", mqttClientFactory());
        // With async enabled, sending a message does not block the caller.
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(inputTopic[0]);
        return messageHandler;
    }

    /**
     * Channel that carries messages to be published.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /** Returns true when the value is non-null and contains a non-whitespace character. */
    private static boolean hasText(String value) {
        return value != null && !value.trim().isEmpty();
    }
}
6.监听
简单地看了一下事件信息,它监听不到具体的 message,所以 pass
@Slf4j
@Component
public class MqttListener {
/** * 连接失败的事件通知
* @param mqttConnectionFailedEvent
*/
@EventListener(classes = MqttConnectionFailedEvent.class)
public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) {
log.info("连接失败的事件通知");
}
/** * 已发送的事件通知
* @param mqttMessageSentEvent
*/
@EventListener(classes = MqttMessageSentEvent.class)
public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) {
log.info("已发送的事件通知");
}
/** * 已传输完成的事件通知
* 1.QOS == 0,发送消息后会即可进行此事件回调,因为不需要等待回执
* 2.QOS == 1,发送消息后会等待ACK回执,ACK回执后会进行此事件通知
* 3.QOS == 2,发送消息后会等待PubRECV回执,知道收到PubCOMP后会进行此事件通知
* @param mqttMessageDeliveredEvent
*/
@EventListener(classes = MqttMessageDeliveredEvent.class)
public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) {
log.info("已传输完成的事件通知");
}
/** * 消息订阅的事件通知
* @param mqttSubscribedEvent
*/
@EventListener(classes = MqttSubscribedEvent.class)
public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) {
log.info("消息订阅的事件通知{}",mqttSubscribedEvent.toString());
}
}
7.总结
MQTT接收就是接收,没有所谓的消费处理逻辑,所以只要能收到,就能处理
ActiveMQ,是用监听来处理信息的,处理完有一个数据消费的处理步骤
@Service
@Slf4j
public class ERPListener implements SessionAwareMessageListener<Message> {
@Override
@JmsListener(destination = ImsQueueName.ERP_SEND_QUEUE, containerFactory = "queueContainerFactory")
public void onMessage(javax.jms.Message message, Session session) throws JMSException {
if (message == null) {
log.error("接收消息为空。。。。。。。。。。。");
return;
}
try {
log.info("ERP数据推送消息==============={}", System.currentTimeMillis());
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
MqMsgLog mqMsgLog = JSONObject.parseObject(text, MqMsgLog.class);
String body = mqMsgLog.getBody();
log.error("===ERP数据推送消息 队列消费失败===:{}", e.getMessage());
session.recover();
}
}
}
// Producer side of the ActiveMQ example: builds the destination for ERP_SEND_QUEUE and
// passes the split-record id (as a string) to delaySendMessage.
// NOTE(review): the last argument (300) is presumably the delay; its unit is not visible here — confirm.
ActiveMQQueue queueDestination = new ActiveMQQueue(ImsQueueName.ERP_SEND_QUEUE);
mqProducerService.delaySendMessage(queueDestination, citicSplitRecord.getCiticSplitRecordId().toString(),300);
全部评论 (0)
还没有任何评论哟~
