IoT Data Processing with MQTT and .NET
MQTT (originally MQ Telemetry Transport) is a de facto standard for IoT messaging. This article shows how to build real-time IoT pipelines with MQTT and .NET.
What is MQTT?
MQTT is a lightweight publish-subscribe messaging protocol designed for:
- Low bandwidth networks
- Unreliable connections
- Battery-powered devices
- Millions of devices
MQTT vs HTTP
| Feature | MQTT | HTTP |
|---------|------|------|
| Overhead | ~2 bytes | ~200+ bytes |
| Pattern | Pub/Sub | Request/Response |
| Connection | Persistent | One-time |
| QoS Levels | 3 (0,1,2) | None built-in |
Architecture Overview
┌─────────────┐
│ IoT Devices │  (Publishers)
└──────┬──────┘
       │ MQTT
   ┌───▼────┐
   │ Broker │  (Mosquitto/EMQX)
   └───┬────┘
       │
  ┌────▼─────┐
  │ .NET App │  (Subscriber)
  └────┬─────┘
       │
  ┌────▼─────┐
  │ Database │
  └──────────┘
Setting Up MQTT Broker
Mosquitto (Open Source)
# Install on Ubuntu
apt-get install mosquitto mosquitto-clients

# Configure authentication
mosquitto_passwd -c /etc/mosquitto/passwd username

# Edit config
nano /etc/mosquitto/mosquitto.conf
Configuration:
listener 1883
allow_anonymous false
password_file /etc/mosquitto/passwd

# WebSocket support
listener 9001
protocol websockets
EMQX (Enterprise)
docker run -d --name emqx \
-p 1883:1883 \
-p 8083:8083 \
-p 8084:8084 \
-p 18083:18083 \
emqx/emqx:latest
.NET MQTT Client
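Using MQTTnet Library
The examples below use the MQTTnet 4.x API (MqttFactory, the MQTTnet.Client namespace, and the ManagedClient extension). Newer major versions rename or drop some of these types, so pin a 4.x package version if the code does not compile against the latest release.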
dotnet add package MQTTnet
dotnet add package MQTTnet.Extensions.ManagedClient
Basic Publisher
using System.Text.Json;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;

public class MqttPublisher
{
    private IMqttClient _client;

    public async Task ConnectAsync()
    {
        var factory = new MqttFactory();
        _client = factory.CreateMqttClient();

        var options = new MqttClientOptionsBuilder()
            .WithTcpServer("localhost", 1883)
            .WithCredentials("username", "password")
            .WithClientId("dotnet-publisher")
            .WithCleanSession()
            .Build();

        await _client.ConnectAsync(options, CancellationToken.None);
        Console.WriteLine("Connected to MQTT broker");
    }

    // SensorReading is the application's own DTO; it must expose a DeviceId property.
    public async Task PublishSensorDataAsync(SensorReading reading)
    {
        var message = new MqttApplicationMessageBuilder()
            .WithTopic($"sensors/{reading.DeviceId}/temperature")
            .WithPayload(JsonSerializer.Serialize(reading))
            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
            .WithRetainFlag(false)
            .Build();

        await _client.PublishAsync(message, CancellationToken.None);
    }
}
Basic Subscriber
using System.Text;
using System.Text.Json;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;

public class MqttSubscriber
{
    private IMqttClient _client;

    public async Task ConnectAndSubscribeAsync()
    {
        var factory = new MqttFactory();
        _client = factory.CreateMqttClient();

        // Register the handler before connecting so no message is missed
        _client.ApplicationMessageReceivedAsync += HandleMessageAsync;

        var options = new MqttClientOptionsBuilder()
            .WithTcpServer("localhost", 1883)
            .WithCredentials("username", "password")
            .WithClientId("dotnet-subscriber")
            .Build();

        await _client.ConnectAsync(options, CancellationToken.None);

        // Subscribe to topics
        await _client.SubscribeAsync(new MqttTopicFilterBuilder()
            .WithTopic("sensors/+/temperature")
            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
            .Build());

        Console.WriteLine("Subscribed to temperature sensors");
    }

    private async Task HandleMessageAsync(MqttApplicationMessageReceivedEventArgs e)
    {
        var topic = e.ApplicationMessage.Topic;
        var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
        Console.WriteLine($"Received: {topic} - {payload}");

        var reading = JsonSerializer.Deserialize<SensorReading>(payload);
        if (reading is null)
        {
            return; // payload was the JSON literal null
        }

        // ProcessSensorDataAsync is the application-specific handler (not shown)
        await ProcessSensorDataAsync(reading);
    }
}
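Because the subscription uses a wildcard, the handler often needs to know which device a message came from. The publisher above puts the device ID in the second topic level, so it can be recovered from the topic string. The following is a minimal sketch under that assumption; `TryGetDeviceId` is a hypothetical helper, not part of MQTTnet.

```csharp
using System;

// Hypothetical helper: pulls the device ID out of a topic shaped like
// "sensors/{deviceId}/temperature". Returns false for any other shape.
static bool TryGetDeviceId(string topic, out string deviceId)
{
    deviceId = string.Empty;
    var levels = topic.Split('/');

    if (levels.Length != 3 || levels[0] != "sensors" || levels[2] != "temperature")
        return false;
    if (levels[1].Length == 0)
        return false; // empty device level, e.g. "sensors//temperature"

    deviceId = levels[1];
    return true;
}

if (TryGetDeviceId("sensors/device1/temperature", out var id))
    Console.WriteLine($"Reading came from {id}"); // prints "Reading came from device1"
```

Reading the ID from the topic rather than from the payload gives a value to cross-check against whatever the device claims inside the JSON body.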
Quality of Service (QoS) Levels
QoS 0 - At Most Once
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
- No acknowledgment
- Fastest, but messages may be lost
- Use for: Non-critical data
QoS 1 - At Least Once
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
- Guaranteed delivery
- May receive duplicates
- Use for: Important data
QoS 2 - Exactly Once
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
- Guaranteed single delivery
- Slowest
- Use for: Critical transactions
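Since QoS 1 may deliver the same message more than once, a subscriber that stores readings or sends alerts usually wants to recognise repeats. The sketch below is one possible approach, not something MQTTnet provides: it assumes a reading is uniquely identified by its device ID and timestamp (an assumption, since the article does not define a message key) and remembers a bounded number of recent keys.

```csharp
using System;
using System.Collections.Generic;

// Remember recent keys so duplicates redelivered under QoS 1 can be skipped.
// The capacity and the key format are illustrative assumptions.
const int Capacity = 1000;
var seenKeys = new HashSet<string>();
var insertionOrder = new Queue<string>();

// Returns true the first time a key is seen, false for a duplicate.
bool IsFirstDelivery(string deviceId, DateTime timestampUtc)
{
    var key = $"{deviceId}|{timestampUtc:O}";
    if (!seenKeys.Add(key))
        return false;

    insertionOrder.Enqueue(key);
    if (insertionOrder.Count > Capacity)
        seenKeys.Remove(insertionOrder.Dequeue()); // forget the oldest key

    return true;
}

var sentAt = new DateTime(2024, 1, 1, 12, 0, 0, DateTimeKind.Utc);
Console.WriteLine(IsFirstDelivery("device1", sentAt)); // True
Console.WriteLine(IsFirstDelivery("device1", sentAt)); // False (duplicate)
```

The state here lives in memory only, so it is lost on restart. A unique constraint in the database would give the same protection durably.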
Advanced Patterns
Managed Client (Auto-Reconnect)
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;

public class ResilientMqttClient
{
    private IManagedMqttClient _managedClient;

    public async Task ConnectAsync()
    {
        var factory = new MqttFactory();
        _managedClient = factory.CreateManagedMqttClient();

        var options = new ManagedMqttClientOptionsBuilder()
            .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
            .WithClientOptions(new MqttClientOptionsBuilder()
                .WithTcpServer("localhost", 1883)
                .WithCredentials("username", "password")
                .Build())
            .Build();

        _managedClient.ApplicationMessageReceivedAsync += HandleMessageAsync;
        _managedClient.ConnectedAsync += async e =>
        {
            Console.WriteLine("Connected!");
            // Resubscribe on reconnect
            await _managedClient.SubscribeAsync("sensors/+/temperature");
        };

        await _managedClient.StartAsync(options);
    }

    // Placeholder handler; replace with application-specific processing
    private Task HandleMessageAsync(MqttApplicationMessageReceivedEventArgs e)
    {
        Console.WriteLine($"Received: {e.ApplicationMessage.Topic}");
        return Task.CompletedTask;
    }
}
Topic Patterns
Wildcards:
- + (single-level wildcard): sensors/+/temperature matches sensors/device1/temperature
- # (multi-level wildcard): sensors/# matches sensors/device1/temperature/celsius
// Subscribe to all sensors
await _client.SubscribeAsync("sensors/#");

// Subscribe to specific metric across devices
await _client.SubscribeAsync("sensors/+/temperature");

// Subscribe to specific device, all metrics
await _client.SubscribeAsync("sensors/device123/+");
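To make the wildcard rules concrete, here is a small matcher that mirrors how a broker decides whether a topic satisfies a subscription filter. It is a simplified sketch: `TopicMatches` is a hypothetical helper, it assumes both strings are well-formed, and it ignores the special treatment of topics that begin with `$`.

```csharp
using System;

// Returns true when the topic matches the MQTT subscription filter.
static bool TopicMatches(string filter, string topic)
{
    var filterLevels = filter.Split('/');
    var topicLevels = topic.Split('/');

    for (var i = 0; i < filterLevels.Length; i++)
    {
        if (filterLevels[i] == "#")
            return true;   // '#' matches this level and everything below it
        if (i >= topicLevels.Length)
            return false;  // filter has more levels than the topic
        if (filterLevels[i] != "+" && filterLevels[i] != topicLevels[i])
            return false;  // a literal level must match exactly
    }

    return filterLevels.Length == topicLevels.Length; // no unmatched trailing levels
}

Console.WriteLine(TopicMatches("sensors/+/temperature", "sensors/device1/temperature"));          // True
Console.WriteLine(TopicMatches("sensors/#", "sensors/device1/temperature/celsius"));              // True
Console.WriteLine(TopicMatches("sensors/device123/+", "sensors/device123/temperature/celsius"));  // False
```

The last call shows that `+` covers exactly one level, so `sensors/device123/+` sees metrics directly under the device but nothing nested deeper.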
Retained Messages
Last known state for new subscribers:
var message = new MqttApplicationMessageBuilder()
    .WithTopic("devices/device1/status")
    .WithPayload("online")
    .WithRetainFlag(true) // Keep last message
    .Build();

await _client.PublishAsync(message);
Last Will and Testament (LWT)
Notify when device disconnects unexpectedly:
var options = new MqttClientOptionsBuilder()
    .WithTcpServer("localhost", 1883)
    .WithWillTopic("devices/device1/status")
    .WithWillPayload("offline")
    .WithWillRetain(true)
    .Build();
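Retained "online" messages and a retained "offline" will on the same topic let a backend keep a simple presence table. The sketch below shows the subscriber side under the assumption that every device uses the `devices/{deviceId}/status` topic and the two payloads shown above; `ApplyStatusMessage` is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;

// Last known status per device, built from messages on "devices/{deviceId}/status".
var status = new Dictionary<string, bool>(); // deviceId -> is online

void ApplyStatusMessage(string topic, string payload)
{
    var levels = topic.Split('/');
    if (levels.Length != 3 || levels[0] != "devices" || levels[2] != "status")
        return; // not a status topic

    status[levels[1]] = payload == "online";
}

ApplyStatusMessage("devices/device1/status", "online");  // retained message published by the device
ApplyStatusMessage("devices/device1/status", "offline"); // will message published by the broker
Console.WriteLine(status["device1"]); // False
```

Because both messages are retained, a backend that starts later still receives the latest status for each device as soon as it subscribes.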
Real-World IoT Pipeline
Complete Example: Temperature Monitoring
Device (Publisher):
public class TemperatureSensor
{
    private readonly IManagedMqttClient _client;
    private readonly string _deviceId;

    public TemperatureSensor(IManagedMqttClient client, string deviceId)
    {
        _client = client;
        _deviceId = deviceId;
    }

    public async Task StartMonitoringAsync()
    {
        while (true)
        {
            var temperature = ReadTemperature();
            var reading = new
            {
                DeviceId = _deviceId,
                Temperature = temperature,
                Timestamp = DateTime.UtcNow,
                Unit = "Celsius"
            };

            var message = new MqttApplicationMessageBuilder()
                .WithTopic($"sensors/{_deviceId}/temperature")
                .WithPayload(JsonSerializer.Serialize(reading))
                .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                .Build();

            // The managed client queues the message and sends it once connected
            await _client.EnqueueAsync(message);
            await Task.Delay(TimeSpan.FromMinutes(1));
        }
    }

    private double ReadTemperature()
    {
        // Placeholder: replace with a read from actual sensor hardware
        return Random.Shared.NextDouble() * 30 + 10; // 10-40°C
    }
}
Server (Subscriber + Processing):
public class TemperatureMonitoringService : BackgroundService
{
    // Dependencies are supplied through constructor injection (constructor omitted for brevity)
    private readonly IManagedMqttClient _client;
    private readonly IDbContext _dbContext;
    private readonly IHubContext<TemperatureHub> _hubContext;
    // INotificationService is an assumed application interface for email/SMS/push alerts
    private readonly INotificationService _notificationService;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _client.ApplicationMessageReceivedAsync += async e =>
        {
            var reading = JsonSerializer.Deserialize<TemperatureReading>(
                Encoding.UTF8.GetString(e.ApplicationMessage.Payload)
            );
            if (reading is null)
            {
                return; // nothing to process
            }

            // Store in database
            await StoreReadingAsync(reading);

            // Check thresholds
            if (reading.Temperature > 35)
            {
                await SendAlertAsync(reading);
            }

            // Broadcast to web clients via SignalR
            await _hubContext.Clients.All.SendAsync(
                "TemperatureUpdate",
                reading
            );
        };

        await _client.SubscribeAsync("sensors/+/temperature");
    }

    private async Task StoreReadingAsync(TemperatureReading reading)
    {
        _dbContext.TemperatureReadings.Add(reading);
        await _dbContext.SaveChangesAsync();
    }

    private async Task SendAlertAsync(TemperatureReading reading)
    {
        // Send email/SMS/push notification
        await _notificationService.SendAlertAsync(
            $"High temperature alert: {reading.Temperature}°C on device {reading.DeviceId}"
        );
    }
}
Best Practices
✅ Use Clean Session = false: The broker then keeps subscriptions and queued QoS 1/2 messages for a client that reconnects with the same client ID
✅ Implement LWT: Know when devices go offline
✅ Choose appropriate QoS: Balance reliability vs performance
✅ Use Retained Messages: For device status/configuration
✅ Implement Reconnection: Networks are unreliable
✅ Structure Topics Logically: domain/location/device/metric
✅ Validate Payloads: Don't trust device data
✅ Monitor Broker: CPU, memory, connections
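As an illustration of the "Validate Payloads" item, the sketch below checks a raw JSON payload before it reaches storage or alerting. The field names follow the reading published earlier in this article; the helper name `TryParseReading` and the accepted range of -50 to 150 °C are illustrative assumptions, so adjust them to the sensors in use.

```csharp
using System;
using System.Text.Json;

// Accepts only JSON objects with a non-empty string DeviceId and a numeric,
// plausible Temperature. Anything else is rejected without throwing.
static bool TryParseReading(string json, out string deviceId, out double temperature)
{
    deviceId = string.Empty;
    temperature = 0;

    try
    {
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;

        if (root.ValueKind != JsonValueKind.Object)
            return false;
        if (!root.TryGetProperty("DeviceId", out var idProp) ||
            idProp.ValueKind != JsonValueKind.String)
            return false;
        if (!root.TryGetProperty("Temperature", out var tempProp) ||
            tempProp.ValueKind != JsonValueKind.Number ||
            !tempProp.TryGetDouble(out var value))
            return false;
        if (value < -50 || value > 150)
            return false; // outside the assumed plausible range

        var id = idProp.GetString();
        if (string.IsNullOrWhiteSpace(id))
            return false;

        deviceId = id;
        temperature = value;
        return true;
    }
    catch (JsonException)
    {
        return false; // not valid JSON at all
    }
}

var ok = TryParseReading("{\"DeviceId\":\"device1\",\"Temperature\":21.5}", out var device, out var celsius);
Console.WriteLine(ok ? $"Accepted reading from {device}" : "Rejected payload");
```

Returning a boolean instead of throwing keeps a single malformed message from taking down the message handler.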
Security
TLS/SSL Encryption
var options = new MqttClientOptionsBuilder()
    .WithTcpServer("broker.example.com", 8883)
    .WithTls(new MqttClientOptionsBuilderTlsParameters
    {
        UseTls = true,
        SslProtocol = System.Security.Authentication.SslProtocols.Tls12
    })
    .Build();
Authentication
.WithCredentials("device123", "securePassword123")
Authorization (Mosquitto ACL)
# /etc/mosquitto/acl
user device1
topic write sensors/device1/#
topic read config/device1/#

user backend
topic read sensors/#
topic write config/#
Conclusion
MQTT and .NET are a powerful combination for IoT applications.
The protocol's simplicity and efficiency make it well suited to resource-constrained devices, while .NET provides the robust backend processing needed for enterprise IoT solutions.