February 8, 2024 · 13 min read

IoT Data Processing with MQTT and .NET

MQTT (Message Queuing Telemetry Transport) is the de facto standard for IoT communication. Learn how to build real-time IoT pipelines using MQTT and .NET Core.

What is MQTT?

MQTT is a lightweight publish-subscribe messaging protocol designed for:

  • Low bandwidth networks
  • Unreliable connections
  • Battery-powered devices
  • Millions of devices

MQTT vs HTTP

| Feature | MQTT | HTTP |
|---------|------|------|
| Overhead | ~2 bytes (minimum fixed header) | ~200+ bytes (typical headers) |
| Pattern | Pub/Sub | Request/Response |
| Connection | Persistent | Typically per request |
| QoS Levels | 3 (0, 1, 2) | None built in |
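
To make the overhead row concrete, the sketch below hand-encodes a QoS 0 PUBLISH packet following the MQTT 3.1.1 wire layout: one byte for packet type and flags, a variable-length "remaining length" field, a length-prefixed topic, then the payload. `MqttPacketSketch` and its method names are illustrative only and not part of any library; real clients such as MQTTnet handle this encoding internally.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Illustrative only: hand-encodes a QoS 0 PUBLISH packet (MQTT 3.1.1 layout)
// to show how little framing MQTT adds around a payload.
public static class MqttPacketSketch
{
    // Encodes the "remaining length" field: 7 bits per byte,
    // high bit set when more bytes follow (1 to 4 bytes in total).
    public static byte[] EncodeRemainingLength(int length)
    {
        if (length < 0 || length > 268_435_455)
            throw new ArgumentOutOfRangeException(nameof(length));

        var bytes = new List<byte>();
        do
        {
            var digit = (byte)(length % 128);
            length /= 128;
            if (length > 0) digit |= 0x80; // continuation bit
            bytes.Add(digit);
        } while (length > 0);
        return bytes.ToArray();
    }

    // Builds a QoS 0 PUBLISH packet; QoS 0 carries no packet identifier.
    public static byte[] BuildQos0Publish(string topic, string payload)
    {
        var topicBytes = Encoding.UTF8.GetBytes(topic);
        var payloadBytes = Encoding.UTF8.GetBytes(payload);
        if (topicBytes.Length > ushort.MaxValue)
            throw new ArgumentException("Topic too long", nameof(topic));

        var remaining = 2 + topicBytes.Length + payloadBytes.Length;

        var packet = new List<byte> { 0x30 }; // PUBLISH, QoS 0, no retain
        packet.AddRange(EncodeRemainingLength(remaining));
        packet.Add((byte)(topicBytes.Length >> 8));   // topic length, big-endian
        packet.Add((byte)(topicBytes.Length & 0xFF));
        packet.AddRange(topicBytes);
        packet.AddRange(payloadBytes);
        return packet.ToArray();
    }
}
```

Calling `MqttPacketSketch.BuildQos0Publish("sensors/device1/temperature", "21.5")` produces 35 bytes: a 2-byte fixed header, a 2-byte topic length, the 27-byte topic, and the 4-byte payload.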

Architecture Overview

┌──────────────┐
│ IoT Devices  │ (Publishers)
└──────┬───────┘
       │ MQTT
  ┌────▼────┐
  │ Broker  │ (Mosquitto/EMQX)
  └────┬────┘
       │ MQTT
 ┌─────▼─────┐
 │ .NET App  │ (Subscriber)
 └─────┬─────┘
       │
 ┌─────▼─────┐
 │ Database  │
 └───────────┘

Setting Up MQTT Broker

Mosquitto (Open Source)

# Install on Ubuntu
sudo apt-get install mosquitto mosquitto-clients

# Configure authentication (creates the password file and prompts for a password)
sudo mosquitto_passwd -c /etc/mosquitto/passwd username

# Edit config
sudo nano /etc/mosquitto/mosquitto.conf

Configuration:

listener 1883
allow_anonymous false
password_file /etc/mosquitto/passwd

# WebSocket support
listener 9001
protocol websockets

EMQX (Enterprise)

docker run -d --name emqx \
  -p 1883:1883 \
  -p 8083:8083 \
  -p 8084:8084 \
  -p 18083:18083 \
  emqx/emqx:latest

.NET MQTT Client

Using MQTTnet Library

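# The examples below use the MQTTnet 4.x API (MqttFactory, MQTTnet.Client namespace).
# MQTTnet 5 renamed or removed some of these types, so pin a 4.x version if needed.
dotnet add package MQTTnet
dotnet add package MQTTnet.Extensions.ManagedClient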

Basic Publisher

using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;

public class MqttPublisher
{
    private IMqttClient _client;

    public async Task ConnectAsync()
    {
        var factory = new MqttFactory();
        _client = factory.CreateMqttClient();

        var options = new MqttClientOptionsBuilder()
            .WithTcpServer("localhost", 1883)
            .WithCredentials("username", "password")
            .WithClientId("dotnet-publisher")
            .WithCleanSession()
            .Build();

        await _client.ConnectAsync(options, CancellationToken.None);

        Console.WriteLine("Connected to MQTT broker");
    }

    // SensorReading is an application-defined payload type
    public async Task PublishSensorDataAsync(SensorReading reading)
    {
        var message = new MqttApplicationMessageBuilder()
            .WithTopic($"sensors/{reading.DeviceId}/temperature")
            .WithPayload(JsonSerializer.Serialize(reading))
            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
            .WithRetainFlag(false)
            .Build();

        await _client.PublishAsync(message, CancellationToken.None);
    }
}

Basic Subscriber

// Also requires: using System.Text; (for Encoding)
public class MqttSubscriber
{
    private IMqttClient _client;

    public async Task ConnectAndSubscribeAsync()
    {
        var factory = new MqttFactory();
        _client = factory.CreateMqttClient();

        // Attach the handler before connecting so no message is missed
        _client.ApplicationMessageReceivedAsync += HandleMessageAsync;

        var options = new MqttClientOptionsBuilder()
            .WithTcpServer("localhost", 1883)
            .WithCredentials("username", "password")
            .WithClientId("dotnet-subscriber")
            .Build();

        await _client.ConnectAsync(options, CancellationToken.None);

        // Subscribe to topics
        await _client.SubscribeAsync(new MqttTopicFilterBuilder()
            .WithTopic("sensors/+/temperature")
            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
            .Build());

        Console.WriteLine("Subscribed to temperature sensors");
    }

    private async Task HandleMessageAsync(MqttApplicationMessageReceivedEventArgs e)
    {
        var topic = e.ApplicationMessage.Topic;
        var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);

        Console.WriteLine($"Received: {topic} - {payload}");

        var reading = JsonSerializer.Deserialize<SensorReading>(payload);

        // ProcessSensorDataAsync is application-specific (store, alert, forward)
        await ProcessSensorDataAsync(reading);
    }
}
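
The publisher and subscriber both reference a `SensorReading` type that is not defined in this article. One hypothetical shape is sketched below, assuming the payload carries a device ID, a temperature value, and a UTC timestamp; the property names and the `SensorReadingJson` helper are assumptions made for illustration.

```csharp
#nullable enable
using System;
using System.Text.Json;

// Hypothetical payload type; the property names are assumptions.
public sealed record SensorReading(string DeviceId, double Temperature, DateTime Timestamp);

public static class SensorReadingJson
{
    // Web defaults: camelCase names on write, case-insensitive matching on read.
    private static readonly JsonSerializerOptions Options = new(JsonSerializerDefaults.Web);

    public static string ToJson(SensorReading reading) =>
        JsonSerializer.Serialize(reading, Options);

    // Returns null when the text is not valid JSON for a SensorReading.
    public static SensorReading? FromJson(string json)
    {
        try
        {
            return JsonSerializer.Deserialize<SensorReading>(json, Options);
        }
        catch (JsonException)
        {
            return null;
        }
    }
}
```

Using one shared options instance on both sides keeps the publisher and subscriber in agreement about property naming, which is a common source of silently empty readings.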

Quality of Service (QoS) Levels

QoS 0 - At Most Once

.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)

  • No acknowledgment
  • Fastest, but messages may be lost
  • Use for: Non-critical data

QoS 1 - At Least Once

.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)

  • Delivery is acknowledged; the sender retransmits until it receives the acknowledgment
  • The receiver may see duplicates
  • Use for: Important data

QoS 2 - Exactly Once

.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)

  • Delivered exactly once, using a four-step handshake
  • Slowest, because of the extra round trips
  • Use for: Critical transactions
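
Because QoS 1 can deliver the same message more than once, subscribers usually need to tolerate duplicates. The sketch below shows one possible approach, assuming each reading can be identified by its device ID and timestamp. `RecentMessageFilter` is a hypothetical helper, not part of MQTTnet; it remembers a bounded number of recent keys and reports whether a key has been seen before.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: remembers the most recent message keys so that
// QoS 1 redeliveries can be skipped. Not thread-safe; guard it with a
// lock if the message handler can run concurrently.
public sealed class RecentMessageFilter
{
    private readonly int _capacity;
    private readonly HashSet<string> _seen = new();
    private readonly Queue<string> _order = new();

    public RecentMessageFilter(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
    }

    // Returns true the first time a key is offered and false for a
    // duplicate that is still inside the remembered window.
    public bool TryAccept(string deviceId, DateTime timestampUtc)
    {
        var key = $"{deviceId}|{timestampUtc.Ticks}";
        if (!_seen.Add(key)) return false;

        _order.Enqueue(key);
        if (_order.Count > _capacity)
        {
            _seen.Remove(_order.Dequeue()); // forget the oldest key
        }
        return true;
    }
}
```

A handler would call `TryAccept` before storing a reading and skip the write when it returns false. A unique constraint in the database on the same key is a sturdier alternative when the service restarts often.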

Advanced Patterns

Managed Client (Auto-Reconnect)

using System;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;

public class ResilientMqttClient
{
    private IManagedMqttClient _managedClient;

    public async Task ConnectAsync()
    {
        var factory = new MqttFactory();
        _managedClient = factory.CreateManagedMqttClient();

        var options = new ManagedMqttClientOptionsBuilder()
            .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
            .WithClientOptions(new MqttClientOptionsBuilder()
                .WithTcpServer("localhost", 1883)
                .WithCredentials("username", "password")
                .Build())
            .Build();

        _managedClient.ApplicationMessageReceivedAsync += HandleMessageAsync;
        _managedClient.ConnectedAsync += async e =>
        {
            Console.WriteLine("Connected!");

            // Resubscribe on reconnect
            await _managedClient.SubscribeAsync("sensors/+/temperature");
        };

        await _managedClient.StartAsync(options);
    }

    // Minimal handler; replace with application-specific processing
    private Task HandleMessageAsync(MqttApplicationMessageReceivedEventArgs e)
    {
        Console.WriteLine($"Received: {e.ApplicationMessage.Topic}");
        return Task.CompletedTask;
    }
}

Topic Patterns

Wildcards:

  • + (single-level wildcard): sensors/+/temperature matches sensors/device1/temperature
  • # (multi-level wildcard): sensors/# matches sensors/device1/temperature/celsius

// Subscribe to all sensors
await _client.SubscribeAsync("sensors/#");

// Subscribe to specific metric across devices
await _client.SubscribeAsync("sensors/+/temperature");

// Subscribe to specific device, all metrics
await _client.SubscribeAsync("sensors/device123/+");
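
The broker performs wildcard matching, but it can help to see the rules spelled out, for example when routing messages to different handlers inside one subscriber. The following is a minimal sketch of the matching rules described above; `TopicFilterSketch` is a hypothetical name, and the code ignores edge cases such as topics that begin with `$`.

```csharp
using System;

// Illustrative matcher for MQTT topic filters; the broker does this for you.
// Edge cases such as topics beginning with '$' are not handled.
public static class TopicFilterSketch
{
    public static bool Matches(string filter, string topic)
    {
        var f = filter.Split('/');
        var t = topic.Split('/');

        for (var i = 0; i < f.Length; i++)
        {
            if (f[i] == "#")
            {
                // '#' is only valid as the last level; it matches the parent
                // level and everything below it.
                return i == f.Length - 1;
            }

            if (i >= t.Length) return false;               // topic has fewer levels
            if (f[i] != "+" && f[i] != t[i]) return false; // '+' matches exactly one level
        }

        // No multi-level wildcard, so the level counts must agree.
        return f.Length == t.Length;
    }
}
```

With this sketch, `Matches("sensors/+/temperature", "sensors/device1/temperature")` is true, while the same filter does not match `sensors/device1/temperature/celsius` because `+` covers exactly one level.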

Retained Messages

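The broker stores the last retained message on each topic and delivers it immediately to new subscribers, so they receive the last known state without waiting for the next publish: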

var message = new MqttApplicationMessageBuilder()
    .WithTopic("devices/device1/status")
    .WithPayload("online")
    .WithRetainFlag(true) // Keep last message
    .Build();

await _client.PublishAsync(message);

Last Will and Testament (LWT)

Have the broker publish a message on the device's behalf when it disconnects unexpectedly:

var options = new MqttClientOptionsBuilder()
    .WithTcpServer("localhost", 1883)
    .WithWillTopic("devices/device1/status")
    .WithWillPayload("offline")
    .WithWillRetain(true)
    .Build();
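
Retained status messages and the Last Will combine into a simple presence scheme: the device publishes a retained "online" after connecting, and the broker publishes the retained "offline" will if the connection drops. On the backend, a subscriber to `devices/+/status` can fold those messages into a lookup table. `DevicePresence` below is a hypothetical helper that assumes the topic layout and payload strings used in the snippets above.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: tracks presence from "devices/{id}/status" messages
// whose payload is "online" or "offline" (the convention used above).
public sealed class DevicePresence
{
    private readonly ConcurrentDictionary<string, bool> _online = new();

    // Feed every message received on devices/+/status into this method.
    // Returns false if the topic or payload does not follow the convention.
    public bool Apply(string topic, string payload)
    {
        var levels = topic.Split('/');
        if (levels.Length != 3 || levels[0] != "devices" || levels[2] != "status")
            return false;

        switch (payload)
        {
            case "online":
                _online[levels[1]] = true;
                return true;
            case "offline":
                _online[levels[1]] = false;
                return true;
            default:
                return false;
        }
    }

    // Devices that have never reported a status are treated as offline.
    public bool IsOnline(string deviceId) =>
        _online.TryGetValue(deviceId, out var online) && online;
}
```

Because both the status message and the will are retained, a backend that restarts and resubscribes receives the current state of every device straight away.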

Real-World IoT Pipeline

Complete Example: Temperature Monitoring

Device (Publisher):

public class TemperatureSensor
{
    private readonly IManagedMqttClient _client;
    private readonly string _deviceId;

    public TemperatureSensor(IManagedMqttClient client, string deviceId)
    {
        _client = client;
        _deviceId = deviceId;
    }

    public async Task StartMonitoringAsync(CancellationToken cancellationToken = default)
    {
        // Loop until the caller cancels, publishing one reading per minute
        while (!cancellationToken.IsCancellationRequested)
        {
            var temperature = ReadTemperature();

            var reading = new
            {
                DeviceId = _deviceId,
                Temperature = temperature,
                Timestamp = DateTime.UtcNow,
                Unit = "Celsius"
            };

            var message = new MqttApplicationMessageBuilder()
                .WithTopic($"sensors/{_deviceId}/temperature")
                .WithPayload(JsonSerializer.Serialize(reading))
                .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                .Build();

            // The managed client queues the message and sends it once connected
            await _client.EnqueueAsync(message);

            await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken);
        }
    }

    private double ReadTemperature()
    {
        // Placeholder: replace with a read from the actual sensor hardware
        return Random.Shared.NextDouble() * 30 + 10; // simulated 10-40°C
    }
}

Server (Subscriber + Processing):

// IDbContext, TemperatureHub, TemperatureReading, and INotificationService
// are application-defined types that are not shown in this article.
public class TemperatureMonitoringService : BackgroundService
{
    private readonly IManagedMqttClient _client;
    private readonly IDbContext _dbContext;
    private readonly IHubContext<TemperatureHub> _hubContext;
    private readonly INotificationService _notificationService;

    public TemperatureMonitoringService(
        IManagedMqttClient client,
        IDbContext dbContext,
        IHubContext<TemperatureHub> hubContext,
        INotificationService notificationService)
    {
        _client = client;
        _dbContext = dbContext;
        _hubContext = hubContext;
        _notificationService = notificationService;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _client.ApplicationMessageReceivedAsync += async e =>
        {
            var reading = JsonSerializer.Deserialize<TemperatureReading>(
                Encoding.UTF8.GetString(e.ApplicationMessage.Payload)
            );

            // Skip payloads that deserialize to null
            if (reading is null) return;

            // Store in database
            await StoreReadingAsync(reading);

            // Check thresholds
            if (reading.Temperature > 35)
            {
                await SendAlertAsync(reading);
            }

            // Broadcast to web clients via SignalR
            await _hubContext.Clients.All.SendAsync(
                "TemperatureUpdate",
                reading
            );
        };

        await _client.SubscribeAsync("sensors/+/temperature");
    }

    private async Task StoreReadingAsync(TemperatureReading reading)
    {
        _dbContext.TemperatureReadings.Add(reading);
        await _dbContext.SaveChangesAsync();
    }

    private async Task SendAlertAsync(TemperatureReading reading)
    {
        // Send email/SMS/push notification
        await _notificationService.SendAlertAsync(
            $"High temperature alert: {reading.Temperature}°C on device {reading.DeviceId}"
        );
    }
}

Best Practices

✅ Use Clean Session = false: The broker keeps subscriptions and queued QoS 1/2 messages while the client is offline
✅ Implement LWT: Know when devices go offline
✅ Choose appropriate QoS: Balance reliability vs performance
✅ Use Retained Messages: For device status/configuration
✅ Implement Reconnection: Networks are unreliable
✅ Structure Topics Logically: domain/location/device/metric
✅ Validate Payloads: Don't trust device data
✅ Monitor Broker: CPU, memory, connections
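
As an illustration of the payload validation point, the sketch below checks size, JSON shape, and value range before a reading is accepted. The `TemperatureReading` shape, the `ReadingValidator` name, and all limits are assumptions chosen for the example; pick bounds that suit the actual sensors.

```csharp
#nullable enable
using System;
using System.Text.Json;

// Hypothetical reading type; property names mirror the device example.
public sealed record TemperatureReading(string DeviceId, double Temperature, DateTime Timestamp);

public static class ReadingValidator
{
    private const int MaxPayloadBytes = 1024; // assumed upper bound
    private const double MinCelsius = -50;    // assumed plausible range
    private const double MaxCelsius = 150;

    // Returns true with a parsed reading only when the payload is well-formed
    // and inside the assumed limits; otherwise returns false with a reason.
    public static bool TryParse(ReadOnlySpan<byte> payload, out TemperatureReading? reading, out string? error)
    {
        reading = null;
        error = null;

        if (payload.Length == 0 || payload.Length > MaxPayloadBytes)
        {
            error = "payload size out of bounds";
            return false;
        }

        TemperatureReading? parsed;
        try
        {
            parsed = JsonSerializer.Deserialize<TemperatureReading>(payload);
        }
        catch (JsonException)
        {
            error = "payload is not valid JSON for a reading";
            return false;
        }

        if (parsed is null || string.IsNullOrWhiteSpace(parsed.DeviceId))
        {
            error = "missing device ID";
            return false;
        }

        if (parsed.Temperature < MinCelsius || parsed.Temperature > MaxCelsius)
        {
            error = "temperature outside plausible range";
            return false;
        }

        reading = parsed;
        return true;
    }
}
```

A message handler could call `ReadingValidator.TryParse` on the raw payload bytes and log the rejection reason instead of letting a malformed message throw inside the handler.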

Security

TLS/SSL Encryption

var options = new MqttClientOptionsBuilder()
    .WithTcpServer("broker.example.com", 8883)
    .WithTls(new MqttClientOptionsBuilderTlsParameters
    {
        UseTls = true,
        SslProtocol = System.Security.Authentication.SslProtocols.Tls12
    })
    .Build();

Authentication

.WithCredentials("device123", "securePassword123")

Authorization (Mosquitto ACL)

# /etc/mosquitto/acl
# Enable with "acl_file /etc/mosquitto/acl" in mosquitto.conf
user device1
topic write sensors/device1/#
topic read config/device1/#

user backend
topic read sensors/#
topic write config/#

Conclusion

MQTT + .NET provides a powerful combination for IoT applications. Start with:

  • Simple pub/sub for data collection
  • Add managed client for resilience
  • Implement proper QoS for your use case
  • Secure with TLS and authentication
  • Scale with clustered brokers

The protocol's simplicity and efficiency make it perfect for resource-constrained devices, while .NET provides the robust backend processing capabilities needed for enterprise IoT solutions.
