何鑫个人博客

使用SpringIntegration连接MQTT

  • 2023-04-21 14:07:59
  • 技术
  • 1618

随着物联网的逐步发展,MQTT以起简单,轻量,高效成为物联网应用开发的首选技术栈,今天我们来介绍一下如何在SpringBoot中使用SpringIntegration来连接MQTT。

准备工作

我们需要先准备好以下基础开发环境:

  1. JDK 1.8+
  2. Maven
  3. MQTT Broker
  4. 熟悉的IDE

接下来需要使用Spring Initializr来创建一个项目,引入SpringWeb相关依赖,这些操作如果不清楚,可以在网络上用搜索引擎寻求帮助。

这样我们的准备工作就做完了。

添加SpringIntegration相关依赖

在SpringBoot项目POM文件中添加以下依赖:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-mqtt</artifactId>
</dependency>

添加MQTT相关配置信息

接下来我们在SpringBoot的配置文件中添加一些必须得配置。这里假定我们使用的是properties文件。

mqtt.username=用户名
mqtt.password=密码
mqtt.urls=连接地址,要加上协议名称和端口,多个使用逗号分割
mqtt.client-id-subscribe=订阅端id,需唯一
mqtt.client-id-publish=发布端id,需唯一
mqtt.default-subscribe-topic=默认订阅的主题,多个使用逗号分割
mqtt.default-publish-topic=默认发布的主题

配置MQTT

新建MqttConfig.java文件,编写以下内容:

package cn.ihexin.mqtt.config;

import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.*;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * @author hexin
 * @version 1.0
 * @date 2023/4/20 13:31
 */
@Configuration
@Slf4j
public class MqttConfig {

    private static final byte[] WILL_DATA;

    static {
        WILL_DATA = "offline".getBytes();
    }

    public static final String MQTT_INBOUND_CHANNEL = "mqttInboundChannel";


    public static final String MQTT_OUTBOUND_CHANNEL = "mqttOutboundChannel";

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.urls}")
    private String urls;

    @Value("${mqtt.default-subscribe-topic}")
    private String defaultSubscribeTopic;

    @Value("${mqtt.default-publish-topic}")
    private String defaultPublishTopic;

    @Value("${mqtt.client-id-subscribe}")
    private String clientIdSub;

    @Value("${mqtt.client-id-publish}")
    private String clientIdPub;

    /**
     * 连接mqtt配置
     */
    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        // false,服务器会保留客户端的连接记录 true,表示每次连接到服务器都以新的身份连接
        options.setCleanSession(true);
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setServerURIs(StrUtil.splitToArray(urls, ","));
        // 超时时间 单位为秒
        options.setConnectionTimeout(10);
        // 会话心跳时间 单位: s, 间隔时间:1.5*20秒向客户端发送心跳判断客户端是否在线
        options.setKeepAliveInterval(20);
        // 设置是否自动重连
        options.setAutomaticReconnect(true);
        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
        options.setWill("willTopic", WILL_DATA, 2, false);
        return options;
    }


    /**
     * MQTT客户端
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(mqttConnectOptions());
        return factory;
    }

    /**
     * 发送者消息通道
     */
    @Bean(name = MQTT_OUTBOUND_CHANNEL)
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * 发送者消息处理
     */
    @Bean
    @ServiceActivator(inputChannel = MQTT_OUTBOUND_CHANNEL)
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientIdPub, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setAsyncEvents(true);
        messageHandler.setDefaultTopic(defaultPublishTopic);
        return messageHandler;
    }

    /**
     * 消息订阅
     */
    @Bean
    public MessageProducer inbound() {
        // 可同时消费(订阅)多个Topic
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientIdSub, mqttClientFactory(), StrUtil.splitToArray(defaultSubscribeTopic, ","));
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        // 设置订阅通道
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    /**
     * 消费者消息通道
     */
    @Bean(name = MQTT_INBOUND_CHANNEL)
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    /**
     * 消费者消息处理
     */
    @Bean
    @ServiceActivator(inputChannel = MQTT_INBOUND_CHANNEL)
    public MessageHandler mqttInbound() {
        return (message -> {
            log.info("[MQTT]-消息接收 - [{}] - [{}]", message.getPayload(), message.getHeaders());
            // TODO 处理消息
        });
    }

    /**
     * 监听事件
     */
    @EventListener
    public void handleEvent(MqttIntegrationEvent event) {
        if (event instanceof MqttConnectionFailedEvent) {
            // 连接失败
            MqttConnectionFailedEvent connectionFailedEvent = (MqttConnectionFailedEvent) event;
            log.error("[MQTT]-连接失败 - [{}] - [{}]", event.getClass().getSimpleName(), connectionFailedEvent.getCause().getMessage());
            return;
        }
        if (event instanceof MqttMessageSentEvent) {
            MqttMessageSentEvent sentEvent = (MqttMessageSentEvent) event;
            log.info("[MQTT]-消息发出 - [{}] - [{}] - [{}]", event.getClass().getSimpleName(), sentEvent.getMessageId(), sentEvent.getMessage());
        }
        if (event instanceof MqttMessageDeliveredEvent) {
            MqttMessageDeliveredEvent deliveredEvent = (MqttMessageDeliveredEvent) event;
            log.info("[MQTT]-消息送达 - [{}] - [{}]", event.getClass().getSimpleName(), deliveredEvent.getMessageId());
            return;
        }
        if (event instanceof MqttSubscribedEvent) {
            MqttSubscribedEvent subscribedEvent = (MqttSubscribedEvent) event;
            log.info("[MQTT]-消息订阅 [{}] - [{}]", event.getClass().getSimpleName(), subscribedEvent.getMessage());
            return;
        }
        log.info("[MQTT]-其他事件 - [{}] - [{}] - [{}]", event.getClass().getSimpleName(), event.getSource().toString(), event.getCause().getMessage());
    }

}

这个类中我们配置了MQTT的连接,MQTT的通道,以及MQTT的消息接收,事件监听。

我们对消息接收后的处理逻辑应在mqttInbound方法中实现。事件监听可以帮助我们获取相关MQTT事件信息,做出相应的处理。

MQTT消息发送

新建MqttProducer接口,编写如下代码:

package cn.ihexin.mqtt.producer;

import cn.ihexin.mqtt.config.MqttConfig;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * @author hexin
 * @version 1.0
 * @date 2023/4/20 14:21
 */
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.MQTT_OUTBOUND_CHANNEL)
public interface MqttProducer {

    /**
     * 发送信息
     */
    void send(@Header(MqttHeaders.TOPIC) String topic,
              @Header(MqttHeaders.QOS) int qos,
              String payload);

}

我们在需要使用的业务类中注入该接口,即可实现消息发布。

以上便是SpringBoot使用SpringIntegration接入MQTT的通用完整流程,如有其他特殊需求,请参阅官方文档。