Advertisement

基于SpringBoot实现MQTT消息收发

阅读量:

基于SpringBoot实现MQTT消息收发

  • 实验环境

SpringBoot 2.2.2.RELEASE 包含核心组件
EMQX 社区的最新版本提供了 MQTT 服务端的支持
Docker 18.0.~ 支持容器化部署

  • POM引入依赖包
复制代码
    #pom.xml
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
      <version>2.2.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
      <version>5.2.3.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-stream</artifactId>
      <version>5.2.3.RELEASE</version>
    </dependency>
  • Mqtt配置代码及YAML配置
复制代码
    @Data
    @Component
    @ConfigurationProperties(prefix = "mqtt")
    public class MqttConfiguration {
    private int keepAliveInterval;
    private int connectionTimeout;
    private String userName;
    private String userPassword;
    private List<String> uris;
    private List<String> topics;
    private int qos;
    
    
    public MqttConnectOptions connectionOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(getUris().toArray(new String[0]));
        options.setPassword(getUserPassword().toCharArray());
        options.setKeepAliveInterval(getKeepAliveInterval());
        options.setCleanSession(false);
        options.setUserName(getUserName());
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(getConnectionTimeout());
    
        return options;
    }
    }
复制代码
    # application.yaml
    mqtt:
      keepAliveInterval: 10
      connectionTimeout: 10
      userName: root
      userPassword: root
      uris:
    - tcp://10.0.1.12:1883
      qos: 1
      topics: 
    - test
  • 订阅者代码
复制代码
    @Log4j2
    @Data
    @Configuration
    @IntegrationComponentScan
    public class Subscriber {
    
    @Resource
    MqttConfiguration configuration;
    
    @Bean
    public MqttConnectOptions connectOptions() {
        return configuration.connectionOptions();
    }
    
    // 初始化连接工厂
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(connectOptions());
        return factory;
    }
    
    // 建立输入通道
    @Bean("mqttInboundChannel")
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel() {{
            this.subscribe(handler());
        }};
    }
    
    private static String clusterClientId() {
        try {
            return "subscriber" + "-" + InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException he) {
            log.warn("unknown ip address, cause: {}", he.getMessage());
        }
        return "subscriber" + "-" + System.currentTimeMillis();
    }
    
    // 绑定TOPICs
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        clusterClientId(),
                        mqttPahoClientFactory(),
                        configuration.getTopics().toArray(new String[0]));
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(configuration.getQos());
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }
    
    // 消息处理定义
    @Bean
    public MessageHandler handler() {
        return new MessageHandler() {
    
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                log.info("HandleMessage headers: {}", message.getHeaders());
                log.info("HandleMessage payload: {}", message.getPayload());
            }
        };
    }
  • 发布者代码
复制代码
    @Log4j2
    @Data
    @Configuration
    @IntegrationComponentScan
    public class Publisher {
    
    public static final String MQTT_OUT_BOUND_CHANNEL_BEAN_NAME = "mqttOutboundChannel";
    
    @Resource
    MqttConfiguration configuration;
    
    @Bean
    public MqttConnectOptions connectOptions() {
        return configuration.connectionOptions();
    }
    
    // 初始化连接工厂
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(connectOptions());
        return factory;
    }
    
    // 建立输出通道
    @Bean(MQTT_OUT_BOUND_CHANNEL_BEAN_NAME)
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel() {{
            this.subscribe(handler());
        }};
    }
    
    private static String clusterClientId() {
        try {
            return "publisher" + "-" + InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException he) {
            log.warn("unknown ip address, cause: {}", he.getMessage());
        }
        return "publisher" + "-" + System.currentTimeMillis();
    }
    
    // 消息处理定义
    @Bean
    public MessageHandler handler() {
        MqttPahoMessageHandler messageHandler
                = new MqttPahoMessageHandler(clusterClientId(), mqttPahoClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(configuration.getQos());
        messageHandler.setConverter(new DefaultDatatypeChannelMessageConverter());
        return messageHandler;
    }
    
    }
  • 发布消息接口
复制代码
    @MessagingGateway(defaultRequestChannel = Publisher.MQTT_OUT_BOUND_CHANNEL_BEAN_NAME)
    public interface MqttGateway {
    
    void sendToMqtt(String payload);
    
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
    
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
    
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
    }
  • SpringBoot 启动配置
复制代码
    // @EnableIntegration
    @EnableIntegration
    @SpringBootApplication
    public class BootstrapApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(BootstrapApplication.class, args);
    }
    
    }
  • Docker部署Mqtt(EMQX)实例
复制代码
    $ docker pull emqx:latest
    $ docker run --name emq -p 18083:18083 -p 1883:1883 -p 8084:8084 -p 8883:8883 -p 8083:8083 -d emqx:latest
    
    # 18083: 服务器启动端口
    # 1883 : TCP
    # 8084 : WSS
    # 8883 : SSL
    # 8083 : WS
  • 访问EMQX控制台
复制代码
    √ Visit http://localhost:18083
    
    admin / public
  • 编写测试Controller
复制代码
    @RestController
    @RequestMapping("")
    public class PublishController {
    
    @Resource
    MqttGateway mqttGateway;
    
    @GetMapping("/t/{message}")
    public String publish(@PathVariable("message") String message) {
        mqttGateway.sendToMqtt("test", message);
        return "OK";
    
    }
    
    }
  • 测试及结果

Access granted at http://localhost:8080/t/goodjob. Good job!

% Log received – Capture message data.

That’s all!

FAQs

版本兼容性问题: java.lang.NoSuchFieldError:logger

提供支持的工具包 spring-integration-mqtt 和 spring-integration-stream 可以帮助您实现多种功能。
访问 mvnrepository 网站获取与当前使用的 Spring Boot 版本相匹配的工具包。
建议参考以下方法获取所需工具包:

  1. 查看当前使用的 Spring Boot 版本;
  2. 寻找发布时间较近的相关版本;
  3. 按需选择适合项目的组件。

全部评论 (0)

还没有任何评论哟~