MQTT协议

June 6, 20252 minutes

MQTT协议

前言

最近在对接第三方供应商设备时接触到了MQTT协议,在物联网(IoT)领域 MQTT(Message Queuing Telemetry Transport)作为一种轻量级的消息传输协议,已经成为IoT领域的标准选择。所以准备梳理下MQTT协议的核心概念。

什么是MQTT协议

基本概念

MQTT是一种基于发布/订阅(Publish/Subscribe)模式的轻量级消息传输协议。它最初由IBM于1999年开发,专门为低带宽、高延迟或不稳定的网络环境设计。2019年,MQTT 5.0成为OASIS标准。

MQTT的核心特点

轻量级设计:MQTT协议的设计极其简洁,最小的消息头只有2个字节,这使得它非常适合带宽受限的环境。

发布/订阅模式:采用异步消息传递模式,发布者和订阅者之间解耦,通过中介代理(Broker)进行消息路由。

三种QoS级别

  • QoS 0:最多一次传递(At most once)
  • QoS 1:至少一次传递(At least once)
  • QoS 2:恰好一次传递(Exactly once)

会话保持:支持持久会话,即使客户端断开连接,代理也能保存未传递的消息。

遗嘱消息:客户端可以设置遗嘱消息,当异常断开时由代理发送给相关订阅者。

MQTT工作原理

核心组件

MQTT代理(Broker):消息中介服务器,负责接收发布者的消息并转发给相应的订阅者。常见的代理有Mosquitto、EMQ X、HiveMQ等。

发布者(Publisher):发送消息到特定主题的客户端。

订阅者(Subscriber):订阅特定主题并接收消息的客户端。

主题(Topic):消息的分类标识,采用层次结构,如sensor/temperature/room1

消息流程

  1. 客户端连接到MQTT代理
  2. 订阅者向代理订阅感兴趣的主题
  3. 发布者向代理发布消息到特定主题
  4. 代理将消息转发给所有订阅该主题的客户端
  5. 客户端处理接收到的消息

主题通配符

MQTT支持两种通配符:

  • +:单级通配符,匹配一个主题级别
  • #:多级通配符,匹配多个主题级别

例如:sensor/+/temperature可以匹配sensor/room1/temperaturesensor/room2/temperature

.NET中使用MQTT协议

MQTT库

在.NET生态系统中,最受欢迎的MQTT客户端库是MQTTnet。它是一个开源、高性能的.NET MQTT库,支持客户端和服务器功能。

安装MQTTnet

dotnet add package MQTTnet
dotnet add package MQTTnet.Extensions.ManagedClient

MQTT客户端实现

基础客户端连接

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;

public class MqttClientService
{
    private IMqttClient _mqttClient;

    public async Task ConnectAsync()
    {
        var factory = new MqttFactory();
        _mqttClient = factory.CreateMqttClient();

        var options = new MqttClientOptionsBuilder()
            .WithTcpServer("localhost", 1883)
            .WithClientId("DotNetClient")
            .WithCredentials("username", "password")
            .WithCleanSession()
            .Build();

        _mqttClient.ApplicationMessageReceivedAsync += OnMessageReceived;
        _mqttClient.ConnectedAsync += OnConnected;
        _mqttClient.DisconnectedAsync += OnDisconnected;

        await _mqttClient.ConnectAsync(options);
    }

    private async Task OnMessageReceived(MqttApplicationMessageReceivedEventArgs e)
    {
        var topic = e.ApplicationMessage.Topic;
        var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);

        Console.WriteLine($"接收到消息 - 主题: {topic}, 内容: {payload}");

        // 处理消息逻辑
        await ProcessMessage(topic, payload);
    }

    private async Task OnConnected(MqttClientConnectedEventArgs e)
    {
        Console.WriteLine("MQTT客户端已连接");

        // 连接成功后订阅主题
        await SubscribeToTopics();
    }

    private Task OnDisconnected(MqttClientDisconnectedEventArgs e)
    {
        Console.WriteLine($"MQTT客户端断开连接: {e.Reason}");
        return Task.CompletedTask;
    }
}

订阅主题

public async Task SubscribeToTopics()
{
    var topicFilters = new List<MqttTopicFilter>
    {
        new MqttTopicFilterBuilder()
            .WithTopic("sensor/+/temperature")
            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
            .Build(),
        new MqttTopicFilterBuilder()
            .WithTopic("device/status/#")
            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
            .Build()
    };

    await _mqttClient.SubscribeAsync(topicFilters.ToArray());
    Console.WriteLine("已订阅主题");
}

发布消息

public async Task PublishAsync(string topic, object payload)
{
    var json = JsonSerializer.Serialize(payload);

    var message = new MqttApplicationMessageBuilder()
        .WithTopic(topic)
        .WithPayload(json)
        .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
        .WithRetainFlag() // 保留消息
        .Build();

    await _mqttClient.PublishAsync(message);
    Console.WriteLine($"消息已发布到主题: {topic}");
}

// 使用示例
var sensorData = new
{
    Temperature = 25.5,
    Humidity = 60.2,
    Timestamp = DateTime.UtcNow
};

await PublishAsync("sensor/room1/data", sensorData);

实际应用场景

设备监控系统

public class DeviceMonitoringService
{
    private readonly IManagedMqttClient _mqttClient;
    private readonly ILogger<DeviceMonitoringService> _logger;

    public DeviceMonitoringService(IManagedMqttClient mqttClient, ILogger<DeviceMonitoringService> logger)
    {
        _mqttClient = mqttClient;
        _logger = logger;
    }

    public async Task MonitorDeviceAsync(string deviceId)
    {
        // 订阅设备状态主题
        await _mqttClient.SubscribeAsync($"device/{deviceId}/status");
        await _mqttClient.SubscribeAsync($"device/{deviceId}/telemetry");

        // 发送设备控制命令
        var command = new
        {
            Command = "GET_STATUS",
            Timestamp = DateTime.UtcNow
        };

        await PublishCommandAsync(deviceId, command);
    }

    private async Task PublishCommandAsync(string deviceId, object command)
    {
        var json = JsonSerializer.Serialize(command);
        var topic = $"device/{deviceId}/command";

        var message = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(json)
            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
            .Build();

        await _mqttClient.EnqueueAsync(message);
        _logger.LogInformation($"命令已发送到设备 {deviceId}");
    }
}

最佳实践

使用连接池:对于高并发场景,考虑使用连接池来复用MQTT连接。

实现重连逻辑:网络不稳定时,实现指数退避的重连策略。

合理设置Keep Alive:根据网络环境调整心跳间隔,通常设置为30-60秒。