使用SpringIntegration连接MQTT
随着物联网的逐步发展,MQTT以起简单,轻量,高效成为物联网应用开发的首选技术栈,今天我们来介绍一下如何在SpringBoot中使用SpringIntegration来连接MQTT。
准备工作
我们需要先准备好以下基础开发环境:
- JDK 1.8+
- Maven
- MQTT Broker
- 熟悉的IDE
接下来需要使用Spring Initializr来创建一个项目,引入SpringWeb相关依赖,这些操作如果不清楚,可以在网络上用搜索引擎寻求帮助。
这样我们的准备工作就做完了。
添加SpringIntegration相关依赖
在SpringBoot项目POM文件中添加以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
添加MQTT相关配置信息
接下来我们在SpringBoot的配置文件中添加一些必须得配置。这里假定我们使用的是properties文件。
mqtt.username=用户名
mqtt.password=密码
mqtt.urls=连接地址,要加上协议名称和端口,多个使用逗号分割
mqtt.client-id-subscribe=订阅端id,需唯一
mqtt.client-id-publish=发布端id,需唯一
mqtt.default-subscribe-topic=默认订阅的主题,多个使用逗号分割
mqtt.default-publish-topic=默认发布的主题
配置MQTT
新建MqttConfig.java
文件,编写以下内容:
package cn.ihexin.mqtt.config;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.*;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* @author hexin
* @version 1.0
* @date 2023/4/20 13:31
*/
@Configuration
@Slf4j
public class MqttConfig {
private static final byte[] WILL_DATA;
static {
WILL_DATA = "offline".getBytes();
}
public static final String MQTT_INBOUND_CHANNEL = "mqttInboundChannel";
public static final String MQTT_OUTBOUND_CHANNEL = "mqttOutboundChannel";
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.urls}")
private String urls;
@Value("${mqtt.default-subscribe-topic}")
private String defaultSubscribeTopic;
@Value("${mqtt.default-publish-topic}")
private String defaultPublishTopic;
@Value("${mqtt.client-id-subscribe}")
private String clientIdSub;
@Value("${mqtt.client-id-publish}")
private String clientIdPub;
/**
* 连接mqtt配置
*/
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
// false,服务器会保留客户端的连接记录 true,表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setServerURIs(StrUtil.splitToArray(urls, ","));
// 超时时间 单位为秒
options.setConnectionTimeout(10);
// 会话心跳时间 单位: s, 间隔时间:1.5*20秒向客户端发送心跳判断客户端是否在线
options.setKeepAliveInterval(20);
// 设置是否自动重连
options.setAutomaticReconnect(true);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
options.setWill("willTopic", WILL_DATA, 2, false);
return options;
}
/**
* MQTT客户端
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(mqttConnectOptions());
return factory;
}
/**
* 发送者消息通道
*/
@Bean(name = MQTT_OUTBOUND_CHANNEL)
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* 发送者消息处理
*/
@Bean
@ServiceActivator(inputChannel = MQTT_OUTBOUND_CHANNEL)
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientIdPub, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setAsyncEvents(true);
messageHandler.setDefaultTopic(defaultPublishTopic);
return messageHandler;
}
/**
* 消息订阅
*/
@Bean
public MessageProducer inbound() {
// 可同时消费(订阅)多个Topic
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientIdSub, mqttClientFactory(), StrUtil.splitToArray(defaultSubscribeTopic, ","));
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
// 设置订阅通道
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}
/**
* 消费者消息通道
*/
@Bean(name = MQTT_INBOUND_CHANNEL)
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
/**
* 消费者消息处理
*/
@Bean
@ServiceActivator(inputChannel = MQTT_INBOUND_CHANNEL)
public MessageHandler mqttInbound() {
return (message -> {
log.info("[MQTT]-消息接收 - [{}] - [{}]", message.getPayload(), message.getHeaders());
// TODO 处理消息
});
}
/**
* 监听事件
*/
@EventListener
public void handleEvent(MqttIntegrationEvent event) {
if (event instanceof MqttConnectionFailedEvent) {
// 连接失败
MqttConnectionFailedEvent connectionFailedEvent = (MqttConnectionFailedEvent) event;
log.error("[MQTT]-连接失败 - [{}] - [{}]", event.getClass().getSimpleName(), connectionFailedEvent.getCause().getMessage());
return;
}
if (event instanceof MqttMessageSentEvent) {
MqttMessageSentEvent sentEvent = (MqttMessageSentEvent) event;
log.info("[MQTT]-消息发出 - [{}] - [{}] - [{}]", event.getClass().getSimpleName(), sentEvent.getMessageId(), sentEvent.getMessage());
}
if (event instanceof MqttMessageDeliveredEvent) {
MqttMessageDeliveredEvent deliveredEvent = (MqttMessageDeliveredEvent) event;
log.info("[MQTT]-消息送达 - [{}] - [{}]", event.getClass().getSimpleName(), deliveredEvent.getMessageId());
return;
}
if (event instanceof MqttSubscribedEvent) {
MqttSubscribedEvent subscribedEvent = (MqttSubscribedEvent) event;
log.info("[MQTT]-消息订阅 [{}] - [{}]", event.getClass().getSimpleName(), subscribedEvent.getMessage());
return;
}
log.info("[MQTT]-其他事件 - [{}] - [{}] - [{}]", event.getClass().getSimpleName(), event.getSource().toString(), event.getCause().getMessage());
}
}
这个类中我们配置了MQTT的连接,MQTT的通道,以及MQTT的消息接收,事件监听。
我们对消息接收后的处理逻辑应在mqttInbound
方法中实现。事件监听可以帮助我们获取相关MQTT事件信息,做出相应的处理。
MQTT消息发送
新建MqttProducer
接口,编写如下代码:
package cn.ihexin.mqtt.producer;
import cn.ihexin.mqtt.config.MqttConfig;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* @author hexin
* @version 1.0
* @date 2023/4/20 14:21
*/
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.MQTT_OUTBOUND_CHANNEL)
public interface MqttProducer {
/**
* 发送信息
*/
void send(@Header(MqttHeaders.TOPIC) String topic,
@Header(MqttHeaders.QOS) int qos,
String payload);
}
我们在需要使用的业务类中注入该接口,即可实现消息发布。
以上便是SpringBoot使用SpringIntegration接入MQTT的通用完整流程,如有其他特殊需求,请参阅官方文档。