Advertisement

springboot3集成MQTT 发送信息和接收信息

阅读量:

介绍:本文章主要是针对springboot3集成MQTT发送信息和接收信息的简单操作

一、什么是MQTT

  1. MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上。
  2. MQTT最大优点在于,可以以极少的代码和有限的带宽,为远程连接设备提过实时可靠的消息服务,作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用
  3. MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛,列如:水位传感器,无人机,医疗设备、智能家居等都在使用MQTT。

二、那如何使用springboot集成MQTT实时接收发送信息。

1、我们先导入mqtt版本相关依赖。

复制代码
 <!-- MQTT  spring-integration-mqtt  包含了 org.eclipse.paho.client.mqttv3  -->

    
 <dependency>
    
     <groupId>org.springframework.integration</groupId>
    
     <artifactId>spring-integration-mqtt</artifactId>
    
     <version>5.5.14</version>
    
 </dependency>
    
    
    
    

2、在application.yml中配置好相关属性,便与后续连接地址或者是账号密码发生变化,不需要去改代码直接修改配置文件即可,由于我这边使用的是购买的mqtt服务,不是自己本地配置mqtt服务所以连接地址不一定一个模样。

复制代码
 # mqtt 配置

    
 com:
    
   mqtt:
    
     url: ssl://xxxxx.xxxx.xxx-xxxxx.emqxsl.cn  // 连接地址
    
     clientId: mqttx_7f91ba0d // 用户id(可以随机)
    
     topics: test/# // 订阅的主题
    
     username: serverManager // 账号
    
     password: robsense2015 // 密码
    
     timeout: 10 // 连接超时时长
    
     keepalive: 20 // 心跳间隔 (每20秒对服务发送消息)
    
    
    
    

3.通过使用 @Configuration@ConfigurationProperties 注解,Spring Boot 会自动将配置文件中的相关属性和数据就会绑定到这个类的字段上。

复制代码
 import org.springframework.boot.context.properties.ConfigurationProperties;

    
 import org.springframework.context.annotation.Configuration;
    
  
    
 /** * MQTT 客户端连接的基本配置,配置信息见 application.yml
    
  */
    
  
    
 @Configuration
    
 @ConfigurationProperties(prefix = "com.mqtt")
    
 public class MqttConfiguration {
    
     private String url;
    
     private String clientId;
    
     private String topics;
    
     private String username;
    
     private String password;
    
     private String timeout;
    
     private String keepalive;
    
  
    
     public String getUrl() {
    
     return url;
    
     }
    
  
    
     public void setUrl(String url) {
    
     this.url = url;
    
     }
    
  
    
     public String getClientId() {
    
     return clientId;
    
     }
    
  
    
     public void setClientId(String clientId) {
    
     this.clientId = clientId;
    
     }
    
  
    
     public String getTopics() {
    
     return topics;
    
     }
    
  
    
     public void setTopics(String topics) {
    
     this.topics = topics;
    
     }
    
  
    
     public String getUsername() {
    
     return username;
    
     }
    
  
    
     public void setUsername(String username) {
    
     this.username = username;
    
     }
    
  
    
     public String getPassword() {
    
     return password;
    
     }
    
  
    
     public void setPassword(String password) {
    
     this.password = password;
    
     }
    
  
    
     public String getTimeout() {
    
     return timeout;
    
     }
    
  
    
     public void setTimeout(String timeout) {
    
     this.timeout = timeout;
    
     }
    
  
    
     public String getKeepalive() {
    
     return keepalive;
    
     }
    
  
    
     public void setKeepalive(String keepalive) {
    
     this.keepalive = keepalive;
    
     }
    
 }
    
    
    
    

4.定义一个MqttGateWay 的类,然后使用 Spring Integration 的 @MessagingGateway 注解定义的接口用于简化与 MQTT 消息代理的交互,通过访问接口的方式发送消息到 MQTT 主题,而不需要直接与 MQTT 客户端库交互。

复制代码
 import org.springframework.integration.annotation.MessagingGateway;

    
 import org.springframework.integration.mqtt.support.MqttHeaders;
    
 import org.springframework.messaging.handler.annotation.Header;
    
  
    
 /** * mqtt 网关服务
    
  */
    
  
    
 @MessagingGateway(defaultRequestChannel = "mqttOutBoundChannelIOT")
    
 public interface MqttGateWay {
    
  
    
     /** * 给指定主题发信息
    
      * @param data
    
      */
    
     void sendMessageToMqtt(String data);
    
  
    
     /** * 通过主题名称发信息
    
      * @param data
    
      * @param topic
    
      */
    
     void sendMessageToMqtt(String data, @Header(MqttHeaders.TOPIC)String topic);
    
  
    
     /** * 通过主题名称发文件
    
      * @param fileContent
    
      * @param topic
    
      */
    
     void sendFileToMqtt(byte[] fileContent, @Header(MqttHeaders.TOPIC)String topic);
    
  
    
     /** * 通过主题名称和qos类型发送消息
    
      * @param data
    
      * @param topic
    
      * @param qos
    
      */
    
     void sendMessageToMqTT(String data, @Header(MqttHeaders.TOPIC)String topic, @Header(MqttHeaders.QOS)int qos);
    
 }
    
    
    
    

5.MqttInBoundConfiguration是SpringBoot配置类,用于设置和管理 MQTT 客户端的连接,以便监听特定的 MQTT 主题并处理接收到的消息。

复制代码
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

    
 import org.springframework.beans.factory.annotation.Autowired;
    
 import org.springframework.context.annotation.Bean;
    
 import org.springframework.context.annotation.Configuration;
    
 import org.springframework.integration.annotation.IntegrationComponentScan;
    
 import org.springframework.integration.annotation.ServiceActivator;
    
 import org.springframework.integration.channel.DirectChannel;
    
 import org.springframework.integration.core.MessageProducer;
    
 import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    
 import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
    
 import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
    
 import org.springframework.integration.mqtt.support.MqttHeaders;
    
 import org.springframework.messaging.*;
    
 import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    
  
    
 /** * 实现了对 inboundtopic 中的主题监听,当有消息推送到 inboundtopic 主题上时可以接受
    
  * MQTT 消费端
    
  */
    
  
    
 @Configuration
    
 @IntegrationComponentScan
    
 public class MqttInBoundConfiguration {
    
  
    
     @Bean
    
     public MessageChannel mqttInputChannel() {
    
     return new DirectChannel();
    
     }
    
  
    
     /** * 自动注入
    
      */
    
     @Autowired
    
     private MqttConfiguration mqttProperties;
    
  
    
     /** * 创建MQTT客户端连接
    
      */
    
     @Bean
    
     public MqttPahoClientFactory mqttInClient(){
    
     DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    
     // 如果配置中有多个连接地址,就可以使用split来分割进行存储
    
     String[] mqttServerUrls = mqttProperties.getUrl().split(",");
    
     MqttConnectOptions options = new MqttConnectOptions();
    
     options.setServerURIs(mqttServerUrls);
    
  
    
     options.setUserName(mqttProperties.getUsername());
    
     options.setPassword(mqttProperties.getPassword().toCharArray());
    
     options.setKeepAliveInterval(2);
    
  
    
     //接受离线消息
    
     options.setCleanSession(false);
    
     factory.setConnectionOptions(options);
    
     return factory;
    
     }
    
  
    
     /** * 配置Client,监听Topic
    
      * 监听MQTT主题并将消息发送到指定通道
    
      * 如果要配置多个client,应该怎么处理呢?这个也简单, 模仿此方法,多写几个client
    
      */
    
     @Bean
    
     public MessageProducer inBound(){
    
     // 如果配置中有多个主题订阅,就可以使用split来分割进行存储
    
     String[] inBoundTopics = mqttProperties.getTopics().split(",");
    
     // 用于监听mqtt主题客户端,并接收到的消息传为Spring Integration消息,然后发送到指定的消息通道中,
    
     // 使用mqttInClient方法创建MQTT客户端,并指定要订阅的主题
    
     MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
    
             mqttProperties.getClientId()+"_inbound",
    
             mqttInClient(),
    
             inBoundTopics);
    
     // 适配器还设置了完成超时、QoS等级、消息转换器以及输出通道
    
     adapter.setCompletionTimeout(1000*5);
    
     adapter.setQos(0);
    
     adapter.setConverter(new DefaultPahoMessageConverter());
    
     // 当MQTT客户端接收到来自指定主题的消息时,适配器会将消息发送到mqttInputChannel()指定的通道中
    
     adapter.setOutputChannel(mqttInputChannel());
    
     return adapter;
    
     }
    
  
    
     /** * 通过通道获取数据,即处理MQTT发过来的消息,可以通过 MQTTX 工具发送数据测试
    
      */
    
     @Bean
    
     @ServiceActivator(inputChannel = "mqttInputChannel") // 异步处理
    
     public MessageHandler handler(){
    
     return new MessageHandler() {
    
         // 处理消息
    
         @Override
    
         public void handleMessage(Message<?> message) throws MessagingException {
    
             // 获取负载
    
             Object payload = message.getPayload();
    
             // 获取消息头
    
             MessageHeaders messageHeaders = message.getHeaders();
    
             // 获取消息id
    
             UUID packetId = messageHeaders.getId();
    
             // 获取qos等级
    
             Object qos = messageHeaders.get(MqttHeaders.RECEIVED_QOS);
    
             // 获取接收的主题
    
             Object topic = messageHeaders.get(MqttHeaders.RECEIVED_TOPIC);
    
             String handMessage = "MQTT Client " + " packetId:" + packetId
    
                     + "\nReceive payLoad:" + payload
    
                     + "\tQOS:" + qos
    
                     + "\nTopics:" + topic;
    
             System.out.println("返回的数据打印:"+handMessage);
    
       }
    
     }
    
  
    
 }
    
    
    
    

6.MqttOutBoundConfiguration 也是一个定义好的 Spring Boot 配置类,用于设置和管理 MQTT 客户端的发送功能,它定义了如何创建 MQTT 客户端连接、配置连接参数,并将消息发送到指定的 MQTT 主题,建立一个mqttOutBoundChannelIOT 的消息通道,用于发送消息到mqtt主题。

复制代码
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

    
 import org.springframework.beans.factory.annotation.Autowired;
    
 import org.springframework.context.annotation.Bean;
    
 import org.springframework.context.annotation.Configuration;
    
 import org.springframework.integration.annotation.ServiceActivator;
    
 import org.springframework.integration.channel.DirectChannel;
    
 import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    
 import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    
 import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
    
 import org.springframework.messaging.MessageChannel;
    
 import org.springframework.messaging.MessageHandler;
    
  
    
 /** * MQTT 生产端
    
  * */
    
  
    
 @Configuration
    
 public class MqttOutBoundConfiguration {
    
  
    
     @Autowired
    
     MqttConfiguration mqttConfiguration;
    
  
    
     /** * 指定某一个主题发送信息
    
      * @return
    
      */
    
     @Bean
    
     public MessageChannel mqttOutBoundChannelIOT(){
    
     return new DirectChannel();
    
     }
    
  
    
     /** * 创建MQTT客户端连接,配置连接的参数选项
    
      */
    
     @Bean
    
     public MqttPahoClientFactory mqttOutClient(){
    
     DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    
     // 如果配置中有多个连接地址,就可以使用split来分割进行存储
    
     String[] mqttServerUrls = mqttConfiguration.getUrl().split(",");
    
     MqttConnectOptions options = new MqttConnectOptions();
    
     options.setServerURIs(mqttServerUrls);
    
     options.setUserName(mqttConfiguration.getUsername());
    
     options.setPassword(mqttConfiguration.getPassword().toCharArray());
    
     //接受离线消息,告诉代理客户端是否要建立持久会话 false为建立持久
    
     options.setCleanSession(false);
    
     factory.setConnectionOptions(options);
    
     return factory;
    
     }
    
  
    
     /** * 指定某一个通过发送mqtt信息
    
      * @return
    
      */
    
     @Bean
    
     @ServiceActivator(inputChannel = "mqttOutBoundChannelIOT")
    
     public MessageHandler mqttOutBoundIOT(){
    
     MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfiguration.getClientId()+"_outbound",
    
             mqttOutClient());
    
     // 如果没有传入主题就会给默认主题发送信息
    
     messageHandler.setDefaultTopic("iots");
    
     messageHandler.setAsync(true);
    
     return messageHandler;
    
     }
    
 }
    
    
    
    

7、我们在创建一个MqttGateWayController的控制层,用于给用户调用接口进行消息发送,在控制层中写了两种,第一种发送是可以虽然发送String,Int,json格式的数据,第二种是文件上传将文件中的数据读取出来在进行发送数据,在目前已知情况下mqtt是不应许直接发送文件的。

复制代码
 import com.robsense.droneinspectionplatfom.mqtt.MqttGateWay;

    
 import com.robsense.droneinspectionplatfom.utils.Result;
    
 import io.swagger.v3.oas.annotations.Operation;
    
 import io.swagger.v3.oas.annotations.Parameter;
    
 import io.swagger.v3.oas.annotations.Parameters;
    
 import io.swagger.v3.oas.annotations.tags.Tag;
    
 import org.springframework.beans.factory.annotation.Autowired;
    
 import org.springframework.beans.factory.annotation.Qualifier;
    
 import org.springframework.stereotype.Controller;
    
 import org.springframework.web.bind.annotation.*;
    
 import org.springframework.web.multipart.MultipartFile;
    
  
    
 import java.io.IOException;
    
 import java.io.InputStream;
    
  
    
  
    
 @Controller
    
 @RequestMapping(value = "/mqtt")
    
 @Tag(name = "MqttController", description = "MQTT 访问控制")
    
 public class MqttGateWayController {
    
  
    
     // 用于精准依赖注入
    
     @Qualifier("mqttGateWay")
    
     @Autowired
    
     private MqttGateWay mqttGateWayService;
    
  
    
     /** * 向主题发送消息
    
      * @return
    
      */
    
     @PostMapping(value = "/sendMqtt")
    
     @Operation(summary = "向指定主题中,发送消息")
    
     @ResponseBody
    
     @Parameters({
    
         @Parameter(name = "topic",description = "主题",required = true),
    
         @Parameter(name = "data",description = "数据结构",required = true)
    
     })
    
     public Result SendMqtt(String topic, String data){
    
     try {
    
         mqttGateWayService.sendMessageToMqtt(data,topic);
    
         return Result.success(data);
    
     }catch (Exception e){
    
         e.printStackTrace();
    
     }
    
     return Result.error("操作失败",500);
    
     }
    
  
    
     /** * 向主题发送文件
    
      * @return
    
      */
    
     @PostMapping(value = "/sendFileMqtt")
    
     @Operation(summary = "向指定主题中,发送文件")
    
     @ResponseBody
    
     @Parameters({
    
         @Parameter(name = "topic",description = "主题",required = true),
    
         @Parameter(name = "file",description = "文件",required = true)
    
     })
    
     public Result SendFileMqtt(String topic, MultipartFile file){
    
     byte[] fileContent = new byte[(int) file.getSize()];
    
     try (InputStream inputStream = file.getInputStream()) {
    
         inputStream.read(fileContent);
    
         String fileName = file.getOriginalFilename();
    
         mqttGateWayService.sendFileToMqtt(fileContent, topic);
    
         return Result.success(fileName,"操作成功");
    
     } catch (IOException e) {
    
         // 处理异常
    
         e.printStackTrace();
    
     }
    
     return Result.error("操作失败",500);
    
     }
    
    
    
    

谢谢大家收看,如果对您有很大帮助那自然更好。

全部评论 (0)

还没有任何评论哟~