springboot3集成MQTT 发送信息和接收信息
介绍:本文章主要是针对springboot3集成MQTT发送信息和接收信息的简单操作
一、什么是MQTT
- MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上。
- MQTT最大优点在于,可以以极少的代码和有限的带宽,为远程连接设备提过实时可靠的消息服务,作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用
- MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛,列如:水位传感器,无人机,医疗设备、智能家居等都在使用MQTT。
二、那如何使用springboot集成MQTT实时接收发送信息。
1、我们先导入mqtt版本相关依赖。
<!-- MQTT spring-integration-mqtt 包含了 org.eclipse.paho.client.mqttv3 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.14</version>
</dependency>
2、在application.yml中配置好相关属性,便与后续连接地址或者是账号密码发生变化,不需要去改代码直接修改配置文件即可,由于我这边使用的是购买的mqtt服务,不是自己本地配置mqtt服务所以连接地址不一定一个模样。
# mqtt 配置
com:
mqtt:
url: ssl://xxxxx.xxxx.xxx-xxxxx.emqxsl.cn // 连接地址
clientId: mqttx_7f91ba0d // 用户id(可以随机)
topics: test/# // 订阅的主题
username: serverManager // 账号
password: robsense2015 // 密码
timeout: 10 // 连接超时时长
keepalive: 20 // 心跳间隔 (每20秒对服务发送消息)
3.通过使用 @Configuration 和 @ConfigurationProperties 注解,Spring Boot 会自动将配置文件中的相关属性和数据就会绑定到这个类的字段上。
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/** * MQTT 客户端连接的基本配置,配置信息见 application.yml
*/
@Configuration
@ConfigurationProperties(prefix = "com.mqtt")
public class MqttConfiguration {
private String url;
private String clientId;
private String topics;
private String username;
private String password;
private String timeout;
private String keepalive;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public String getTopics() {
return topics;
}
public void setTopics(String topics) {
this.topics = topics;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getTimeout() {
return timeout;
}
public void setTimeout(String timeout) {
this.timeout = timeout;
}
public String getKeepalive() {
return keepalive;
}
public void setKeepalive(String keepalive) {
this.keepalive = keepalive;
}
}
4.定义一个MqttGateWay 的类,然后使用 Spring Integration 的 @MessagingGateway 注解定义的接口用于简化与 MQTT 消息代理的交互,通过访问接口的方式发送消息到 MQTT 主题,而不需要直接与 MQTT 客户端库交互。
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
/** * mqtt 网关服务
*/
@MessagingGateway(defaultRequestChannel = "mqttOutBoundChannelIOT")
public interface MqttGateWay {
/** * 给指定主题发信息
* @param data
*/
void sendMessageToMqtt(String data);
/** * 通过主题名称发信息
* @param data
* @param topic
*/
void sendMessageToMqtt(String data, @Header(MqttHeaders.TOPIC)String topic);
/** * 通过主题名称发文件
* @param fileContent
* @param topic
*/
void sendFileToMqtt(byte[] fileContent, @Header(MqttHeaders.TOPIC)String topic);
/** * 通过主题名称和qos类型发送消息
* @param data
* @param topic
* @param qos
*/
void sendMessageToMqTT(String data, @Header(MqttHeaders.TOPIC)String topic, @Header(MqttHeaders.QOS)int qos);
}
5.MqttInBoundConfiguration是SpringBoot配置类,用于设置和管理 MQTT 客户端的连接,以便监听特定的 MQTT 主题并处理接收到的消息。
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.*;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
/** * 实现了对 inboundtopic 中的主题监听,当有消息推送到 inboundtopic 主题上时可以接受
* MQTT 消费端
*/
@Configuration
@IntegrationComponentScan
public class MqttInBoundConfiguration {
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/** * 自动注入
*/
@Autowired
private MqttConfiguration mqttProperties;
/** * 创建MQTT客户端连接
*/
@Bean
public MqttPahoClientFactory mqttInClient(){
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
// 如果配置中有多个连接地址,就可以使用split来分割进行存储
String[] mqttServerUrls = mqttProperties.getUrl().split(",");
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(mqttServerUrls);
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setKeepAliveInterval(2);
//接受离线消息
options.setCleanSession(false);
factory.setConnectionOptions(options);
return factory;
}
/** * 配置Client,监听Topic
* 监听MQTT主题并将消息发送到指定通道
* 如果要配置多个client,应该怎么处理呢?这个也简单, 模仿此方法,多写几个client
*/
@Bean
public MessageProducer inBound(){
// 如果配置中有多个主题订阅,就可以使用split来分割进行存储
String[] inBoundTopics = mqttProperties.getTopics().split(",");
// 用于监听mqtt主题客户端,并接收到的消息传为Spring Integration消息,然后发送到指定的消息通道中,
// 使用mqttInClient方法创建MQTT客户端,并指定要订阅的主题
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
mqttProperties.getClientId()+"_inbound",
mqttInClient(),
inBoundTopics);
// 适配器还设置了完成超时、QoS等级、消息转换器以及输出通道
adapter.setCompletionTimeout(1000*5);
adapter.setQos(0);
adapter.setConverter(new DefaultPahoMessageConverter());
// 当MQTT客户端接收到来自指定主题的消息时,适配器会将消息发送到mqttInputChannel()指定的通道中
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
/** * 通过通道获取数据,即处理MQTT发过来的消息,可以通过 MQTTX 工具发送数据测试
*/
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel") // 异步处理
public MessageHandler handler(){
return new MessageHandler() {
// 处理消息
@Override
public void handleMessage(Message<?> message) throws MessagingException {
// 获取负载
Object payload = message.getPayload();
// 获取消息头
MessageHeaders messageHeaders = message.getHeaders();
// 获取消息id
UUID packetId = messageHeaders.getId();
// 获取qos等级
Object qos = messageHeaders.get(MqttHeaders.RECEIVED_QOS);
// 获取接收的主题
Object topic = messageHeaders.get(MqttHeaders.RECEIVED_TOPIC);
String handMessage = "MQTT Client " + " packetId:" + packetId
+ "\nReceive payLoad:" + payload
+ "\tQOS:" + qos
+ "\nTopics:" + topic;
System.out.println("返回的数据打印:"+handMessage);
}
}
}
6.MqttOutBoundConfiguration 也是一个定义好的 Spring Boot 配置类,用于设置和管理 MQTT 客户端的发送功能,它定义了如何创建 MQTT 客户端连接、配置连接参数,并将消息发送到指定的 MQTT 主题,建立一个mqttOutBoundChannelIOT 的消息通道,用于发送消息到mqtt主题。
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/** * MQTT 生产端
* */
@Configuration
public class MqttOutBoundConfiguration {
@Autowired
MqttConfiguration mqttConfiguration;
/** * 指定某一个主题发送信息
* @return
*/
@Bean
public MessageChannel mqttOutBoundChannelIOT(){
return new DirectChannel();
}
/** * 创建MQTT客户端连接,配置连接的参数选项
*/
@Bean
public MqttPahoClientFactory mqttOutClient(){
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
// 如果配置中有多个连接地址,就可以使用split来分割进行存储
String[] mqttServerUrls = mqttConfiguration.getUrl().split(",");
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(mqttServerUrls);
options.setUserName(mqttConfiguration.getUsername());
options.setPassword(mqttConfiguration.getPassword().toCharArray());
//接受离线消息,告诉代理客户端是否要建立持久会话 false为建立持久
options.setCleanSession(false);
factory.setConnectionOptions(options);
return factory;
}
/** * 指定某一个通过发送mqtt信息
* @return
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutBoundChannelIOT")
public MessageHandler mqttOutBoundIOT(){
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfiguration.getClientId()+"_outbound",
mqttOutClient());
// 如果没有传入主题就会给默认主题发送信息
messageHandler.setDefaultTopic("iots");
messageHandler.setAsync(true);
return messageHandler;
}
}
7、我们在创建一个MqttGateWayController的控制层,用于给用户调用接口进行消息发送,在控制层中写了两种,第一种发送是可以虽然发送String,Int,json格式的数据,第二种是文件上传将文件中的数据读取出来在进行发送数据,在目前已知情况下mqtt是不应许直接发送文件的。
import com.robsense.droneinspectionplatfom.mqtt.MqttGateWay;
import com.robsense.droneinspectionplatfom.utils.Result;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.Parameters;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.io.InputStream;
@Controller
@RequestMapping(value = "/mqtt")
@Tag(name = "MqttController", description = "MQTT 访问控制")
public class MqttGateWayController {
// 用于精准依赖注入
@Qualifier("mqttGateWay")
@Autowired
private MqttGateWay mqttGateWayService;
/** * 向主题发送消息
* @return
*/
@PostMapping(value = "/sendMqtt")
@Operation(summary = "向指定主题中,发送消息")
@ResponseBody
@Parameters({
@Parameter(name = "topic",description = "主题",required = true),
@Parameter(name = "data",description = "数据结构",required = true)
})
public Result SendMqtt(String topic, String data){
try {
mqttGateWayService.sendMessageToMqtt(data,topic);
return Result.success(data);
}catch (Exception e){
e.printStackTrace();
}
return Result.error("操作失败",500);
}
/** * 向主题发送文件
* @return
*/
@PostMapping(value = "/sendFileMqtt")
@Operation(summary = "向指定主题中,发送文件")
@ResponseBody
@Parameters({
@Parameter(name = "topic",description = "主题",required = true),
@Parameter(name = "file",description = "文件",required = true)
})
public Result SendFileMqtt(String topic, MultipartFile file){
byte[] fileContent = new byte[(int) file.getSize()];
try (InputStream inputStream = file.getInputStream()) {
inputStream.read(fileContent);
String fileName = file.getOriginalFilename();
mqttGateWayService.sendFileToMqtt(fileContent, topic);
return Result.success(fileName,"操作成功");
} catch (IOException e) {
// 处理异常
e.printStackTrace();
}
return Result.error("操作失败",500);
}
谢谢大家收看,如果对您有很大帮助那自然更好。
