June 6, 20252 minutes
最近在对接第三方供应商设备时接触到了MQTT协议,在物联网(IoT)领域 MQTT(Message Queuing Telemetry Transport)作为一种轻量级的消息传输协议,已经成为IoT领域的标准选择。所以准备梳理下MQTT协议的核心概念。
MQTT是一种基于发布/订阅(Publish/Subscribe)模式的轻量级消息传输协议。它最初由IBM于1999年开发,专门为低带宽、高延迟或不稳定的网络环境设计。2019年,MQTT 5.0成为OASIS标准。
轻量级设计:MQTT协议的设计极其简洁,最小的消息头只有2个字节,这使得它非常适合带宽受限的环境。
发布/订阅模式:采用异步消息传递模式,发布者和订阅者之间解耦,通过中介代理(Broker)进行消息路由。
三种QoS级别:
会话保持:支持持久会话,即使客户端断开连接,代理也能保存未传递的消息。
遗嘱消息:客户端可以设置遗嘱消息,当异常断开时由代理发送给相关订阅者。
MQTT代理(Broker):消息中介服务器,负责接收发布者的消息并转发给相应的订阅者。常见的代理有Mosquitto、EMQ X、HiveMQ等。
发布者(Publisher):发送消息到特定主题的客户端。
订阅者(Subscriber):订阅特定主题并接收消息的客户端。
主题(Topic):消息的分类标识,采用层次结构,如sensor/temperature/room1。
MQTT支持两种通配符:
+:单级通配符,匹配一个主题级别#:多级通配符,匹配多个主题级别例如:sensor/+/temperature可以匹配sensor/room1/temperature和sensor/room2/temperature。
在.NET生态系统中,最受欢迎的MQTT客户端库是MQTTnet。它是一个开源、高性能的.NET MQTT库,支持客户端和服务器功能。
dotnet add package MQTTnet
dotnet add package MQTTnet.Extensions.ManagedClientusing MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
public class MqttClientService
{
private IMqttClient _mqttClient;
public async Task ConnectAsync()
{
var factory = new MqttFactory();
_mqttClient = factory.CreateMqttClient();
var options = new MqttClientOptionsBuilder()
.WithTcpServer("localhost", 1883)
.WithClientId("DotNetClient")
.WithCredentials("username", "password")
.WithCleanSession()
.Build();
_mqttClient.ApplicationMessageReceivedAsync += OnMessageReceived;
_mqttClient.ConnectedAsync += OnConnected;
_mqttClient.DisconnectedAsync += OnDisconnected;
await _mqttClient.ConnectAsync(options);
}
private async Task OnMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
var topic = e.ApplicationMessage.Topic;
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
Console.WriteLine($"接收到消息 - 主题: {topic}, 内容: {payload}");
// 处理消息逻辑
await ProcessMessage(topic, payload);
}
private async Task OnConnected(MqttClientConnectedEventArgs e)
{
Console.WriteLine("MQTT客户端已连接");
// 连接成功后订阅主题
await SubscribeToTopics();
}
private Task OnDisconnected(MqttClientDisconnectedEventArgs e)
{
Console.WriteLine($"MQTT客户端断开连接: {e.Reason}");
return Task.CompletedTask;
}
}public async Task SubscribeToTopics()
{
var topicFilters = new List<MqttTopicFilter>
{
new MqttTopicFilterBuilder()
.WithTopic("sensor/+/temperature")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build(),
new MqttTopicFilterBuilder()
.WithTopic("device/status/#")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
.Build()
};
await _mqttClient.SubscribeAsync(topicFilters.ToArray());
Console.WriteLine("已订阅主题");
}public async Task PublishAsync(string topic, object payload)
{
var json = JsonSerializer.Serialize(payload);
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(json)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithRetainFlag() // 保留消息
.Build();
await _mqttClient.PublishAsync(message);
Console.WriteLine($"消息已发布到主题: {topic}");
}
// 使用示例
var sensorData = new
{
Temperature = 25.5,
Humidity = 60.2,
Timestamp = DateTime.UtcNow
};
await PublishAsync("sensor/room1/data", sensorData);public class DeviceMonitoringService
{
private readonly IManagedMqttClient _mqttClient;
private readonly ILogger<DeviceMonitoringService> _logger;
public DeviceMonitoringService(IManagedMqttClient mqttClient, ILogger<DeviceMonitoringService> logger)
{
_mqttClient = mqttClient;
_logger = logger;
}
public async Task MonitorDeviceAsync(string deviceId)
{
// 订阅设备状态主题
await _mqttClient.SubscribeAsync($"device/{deviceId}/status");
await _mqttClient.SubscribeAsync($"device/{deviceId}/telemetry");
// 发送设备控制命令
var command = new
{
Command = "GET_STATUS",
Timestamp = DateTime.UtcNow
};
await PublishCommandAsync(deviceId, command);
}
private async Task PublishCommandAsync(string deviceId, object command)
{
var json = JsonSerializer.Serialize(command);
var topic = $"device/{deviceId}/command";
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(json)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build();
await _mqttClient.EnqueueAsync(message);
_logger.LogInformation($"命令已发送到设备 {deviceId}");
}
}使用连接池:对于高并发场景,考虑使用连接池来复用MQTT连接。
实现重连逻辑:网络不稳定时,实现指数退避的重连策略。
合理设置Keep Alive:根据网络环境调整心跳间隔,通常设置为30-60秒。