Advertisement

SpringBoot实现MQTT消息发送和接收

阅读量:

目录

1、maven依赖

2.yml配置

3.配置获取yml信息

4.发送

4.1 相关问题:

4.2 只发不接收可使用Netty-client 具体看Netty那章,等我空了写

5.接收信息

6.监听

7.总结


Spring integration交互逻辑
对于发布者:

消息通过消息网关发送出去,由 MessageChannel 的实例 DirectChannel 处理发送的细节。
DirectChannel 收到消息后,内部通过 MessageHandler 的实例 MqttPahoMessageHandler 发送到指定的 Topic。
对于订阅者:

通过注入 MessageProducerSupport 的实例 MqttPahoMessageDrivenChannelAdapter,实现订阅 Topic 和绑定消息消费的 MessageChannel。
同样由 MessageChannel 的实例 DirectChannel 处理消费细节。Channel 消息后会发送给我们自定义的 MqttInboundMessageHandler 实例进行消费。

1、maven依赖

复制代码
  <dependency>

    
         <groupId>org.springframework.integration</groupId>
    
         <artifactId>spring-integration-mqtt</artifactId>
    
         <version>5.3.1.RELEASE</version>
    
     </dependency>

2.yml配置

复制代码
 mqtt:

    
     appid: mqttx_${random.value}
    
     #订阅主题,多个主题用逗号分隔
    
     inputTopic:  test009
    
     #MQTT服务器地址
    
     services: tcp://ip:1883
    
     #mqtt用户名
    
     user: 
    
     #mqtt密码,默认无
    
     password: 
    
     #心跳间隔时间,默认300
    
     KeepAliveInterval: 30
    
     #是否不保持session,默认false
    
     CleanSession: false
    
     #是否自动连接,默认true
    
     AutomaticReconnect: true
    
     #连接超时,默认30000
    
     CompletionTimeout: 30000
    
     #传输质量,默认1
    
     Qos: 1

3.配置获取yml信息

复制代码
复制代码
     @Value("${mqtt.appid}")

    
     private  String appid;
    
  
    
     @Value("${mqtt.inputTopic}")
    
     private  String[] inputTopic;//订阅主题
    
  
    
 //    @Value("${mqtt.outTopic}")
    
 //    private  String[] outTopic;//发布主题
    
  
    
     @Value("${mqtt.services}")
    
     private  String[] mqttServices;//服务器地址以及端口
    
  
    
     @Value("${mqtt.user}")
    
     private  String user;//用户名
    
  
    
     @Value("${mqtt.password}")
    
     private  String password;//密码
    
  
    
     @Value("${mqtt.KeepAliveInterval}")
    
     private  Integer KeepAliveInterval;//心跳时间
    
  
    
     @Value("${mqtt.CleanSession}")
    
     private  Boolean CleanSession;//是否不保持session,默认为session保持
    
  
    
     @Value("${mqtt.AutomaticReconnect}")
    
     private  Boolean AutomaticReconnect;//是否自动重联,默认为开启自动重联
    
  
    
     @Value("${mqtt.CompletionTimeout}")
    
     private  Long CompletionTimeout;//连接超时,默认为30秒
    
  
    
     @Value("${mqtt.Qos}")
    
     private  Integer Qos;//通信质量
    
  
    
  
    
     @Bean
    
     public MqttPahoClientFactory mqttClientFactory() {
    
     DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();//连接工厂类
    
     MqttConnectOptions options = new MqttConnectOptions();//连接参数
    
     options.setServerURIs(mqttServices);//连接地址
    
     if(user!=null) {
    
         options.setUserName(user);//用户名
    
     }
    
     if(password!=null) {
    
         options.setPassword(password.toCharArray());//密码
    
     }
    
     options.setKeepAliveInterval(KeepAliveInterval);//心跳时间
    
     options.setAutomaticReconnect(AutomaticReconnect);//断开是否自动重联
    
     options.setCleanSession(CleanSession);//保持session
    
     factory.setConnectionOptions(options);
    
     return factory;
    
     }

4.发送

复制代码
 @Configuration

    
 @IntegrationComponentScan
    
 @Slf4j
    
 public class MqttOutboundConfiguration {
    
     @Autowired
    
     private MqttMessageService mqttMessageService;
    
  
    
     @Bean
    
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    
     public  MessageHandler mqttOutbound() {
    
     MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttMessageService.getAppid()+ "_producer", mqttMessageService.mqttClientFactory());
    
     // 如果设置成true,发送消息时将不会阻塞。
    
     messageHandler.setAsync(true);
    
     messageHandler.setDefaultTopic(mqttMessageService.getInputTopic()[0]);
    
     return messageHandler;
    
     }
    
  
    
     @Bean
    
     public MessageChannel mqttOutboundChannel() {
    
    // DirectChannel dc =new DirectChannel();
    
    // dc.subscribe(mqttOutbound());
    
     return new DirectChannel();
    
     }
    
 }

4.1 相关问题:

可以在发送时接收信息,代码如下:

它会接收到你发送的,以及接收的数据,但在input正常接收的频道它就会失效了

而且发送的消息在MQTT客户端没有显示出来,问题是啥不清楚,所以尽量避免

复制代码
 @Configuration

    
 @IntegrationComponentScan
    
 @Slf4j
    
 public class MqttOutboundConfiguration {
    
     @Autowired
    
     private MqttMessageService mqttMessageService;
    
  
    
     @Bean
    
     public  MessageHandler mqttOutbound() {
    
     MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttMessageService.getAppid()+ "_producer", mqttMessageService.mqttClientFactory());
    
     // 如果设置成true,发送消息时将不会阻塞。
    
     messageHandler.setAsync(true);
    
     messageHandler.setDefaultTopic(mqttMessageService.getInputTopic()[0]);
    
     return messageHandler;
    
     }
    
  
    
     @Bean
    
     public MessageChannel mqttOutboundChannel() {
    
    // DirectChannel dc =new DirectChannel();
    
    // dc.subscribe(mqttOutbound());
    
     return new DirectChannel();
    
     }
    
  
    
     /** * mqtt发布者信道名称 mqttOutboundChannel
    
      * @return
    
      */
    
     @Bean
    
     @ServiceActivator(inputChannel = "mqttOutboundChannel")
    
     public MessageHandler handler() {
    
     return new MessageHandler() {
    
         @Override
    
         public void handleMessage(Message<?> message) throws MessagingException {
    
             try {
    
                 MessageHeaders headers = message.getHeaders();
    
                 //获取消息Topic
    
                 String topic = (String) headers.get(MqttHeaders.TOPIC);
    
                 String receivedTopic = (String) headers.get(MqttHeaders.RECEIVED_TOPIC);
    
                 log.info("[获取到的消息的topic-out :]{} ", topic);
    
                 log.info("[获取到的消息的receivedTopic :]{} ", receivedTopic);
    
                 //获取消息体
    
                 String payload = (String) message.getPayload();
    
                 log.info("[获取到的消息的payload :]{} ", payload);
    
             } catch (Exception e) {
    
                 e.printStackTrace();
    
             }
    
         }
    
     };
    
     }
    
 }

4.2 只发不接收可使用Netty-client 具体看Netty那章,等我空了写

复制代码
  <dependency>

    
         <groupId>org.jetlinks</groupId>
    
         <artifactId>netty-mqtt-client</artifactId>
    
         <version>1.0.0</version>
    
     </dependency>
    
  
    
     <dependency>
    
         <groupId>io.netty</groupId>
    
         <artifactId>netty-all</artifactId>
    
         <version>4.1.43.Final</version>
    
     </dependency>

5.接收信息

复制代码
 @Configuration

    
 @IntegrationComponentScan
    
 @Slf4j
    
 public class MqttInboundConfiguration {
    
  
    
     @Autowired
    
     private MqttMessageService mqttMessageService;
    
     @Autowired
    
     private MqttReceiveHandle mqttReceiveHandle;
    
     @Bean
    
     public MessageProducerSupport mqttInbound() {
    
     MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttMessageService.getAppid()+ "_consumer",
    
                 mqttMessageService.mqttClientFactory(), mqttMessageService.getInputTopic());
    
     adapter.setCompletionTimeout(60000);
    
     adapter.setConverter(new DefaultPahoMessageConverter());
    
     adapter.setRecoveryInterval(10000);
    
     adapter.setQos(0);
    
     adapter.setOutputChannel(mqttInputChannel());
    
     return adapter;
    
     }
    
  
    
     @Bean
    
     public MessageChannel mqttInputChannel() {
    
     return new DirectChannel();
    
     }
    
  
    
     /** *  mqtt订阅者使用信道名称 mqttInboundChannel
    
      * @return
    
      */
    
     @Bean
    
     @ServiceActivator(inputChannel = "mqttInputChannel")
    
     public MessageHandler handler() {
    
     return new MessageHandler() {
    
         @Override
    
         public void handleMessage(Message<?> message) throws MessagingException {
    
             mqttReceiveHandle.handle(message);
    
         }
    
     };
    
     }
复制代码

所以input 和 out 最好放一起,不然发送接口就会发现Bean重复注入,导致失败;

贴个完整的代码

复制代码
 package com.example.trainserver.mqtt;

    
  
    
  
    
 import lombok.extern.slf4j.Slf4j;
    
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    
 import org.springframework.beans.factory.annotation.Autowired;
    
 import org.springframework.beans.factory.annotation.Value;
    
 import org.springframework.context.annotation.Bean;
    
 import org.springframework.context.annotation.Configuration;
    
 import org.springframework.integration.annotation.IntegrationComponentScan;
    
 import org.springframework.integration.annotation.ServiceActivator;
    
 import org.springframework.integration.channel.DirectChannel;
    
 import org.springframework.integration.endpoint.MessageProducerSupport;
    
 import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    
 import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    
 import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
    
 import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
    
 import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
    
 import org.springframework.messaging.Message;
    
 import org.springframework.messaging.MessageChannel;
    
 import org.springframework.messaging.MessageHandler;
    
 import org.springframework.messaging.MessagingException;
    
  
    
  
    
 @Configuration
    
 @IntegrationComponentScan
    
 @Slf4j
    
 public class MqttConfiguration {
    
     @Value("${mqtt.appid}")
    
     public String appid;
    
  
    
     @Value("${mqtt.inputTopic}")
    
     //订阅主题
    
     public String[] inputTopic;
    
  
    
 //    @Value("${mqtt.outTopic}")
    
 //    private  String[] outTopic;//发布主题
    
  
    
     @Value("${mqtt.services}")
    
     //服务器地址以及端口
    
     public String[] mqttServices;
    
  
    
     @Value("${mqtt.user}")
    
     //用户名
    
     public String user;
    
  
    
     @Value("${mqtt.password}")
    
     //密码
    
     public String password;
    
  
    
     @Value("${mqtt.KeepAliveInterval}")
    
     //心跳时间
    
     public Integer KeepAliveInterval;
    
  
    
     @Value("${mqtt.CleanSession}")
    
     //是否不保持session,默认为session保持
    
     public Boolean CleanSession;
    
  
    
     @Value("${mqtt.AutomaticReconnect}")
    
     //是否自动重联,默认为开启自动重联
    
     public Boolean AutomaticReconnect;
    
  
    
     @Value("${mqtt.CompletionTimeout}")
    
     //连接超时,默认为30秒
    
     public Long CompletionTimeout;
    
  
    
     @Value("${mqtt.Qos}")
    
     //通信质量
    
     public Integer Qos;
    
  
    
  
    
     @Bean
    
     public MqttPahoClientFactory mqttClientFactory() {
    
     DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();//连接工厂类
    
     MqttConnectOptions options = new MqttConnectOptions();//连接参数
    
     options.setServerURIs(mqttServices);//连接地址
    
     if (user != null) {
    
         options.setUserName(user);//用户名
    
     }
    
     if (password != null) {
    
         options.setPassword(password.toCharArray());//密码
    
     }
    
     options.setKeepAliveInterval(KeepAliveInterval);//心跳时间
    
     options.setAutomaticReconnect(AutomaticReconnect);//断开是否自动重联
    
     options.setCleanSession(CleanSession);//保持session
    
     factory.setConnectionOptions(options);
    
     return factory;
    
     }
    
  
    
  
    
     @Autowired
    
     private MqttReceiveHandle mqttReceiveHandle;
    
  
    
     @Bean
    
     public MessageProducerSupport mqttInbound() {
    
     MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(appid + "__consumer",
    
             mqttClientFactory(), inputTopic);
    
     adapter.setCompletionTimeout(60000);
    
     adapter.setConverter(new DefaultPahoMessageConverter());
    
     adapter.setRecoveryInterval(10000);
    
     adapter.setQos(0);
    
     adapter.setOutputChannel(mqttInputChannel());
    
     return adapter;
    
     }
    
  
    
     @Bean
    
     public MessageChannel mqttInputChannel() {
    
     return new DirectChannel();
    
     }
    
  
    
     /** * mqtt订阅者使用信道名称 mqttInboundChannel
    
      * * @return
    
      */
    
     @Bean
    
     @ServiceActivator(inputChannel = "mqttInputChannel")
    
     public MessageHandler handler() {
    
     return new MessageHandler() {
    
         @Override
    
         public void handleMessage(Message<?> message) throws MessagingException {
    
             mqttReceiveHandle.handle(message);
    
         }
    
     };
    
     }
    
  
    
     @Bean
    
     @ServiceActivator(inputChannel = "mqttOutboundChannel")
    
     public MessageHandler mqttOutbound() {
    
     MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(appid + "__producer", mqttClientFactory());
    
     // 如果设置成true,发送消息时将不会阻塞。
    
     messageHandler.setAsync(true);
    
     messageHandler.setDefaultTopic(inputTopic[0]);
    
     return messageHandler;
    
     }
    
  
    
     @Bean
    
     public MessageChannel mqttOutboundChannel() {
    
     // DirectChannel dc =new DirectChannel();
    
     // dc.subscribe(mqttOutbound());
    
     return new DirectChannel();
    
     }
    
  
    
     /** * mqtt发布者信道名称 mqttOutboundChannel
    
      * @return
    
      */
    
    /* @Bean
    
     @ServiceActivator(inputChannel = "mqttOutboundChannel")
    
     public MessageHandler outHandler() {
    
     return new MessageHandler() {
    
         @Override
    
         public void handleMessage(Message<?> message) throws MessagingException {
    
             System.out.println("AAAAA");
    
             mqttReceiveHandle.handle(message);
    
         }
    
     };
    
     }*/
    
 }

6.监听

简单地看了一下事件信息,他监听不到具体的message,所以pass

复制代码
  
    
 @Slf4j
    
 @Component
    
 public class MqttListener {
    
     /** * 连接失败的事件通知
    
      * @param mqttConnectionFailedEvent
    
      */
    
     @EventListener(classes = MqttConnectionFailedEvent.class)
    
     public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) {
    
     log.info("连接失败的事件通知");
    
     }
    
  
    
     /** * 已发送的事件通知
    
      * @param mqttMessageSentEvent
    
      */
    
     @EventListener(classes = MqttMessageSentEvent.class)
    
     public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) {
    
     log.info("已发送的事件通知");
    
     }
    
  
    
     /** * 已传输完成的事件通知
    
      * 1.QOS == 0,发送消息后会即可进行此事件回调,因为不需要等待回执
    
      * 2.QOS == 1,发送消息后会等待ACK回执,ACK回执后会进行此事件通知
    
      * 3.QOS == 2,发送消息后会等待PubRECV回执,知道收到PubCOMP后会进行此事件通知
    
      * @param mqttMessageDeliveredEvent
    
      */
    
     @EventListener(classes = MqttMessageDeliveredEvent.class)
    
     public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) {
    
     log.info("已传输完成的事件通知");
    
     }
    
  
    
     /** * 消息订阅的事件通知
    
      * @param mqttSubscribedEvent
    
      */
    
     @EventListener(classes = MqttSubscribedEvent.class)
    
     public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) {
    
     log.info("消息订阅的事件通知{}",mqttSubscribedEvent.toString());
    
     }
    
 }

7.总结

MQTT接收就是接收,没有所谓的消费处理逻辑,所以只要能收到,就能处理

ActiveMQ,是用监听来处理信息的,处理完有一个数据消费的处理步骤

复制代码
 @Service

    
 @Slf4j
    
 public class ERPListener implements SessionAwareMessageListener<Message> {
    
   @Override
    
     @JmsListener(destination = ImsQueueName.ERP_SEND_QUEUE, containerFactory = "queueContainerFactory")
    
     public void onMessage(javax.jms.Message message, Session session) throws JMSException {
    
     if (message == null) {
    
         log.error("接收消息为空。。。。。。。。。。。");
    
         return;
    
     }
    
     try {
    
         log.info("ERP数据推送消息==============={}", System.currentTimeMillis());
    
         TextMessage textMessage = (TextMessage) message;
    
         String text = textMessage.getText();
    
         MqMsgLog mqMsgLog = JSONObject.parseObject(text, MqMsgLog.class);
    
         String body = mqMsgLog.getBody();
    
      
    
         log.error("===ERP数据推送消息 队列消费失败===:{}", e.getMessage());
    
         session.recover();
    
     }
    
     }
    
  
    
 }
复制代码
 ActiveMQQueue queueDestination = new ActiveMQQueue(ImsQueueName.ERP_SEND_QUEUE);

    
         mqProducerService.delaySendMessage(queueDestination, citicSplitRecord.getCiticSplitRecordId().toString(),300);

全部评论 (0)

还没有任何评论哟~