基于SpringBoot实现MQTT消息收发
发布时间
阅读量:
阅读量
基于SpringBoot实现MQTT消息收发
- 实验环境
SpringBoot 2.2.2.RELEASE 包含核心组件
EMQX 社区的最新版本提供了 MQTT 服务端的支持
Docker 18.0.~ 支持容器化部署
- POM引入依赖包
#pom.xml
<!-- Spring Boot starter that pulls in the Spring Integration core. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>

<!-- MQTT channel adapters for Spring Integration. -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.2.3.RELEASE</version>
</dependency>

<!-- Stream support for Spring Integration. -->
<!-- NOTE(review): the integration artifacts must be compatible with the Spring Boot
     version in use; a mismatch is reported in the FAQ as NoSuchFieldError: logger. -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>5.2.3.RELEASE</version>
</dependency>
- Mqtt配置代码及YAML配置
/**
 * Type-safe holder for the {@code mqtt.*} configuration properties and factory for the
 * Paho connection options derived from them.
 */
@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttConfiguration {

    /** Value passed to {@code MqttConnectOptions#setKeepAliveInterval}. */
    private int keepAliveInterval;

    /** Value passed to {@code MqttConnectOptions#setConnectionTimeout}. */
    private int connectionTimeout;

    /** Broker user name; may be left unset for brokers that accept anonymous clients. */
    private String userName;

    /** Broker password; may be left unset for brokers that accept anonymous clients. */
    private String userPassword;

    /** Broker URIs, e.g. {@code tcp://host:1883}. At least one entry is required. */
    private List<String> uris;

    /** Topics the subscriber side binds to. */
    private List<String> topics;

    /** Quality-of-service level used by both the inbound adapter and the outbound handler. */
    private int qos;

    /**
     * Builds a fresh {@link MqttConnectOptions} from the bound properties.
     *
     * <p>Credentials are optional: the user name and password are only applied when they
     * are configured, so an unset password no longer causes a {@link NullPointerException}.
     *
     * @return connection options with server URIs, optional credentials, keep-alive,
     *         connection timeout, automatic reconnect enabled and clean session disabled
     * @throws IllegalStateException if no broker URI is configured under {@code mqtt.uris}
     */
    public MqttConnectOptions connectionOptions() {
        List<String> brokerUris = getUris();
        if (brokerUris == null || brokerUris.isEmpty()) {
            // Fail with an actionable message instead of a bare NullPointerException.
            throw new IllegalStateException("mqtt.uris must contain at least one broker URI");
        }

        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(brokerUris.toArray(new String[0]));

        String user = getUserName();
        if (user != null && !user.trim().isEmpty()) {
            options.setUserName(user);
        }
        String password = getUserPassword();
        if (password != null) {
            options.setPassword(password.toCharArray());
        }

        options.setKeepAliveInterval(getKeepAliveInterval());
        options.setConnectionTimeout(getConnectionTimeout());
        options.setCleanSession(false);
        options.setAutomaticReconnect(true);
        return options;
    }
}
# application.yaml
# All keys must be nested under "mqtt" so that they bind to
# @ConfigurationProperties(prefix = "mqtt").
mqtt:
  keepAliveInterval: 10
  connectionTimeout: 10
  userName: root
  userPassword: root
  # Broker endpoints handed to the MQTT client.
  uris:
    - tcp://10.0.1.12:1883
  qos: 1
  # Topics the subscriber binds to.
  topics:
    - test
- 订阅者代码
@Log4j2
@Data
@Configuration
@IntegrationComponentScan
public class Subscriber {
@Resource
MqttConfiguration configuration;
@Bean
public MqttConnectOptions connectOptions() {
return configuration.connectionOptions();
}
// 初始化连接工厂
@Bean
public MqttPahoClientFactory mqttPahoClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(connectOptions());
return factory;
}
// 建立输入通道
@Bean("mqttInboundChannel")
public MessageChannel mqttInboundChannel() {
return new DirectChannel() {{
this.subscribe(handler());
}};
}
private static String clusterClientId() {
try {
return "subscriber" + "-" + InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException he) {
log.warn("unknown ip address, cause: {}", he.getMessage());
}
return "subscriber" + "-" + System.currentTimeMillis();
}
// 绑定TOPICs
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
clusterClientId(),
mqttPahoClientFactory(),
configuration.getTopics().toArray(new String[0]));
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(configuration.getQos());
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}
// 消息处理定义
@Bean
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
log.info("HandleMessage headers: {}", message.getHeaders());
log.info("HandleMessage payload: {}", message.getPayload());
}
};
}
- 发布者代码
/**
 * Spring Integration wiring for the outbound (publishing) side.
 *
 * <p>Messages sent to the {@value #MQTT_OUT_BOUND_CHANNEL_BEAN_NAME} channel are handed
 * to an {@link MqttPahoMessageHandler}.
 *
 * <p>The connection options, client factory and handler beans are given explicit,
 * publisher-specific names. Their method names are shared with other configuration
 * classes (e.g. {@code Subscriber}); relying on the default method-derived bean names
 * would register two definitions under the same name.
 */
@Log4j2
@Data
@Configuration
@IntegrationComponentScan
public class Publisher {

    /** Bean name of the outbound channel; referenced by the messaging gateway. */
    public static final String MQTT_OUT_BOUND_CHANNEL_BEAN_NAME = "mqttOutboundChannel";

    @Resource
    MqttConfiguration configuration;

    /** Connection options built from the {@code mqtt.*} properties. */
    @Bean("publisherConnectOptions")
    public MqttConnectOptions connectOptions() {
        return configuration.connectionOptions();
    }

    /** Client factory used by the outbound handler. */
    @Bean("publisherClientFactory")
    public MqttPahoClientFactory mqttPahoClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(connectOptions());
        return factory;
    }

    /** Outbound channel; messages sent to it are dispatched to {@link #handler()}. */
    @Bean(MQTT_OUT_BOUND_CHANNEL_BEAN_NAME)
    public MessageChannel mqttOutboundChannel() {
        // Plain construction instead of double-brace initialisation, which would create
        // an anonymous DirectChannel subclass for no benefit.
        DirectChannel channel = new DirectChannel();
        channel.subscribe(handler());
        return channel;
    }

    /**
     * Derives the MQTT client id from the local host address, falling back to the
     * current time in milliseconds when the address cannot be resolved.
     */
    private static String clusterClientId() {
        try {
            return "publisher" + "-" + InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException he) {
            log.warn("unknown ip address, cause: {}", he.getMessage());
        }
        return "publisher" + "-" + System.currentTimeMillis();
    }

    /** Outbound MQTT handler using the configured default QoS. */
    @Bean("publisherHandler")
    public MessageHandler handler() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(clusterClientId(), mqttPahoClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(configuration.getQos());
        // NOTE(review): the subscriber side uses DefaultPahoMessageConverter; confirm that
        // this converter produces the message type the Paho handler expects to publish.
        messageHandler.setConverter(new DefaultDatatypeChannelMessageConverter());
        return messageHandler;
    }
}
- 发布消息接口
/**
 * Messaging gateway for publishing to MQTT. Every call is sent to the channel named by
 * {@link Publisher#MQTT_OUT_BOUND_CHANNEL_BEAN_NAME}.
 */
@MessagingGateway(defaultRequestChannel = Publisher.MQTT_OUT_BOUND_CHANNEL_BEAN_NAME)
public interface MqttGateway {

    /**
     * Sends a payload without a topic header.
     *
     * @param payload message body
     */
    void sendToMqtt(String payload);

    /**
     * Sends a payload with an explicit topic header.
     *
     * @param topic   value of the {@code MqttHeaders.TOPIC} header
     * @param payload message body
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            String payload);

    /**
     * Sends a text payload with explicit topic and QoS headers.
     *
     * @param topic   value of the {@code MqttHeaders.TOPIC} header
     * @param qos     value of the {@code MqttHeaders.QOS} header
     * @param payload message body
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            @Header(MqttHeaders.QOS) int qos,
            String payload);

    /**
     * Sends a binary payload with explicit topic and QoS headers.
     *
     * @param topic   value of the {@code MqttHeaders.TOPIC} header
     * @param qos     value of the {@code MqttHeaders.QOS} header
     * @param payload message body
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            @Header(MqttHeaders.QOS) int qos,
            byte[] payload);
}
- SpringBoot 启动配置
/**
 * Application entry point; enables Spring Integration and boots the Spring context.
 */
@SpringBootApplication
@EnableIntegration
public class BootstrapApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(BootstrapApplication.class);
        application.run(args);
    }
}
- Docker部署Mqtt(EMQX)实例
$ docker pull emqx:latest
$ docker run --name emq -p 18083:18083 -p 1883:1883 -p 8084:8084 -p 8883:8883 -p 8083:8083 -d emqx:latest
# 18083: Dashboard 管理控制台端口(HTTP)
# 1883 : TCP
# 8084 : WSS
# 8883 : SSL
# 8083 : WS
- 访问EMQX控制台
√ Visit http://localhost:18083
默认账号 / 密码:admin / public
- 编写测试Controller
/**
 * Test endpoint that publishes the path variable as an MQTT message.
 */
@RestController
@RequestMapping("")
public class PublishController {

    @Resource
    MqttGateway mqttGateway;

    /**
     * Publishes {@code message} to the {@code test} topic through the gateway.
     *
     * @param message text taken from the request path
     * @return the literal {@code "OK"} once the message has been handed to the gateway
     */
    @GetMapping("/t/{message}")
    public String publish(@PathVariable("message") String message) {
        final String topic = "test";
        final String reply = "OK";
        mqttGateway.sendToMqtt(topic, message);
        return reply;
    }
}
- 测试及结果
浏览器访问 http://localhost:8080/t/goodjob,即可向主题 test 发布消息 goodjob,接口返回 OK。
订阅端日志会打印收到的消息头(headers)和消息体(payload)。
That’s all!
FAQs
版本兼容性问题: java.lang.NoSuchFieldError:logger
该错误通常是由于 spring-integration-mqtt、spring-integration-stream 的版本与当前使用的 Spring Boot 版本不匹配引起的。
访问 mvnrepository 网站获取与当前使用的 Spring Boot 版本相匹配的工具包。
建议参考以下方法获取所需工具包:
- 查看当前使用的 Spring Boot 版本;
- 寻找发布时间较近的相关版本;
- 按需选择适合项目的组件。
全部评论 (0)
还没有任何评论哟~
