dotnet / MQTTnet

MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.
MIT License
4.5k stars 1.07k forks source link

NotAuthorized error when resubscribing to a topic with CleanStart=false using MQTTnet #2097

Closed vec715 closed 3 weeks ago

vec715 commented 4 weeks ago

I'm encountering an issue with MQTTnet where I receive a NotAuthorized error when attempting to unsubscribe from one topic and subscribe to a new topic during an active session with CleanStart set to false. This problem does not occur when I restart the application and establish a new session.

Steps to Reproduce:

  1. Establish a Connection:

Connect to the MQTT broker using the following client options:

var mqttOptions = new MqttClientOptionsBuilder()
    .WithProtocolVersion(MqttProtocolVersion.V500)
    .WithClientId("your_client_id")
    .WithTcpServer("broker_address", broker_port)
    .WithCredentials("username", "password")
    .WithTls(new MqttClientOptionsBuilderTlsParameters
    {
        UseTls = true,
        AllowUntrustedCertificates = true,
        IgnoreCertificateChainErrors = true,
        IgnoreCertificateRevocationErrors = true,
        CertificateValidationHandler = context => true // Accept all certificates for testing
    })
    .WithCleanStart(false)
    .WithSessionExpiryInterval(86400) // 24 hours
    .WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
    .WithTimeout(TimeSpan.FromSeconds(10))
    .Build();

Connect the client:

await _mqttClient.ConnectAsync(mqttOptions, CancellationToken.None);
  1. Subscribe to the Initial Topic:
await _mqttClient.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
    .WithTopicFilter(f => f.WithTopic("initial/topic").WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce))
    .Build());
  1. Attempt to Resubscribe to a New Topic Within the Same Session:
// Unsubscribe from the initial topic
await _mqttClient.UnsubscribeAsync("initial/topic");

// Attempt to subscribe to the new topic
await _mqttClient.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
    .WithTopicFilter(f => f.WithTopic("new/topic").WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce))
    .Build());
  1. Observe the Error:

The broker returns a NotAuthorized error when attempting to subscribe to the new topic.

Expected Behavior:

I expect to be able to unsubscribe from one topic and subscribe to another during an active session with CleanStart=false without receiving authorization errors.

Actual Behavior:

Unsubscribing from the initial topic succeeds.

Subscribing to the new topic fails with a NotAuthorized error.

Restarting the application and reconnecting (establishing a new session) allows subscribing to the new topic without issues.

Relevant Logs:

Unsubscribed from topic: initial/topic
Failed to subscribe to topic new/topic: NotAuthorized (NotAuthorized)
Retrying subscription to topic new/topic in 5 seconds...

Additional Information:

Comparison with Other Clients:

I have tested the same scenario using other MQTT clients (a Go script and MQTTX desktop app) with similar settings (CleanSession set to false), and they can resubscribe to different topics without encountering authorization errors. So the problem is not related to ACL for sure

Broker Details:

Broker: EMQX The broker supports MQTT v5.0

Questions:

Is there a known issue with MQTTnet regarding changing subscriptions within a persistent session (CleanStart=false)?

Are there additional configurations or steps required in MQTTnet to change subscriptions without receiving a NotAuthorized error?

Could this be a bug in MQTTnet's handling of subscription changes during an active session?

Code Snippets:

Here's the relevant part of my code:

Connection Setup:

private MqttClientOptions BuildMqttClientOptions()
{
    return new MqttClientOptionsBuilder()
        .WithProtocolVersion(MqttProtocolVersion.V500)
        .WithClientId("your_client_id")
        .WithTcpServer("broker_address", broker_port)
        .WithCredentials("username", "password")
        .WithTls(new MqttClientOptionsBuilderTlsParameters
        {
            UseTls = true,
            AllowUntrustedCertificates = true,
            IgnoreCertificateChainErrors = true,
            IgnoreCertificateRevocationErrors = true,
            CertificateValidationHandler = context => true
        })
        .WithCleanStart(false)
        .WithSessionExpiryInterval(86400)
        .WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
        .WithTimeout(TimeSpan.FromSeconds(10))
        .Build();
}

// Connecting the client
await _mqttClient.ConnectAsync(BuildMqttClientOptions(), CancellationToken.None);

Subscription Methods:

public async Task SubscribeAsync(string topic)
{
    var subscribeOptions = new MqttClientSubscribeOptionsBuilder()
        .WithTopicFilter(f => f.WithTopic(topic).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce))
        .Build();

    var result = await _mqttClient.SubscribeAsync(subscribeOptions);
    // Handle result...
}

public async Task UnsubscribeAsync(string topic)
{
    await _mqttClient.UnsubscribeAsync(topic);
}

Resubscription Logic:

public async Task ResubscribeToNewTopicAsync(string newTopic)
{
    await UnsubscribeAsync("initial/topic");
    await SubscribeAsync(newTopic);
}

I appreciate any assistance or insights you can provide on this issue.