【SpringBoot集成MQTT实现消息推送】
发布时间
阅读量:
阅读量
SpringBoot集成MQTT实现消息推送
依赖:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
AI写代码
application.yml:
mqtt:
  username: admin
  password: admin
  url: tcp://127.0.0.1:1883
  client-id: clientid-dev
  completionTimeout: 3000
AI写代码
MqttSenderConfig
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
 * Spring configuration for the outbound (publishing) side of the MQTT integration.
 *
 * <p>Declares the Paho connection options, a client factory built from those options,
 * the message handler that publishes to the broker, and the {@code mqttOutboundChannel}
 * that feeds that handler.
 *
 * @author JIANGMING
 * @since 2022/2/19 0019 16:29
 */
@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {

    /** Keep-alive interval handed to the Paho connection options. */
    private static final int KEEP_ALIVE_INTERVAL = 2;

    /** QoS applied when a message does not specify one itself. */
    private static final int DEFAULT_QOS = 2;

    /** Topic used when a message carries no topic header. */
    private static final String DEFAULT_TOPIC = "/default/mqtt/topic/foo";

    /** Fixed prefix of the outbound client id; a timestamp is appended at bean creation. */
    private static final String CLIENT_ID_PREFIX = "h-backend-mqtt-out-";

    /** Broker user name, read from {@code mqtt.username}. */
    @Value("${mqtt.username}")
    private String username;

    /** Broker password, read from {@code mqtt.password}. */
    @Value("${mqtt.password}")
    private String password;

    /** Broker URI, read from {@code mqtt.url}. */
    @Value("${mqtt.url}")
    private String hostUrl;

    /**
     * Builds the connection options: credentials, the single configured server URI
     * and the keep-alive interval.
     *
     * @return the populated connection options
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        String[] serverUris = {hostUrl};

        MqttConnectOptions options = new MqttConnectOptions();
        options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
        options.setServerURIs(serverUris);
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        return options;
    }

    /**
     * Creates the Paho client factory configured with {@link #getMqttConnectOptions()}.
     *
     * @return the client factory used by the outbound handler
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        clientFactory.setConnectionOptions(getMqttConnectOptions());
        return clientFactory;
    }

    /**
     * Creates the handler that publishes messages arriving on {@code mqttOutboundChannel}.
     *
     * <p>The client id is the fixed prefix followed by the current time in milliseconds,
     * evaluated once when this bean is created.
     *
     * @return the asynchronous outbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
                CLIENT_ID_PREFIX + System.currentTimeMillis(),
                mqttClientFactory());
        handler.setDefaultTopic(DEFAULT_TOPIC);
        handler.setDefaultQos(DEFAULT_QOS);
        handler.setAsync(true);
        return handler;
    }

    /**
     * Declares the channel that the gateway sends to and the outbound handler reads from.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}
AI写代码
MqttGateway
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
/**
 * Messaging gateway whose calls are sent to the {@code mqttOutboundChannel}.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * Sends a message to the default request channel.
     *
     * @param topic value placed in the {@link MqttHeaders#TOPIC} message header
     * @param data  the text to send
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            String data);
}
AI写代码
测试
// Usage example: inject the MqttGateway into the calling component.
@Resource
private MqttGateway mqttMessageSender;
// Send to the topic "/sys/ping/" + farmManageId. farmManageId and map are defined in
// surrounding code not shown here; StringUtils.toJson presumably serializes map to a
// JSON string -- confirm against the project's StringUtils.
mqttMessageSender.sendToMqtt("/sys/ping/"+farmManageId,StringUtils.toJson(map));
AI写代码
全部评论 (0)
还没有任何评论哟~
