Advertisement

SpringBoot集成Mqtt发送消息

阅读量:

1. MQTT简介

MQTT是一种物联网消息协议,为Message Queuing Telemetry Transport的缩写,即消息队列传输探测 ,协议基于发布订阅模式进行通信,有开销低、带宽小、轻量的特点,通常应用在物联网数据采集、移动应用、智能硬件、电力、能源等领域。

相关概念

三种身份:

在这里插入图片描述
  • 客户端(Client):MQTT 客户端是发送接收 消息的应用程序。
  • 服务器(Broker):也叫“代理”,服务器是处理 消息的应用程序,位于发布者和订阅者中间,负责接收消息,并按照某种规则发送给订阅者。
  • 主题(Topic): 主题是消息的标识符,用于区分不同类型的消息。

MQTT 消息

MQTT传输的消息可以分为:主题(topic)和负载(payload)两部分

  • 主题,可以理解为消息的类型
  • 负载,可以理解为消息的内容

消息服务质量QoS(Quality of Service)

Qos用于保证在不同的网络环境下消息传递的可靠性,分为3个等级

  • 0 消息最多传递一次,消息发布完全依赖底层TCP/IP网络,可能会发生消息丢失, 也就是发出去就不管了,也被叫做“即发即弃”
  • 1 消息传递至少 1 次,确保消息到达,但消息重复可能会发生,发送者将会存储发送的信息直到发送者收到一次来自接收者的PUBACK格式的应答。
  • 2 消息仅传送一次,确保消息到达一次

2. SpringBoot集成Mqtt

Spring集成Mqtt常用的有两种方式,一种是直接使用Mqtt的客户端库,如Eclipse Paho,另外一种是spring integration mqtt
第一种:使用Mqtt客户端库
依赖引入:org.eclipse.paho.client.mqttv3

复制代码
    <dependency>
    	<groupId>org.eclipse.paho</groupId>
    	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    	<version>1.2.0</version>
    </dependency>
    
    
    xml

服务端配置

复制代码
    public class MqttSendMsgService {
    private static String clientId = "test";
    private static String username = "admin";
    private static String password = "xxxxxx";
    private static String broker = "tcp://xxxxx:1883";
    public ReturnT<String> mqttSend(String param) {
        MqttClient client;
        try {
            client = new MqttClient(broker, clientId, new MemoryPersistence());
            client.setCallback(new MqttCallback() {
                public void connectionLost(Throwable cause) {
                    System.out.println("Connection lost: " + cause.getMessage());
                }
                @Override
                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                    System.out.println("Message arrived: " + mqttMessage.getPayload());
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("Delivery complete");
                }
            });
    
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(username);
            connOpts.setPassword(password.toCharArray());
    
            client.connect(connOpts);
            log.info("Connected to MQTT Broker!");
    
    
            //主题
            String topic="test/simple";
            //消息
            String content="发送测试";
    
            MqttMessage message = new MqttMessage();
            message.setQos(1);
            message.setRetained(false);
            message.setPayload(content.getBytes());
            //消息发送
            client.publish(topic,message);
        } catch (MqttException e) {
            e.printStackTrace();
        }
        return ReturnT.SUCCESS;
    }
    }
    
    
    
    java
    
    
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-16/tFQ6gzPJ0KTMHjoLeEvlb5dRV3ky.png)

上面这种使用起来比较简单,生产环境使用最多的还是下面这种

第二种:使用 Spring integration进行集成,这里以发送消息为例
依赖引入

复制代码
    <dependency>
    	<groupId>org.springframework.integration</groupId>
    	<artifactId>spring-integration-mqtt</artifactId>
    	<version>5.5.14</version>
    </dependency>
    
    
    xml

添加yaml配置

复制代码
    mqtt.url = tcp://xxxxx:1883
    mqtt.username = admin
    mqtt.password = 123456
    mqtt.clientId = test
    mqtt.defaultTopic = /test/send
    mqtt.keepAliveInterval = 60
    mqtt.automaticReconnect = true
    mqtt.cleanSession = false
    mqtt.connectionTimeout = 30
    mqtt.maxInflight = 1024
    
    
    yaml
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-16/LqoSnRa9X5KTiWw6MOVedC8gPl10.png)

添加对应的属性配置类

复制代码
    @Component
    public class MqttConfigProperties {
    @Value("${mqtt.url}")
    private String url;
    @Value("${mqtt.username}")
    private String username;
    @Value("${mqtt.password}")
    private String password;
    @Value("${mqtt.clientId}")
    private String clientId;
    @Value("${mqtt.defaultTopic}")
    private String defaultTopic;
    @Value("${mqtt.keepAliveInterval}")
    private Integer keepAliveInterval;
    @Value("${mqtt.automaticReconnect}")
    private Boolean automaticReconnect;
    @Value("${mqtt.cleanSession}")
    private Boolean cleanSession;
    @Value("${mqtt.connectionTimeout}")
    private Integer connectionTimeout;
    @Value("${mqtt.maxInflight}")
    private Integer maxInflight;
    }
    
    
    java
    
    
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-16/SKvhyH1rPx2zj6IYkRGJM3ELCTuO.png)

创建客户端配置类

复制代码
    @Configuration
    @IntegrationComponentScan
    public class MqttConfig {
    @Autowired
    private MqttConfigProperties mqttConfigProperties;
    
    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        log.info("初始化mqtt信息{}", JSON.toJSON(mqttConfigProperties));
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(mqttConfigProperties.getUsername());
        options.setPassword(mqttConfigProperties.getPassword().toCharArray());
        options.setServerURIs(new String[]{mqttConfigProperties.getUrl()});
        options.setKeepAliveInterval(mqttConfigProperties.getKeepAliveInterval());
        options.setAutomaticReconnect(mqttConfigProperties.getAutomaticReconnect());
        options.setCleanSession(mqttConfigProperties.getCleanSession());
        options.setConnectionTimeout(mqttConfigProperties.getConnectionTimeout());
        options.setMaxInflight(mqttConfigProperties.getMaxInflight());
        return options;
    }
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(MqttConnectOptions mqttConnectOptions) {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(mqttConnectOptions);
        return factory;
    }
    
    // 推送通道
    @Bean
    public MessageChannel mqttOutputChannel() {
        return new DirectChannel();
    }
    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler sendHandler(MqttPahoClientFactory mqttPahoClientFactory) {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfigProperties.getClientId() + "-publish", mqttPahoClientFactory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(1);
        messageHandler.setDefaultTopic(mqttConfigProperties.getDefaultTopic());
        log.info("初始化mqttOutputChannel...");
        return messageHandler;
    }
    
    
    }
    
    
    java
    
    
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-16/L89Y2PinGH4WISDwsQOa5ctAjdly.png)

发送网关接口

复制代码
    @MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
    public interface MqttGateway {
    /** * 发送消息
     * * @param topic
     * @param data
     */
    void send(@Header(MqttHeaders.TOPIC) String topic, String data);
    }
    
    
    java
    
    
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-16/BcbVmazgtRi3XovPpyFxdD9rs6Hw.png)

这样,在发送消息时,直接将消息网关注入,调用发送方法就可以发送了

复制代码
    mqttGateway.send(topic, JSONObject.toJSONString(msg));
    
    
    java
    
    

参考:
https://mqtt.org/

全部评论 (0)

还没有任何评论哟~