dotnet / MQTTnet

MQTTnet is a high-performance .NET library for MQTT-based communication. It provides an MQTT client and an MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.
MIT License

WebSocket .NET 5.0 connection and publish issues #1197

Open xeonfusion opened 3 years ago

xeonfusion commented 3 years ago

Describe your question

There are issues with the managed client over WebSockets. With .NET 5.0 on Windows 10 it either fails to connect and publish to a test Mosquitto broker or takes a long time (more than 30-60 seconds), and on Mac OS X it fails. It does sometimes work if the .NET standard framework or Mono is used on Mac OS X or Linux.

I suspect platform-specific issues in the ManagedClient code (messages being enqueued without actually being published; Task completion or threading issues?). The ManagedClient examples at https://dzone.com/articles/mqtt-publishing-and-subscribing-messages-to-mqtt-b and https://github.com/chkr1011/MQTTnet/wiki/ManagedClient showed these issues with .NET Core.

I only managed to get the code working by using a task-completion and Task.ContinueWith() approach, as shown here (@chkr1011, is this an optimal approach?):

            CancellationTokenSource source = new CancellationTokenSource();
            CancellationToken token = source.Token;

            var mqttClient = new MqttFactory().CreateMqttClient();
            var managedClient = new ManagedMqttClient(mqttClient, new MqttNetLogger());

            try
            {
                var task = Task.Run(async () =>
                {
                    var connected = GetConnectedTask(managedClient);
                    await ConnectMQTTAsync(managedClient, token, m_MQTTUrl, m_MQTTclientId, m_MQTTuser, m_MQTTpassw);
                    await connected;

                });

                task.ContinueWith(antecedent => {
                    if (antecedent.Status == TaskStatus.RanToCompletion)
                    {
                        Task.Run(async () =>
                        {
                            await PublishMQTTAsync(managedClient, token, m_MQTTtopic, serializedJSON);
                            await managedClient.StopAsync();
                        });
                    }
                });

            }

            catch (Exception _Exception)
            {
                Console.WriteLine("Exception caught in process: {0}", _Exception.ToString());
            }

        public static async Task ConnectMQTTAsync(ManagedMqttClient mqttClient, CancellationToken token, string mqtturl, string clientId, string mqttuser, string mqttpassw)
        {
            bool mqttSecure = true;

            var messageBuilder = new MqttClientOptionsBuilder()
            .WithClientId(clientId)
            .WithCredentials(mqttuser, mqttpassw)
            .WithCommunicationTimeout(new TimeSpan(0, 0, 5))
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(1200))
            .WithWebSocketServer(mqtturl)
            .WithCleanSession();

            var options = mqttSecure
            ? messageBuilder
                .WithTls()
                .Build()
            : messageBuilder
                .Build();

            var managedOptions = new ManagedMqttClientOptionsBuilder()
              .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
              .WithClientOptions(options)
              .Build();

            await mqttClient.StartAsync(managedOptions);

        }

        Task GetConnectedTask(ManagedMqttClient managedClient)
        {
            TaskCompletionSource<bool> connected = new TaskCompletionSource<bool>();
            managedClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e =>
            {
                managedClient.ConnectedHandler = null;
                connected.SetResult(true);
            });
            return connected.Task;
        }

        public static async Task PublishMQTTAsync(ManagedMqttClient mqttClient, CancellationToken token, string topic, string payload, bool retainFlag = true, int qos = 1)
        {
            if (mqttClient.IsConnected)
            {
                await mqttClient.PublishAsync(new MqttApplicationMessageBuilder()
                 .WithTopic(topic)
                 .WithPayload(payload)
                 .WithQualityOfServiceLevel((MQTTnet.Protocol.MqttQualityOfServiceLevel)qos)
                 .WithRetainFlag(retainFlag)
                 .Build(), token);
            }
        }
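
A note on the approach above: the tasks started by Task.Run and ContinueWith are never awaited, so exceptions thrown inside them will not reach the surrounding catch block. A sequential alternative might look like the minimal sketch below, which uses only the base class library. `WaitForSignalAsync` and the simulated "connected" signal are hypothetical stand-ins for the ConnectedHandler callback and are not part of MQTTnet; the 10-second timeout is likewise an assumption.

```csharp
using System;
using System.Threading.Tasks;

public static class ConnectWaitSketch
{
    // Waits for a signal task, but gives up after the timeout.
    // Returns true if the signal completed in time, false otherwise.
    public static async Task<bool> WaitForSignalAsync(Task signal, TimeSpan timeout)
    {
        Task finished = await Task.WhenAny(signal, Task.Delay(timeout));
        if (finished != signal)
        {
            return false;
        }

        await signal; // rethrows if the signal task faulted
        return true;
    }

    public static async Task Main()
    {
        // RunContinuationsAsynchronously keeps the awaiting code from running
        // inline on the thread that calls TrySetResult (e.g. a library callback).
        var connected = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // Hypothetical stand-in for the connected callback firing a bit later.
        Task simulatedCallback = Task.Delay(100)
            .ContinueWith(t => connected.TrySetResult(true));

        try
        {
            bool ok = await WaitForSignalAsync(connected.Task, TimeSpan.FromSeconds(10));
            Console.WriteLine(ok
                ? "Connected, safe to publish."
                : "Timed out waiting for the connection.");
            await simulatedCallback;
        }
        catch (Exception ex)
        {
            // Everything above is awaited, so failures surface here.
            Console.WriteLine($"Exception caught in process: {ex}");
        }
    }
}
```

In a real client, the connect call, the wait for the connected signal, and the publish would each be awaited in order inside the same try block, which removes the need for ContinueWith.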

Which project is your question related to?

lazydino commented 3 years ago

I am experiencing the same kind of connection issues running a simple .NET 5.0 console MQTT client against a Mosquitto broker on Linux via WebSockets. I can verify WebSocket connectivity with a test client such as https://hezhii.github.io/mqttjs-client/ , but the .NET client fails with a connection timeout after 10 seconds or so. The managed client fails too.

Here are the options I use for the managed MQTTnet WebSocket client:

var options = new ManagedMqttClientOptionsBuilder()
            .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
            .WithStorage(default(IManagedMqttClientStorage)) // TODO: need to implement.
            .WithClientOptions(new MQTTnet.Client.Options.MqttClientOptionsBuilder()
                .WithClientId(chatid)
                ////.WithCredentials("Your Secret Here")
                .WithKeepAlivePeriod(TimeSpan.FromHours(24))
                //.WithKeepAliveSendInterval(TimeSpan.FromSeconds(5))
                .WithCleanSession()
                .WithWebSocketServer(mosquittourl)
                .WithTls()
                .Build()
            )
            .Build();

My goal is to use the very same .NET 5 client code for real-time communication between web clients and desktops. One client runs on desktops or devices as an IoT application, and the other runs as a Blazor web app via WebAssembly in the browser. Would MQTTnet support this kind of interoperable scenario on the client side?

fabioGulfMike commented 3 years ago

I am experiencing a similar problem using this library in Blazor Progressive Web Applications. I get a stable connection to a broker (VerneMQ in a local Docker container), but it does not publish any messages.

The call await mqttClient.PublishAsync(MqttMessage, CancellationToken.None); returns a success code, but when I check with another MQTT client, the message never reaches the topic. The UseApplicationMessageReceivedHandler works perfectly!

For testing, here is my code (I used MudBlazor for the UI):

Index.razor


<MudGrid Justify="Justify.FlexStart" Spacing="0">
<MudItem xs="12">
<MudGrid Justify="Justify.FlexStart" Spacing="0">
<!-- Server -->
<MudItem xs="6">
<MudPaper Class="pa-3 ma-3" Elevation="3">
<MudGrid Justify="Justify.FlexStart" Spacing="1" Style="align-items:center;">
<MudItem xs="1">
<MudIcon Icon="@Icons.Material.Filled.Circle" Size="Size.Small" Color="@ConnectionStatus" />
</MudItem>
<MudItem xs="7">
<MudTextField @bind-Value="model.Server" Label="Server" Variant="Variant.Outlined" Adornment="Adornment.Start" AdornmentIcon="@Icons.Material.Filled.Computer" AdornmentColor="Color.Warning" />
</MudItem>
<MudItem xs="4">
<MudButton FullWidth="true" Variant="Variant.Filled" Color="Color.Primary">@ButtonConnectText</MudButton>
</MudItem>
<MudSpacer />
</MudGrid>
</MudPaper>
</MudItem> <!-- Server -->
<!-- User/Password -->
<MudItem xs="6">
<MudPaper Class="pa-3 ma-3" Elevation="3">
<MudGrid Justify="Justify.FlexStart" Spacing="1" Style="align-items:center;">
<MudItem xs="4">
<MudSwitch @bind-Checked="@Authenticated" Label="@(Authenticated ? "User" : "No User")" />
</MudItem>
<MudItem xs="4">
<MudTextField @bind-Value="model.Username" Label="User" Variant="Variant.Outlined" Disabled="@(!Authenticated)" />
</MudItem>
<MudItem xs="4">
<MudTextField @bind-Value="model.Password" Label="Password" Variant="Variant.Outlined" InputType="@PasswordInput" Adornment="Adornment.End" AdornmentIcon="@PasswordInputIcon" OnAdornmentClick="ShowPasswordButton" Disabled="@(!Authenticated)" />
</MudItem>
</MudGrid>
</MudPaper>
</MudItem> <!-- User/Password -->
</MudGrid>
</MudItem>
</MudGrid>

@code{

#region MQTT client

MqttFactory factory = new MqttFactory();
ManagedMqttClient mqttClient { get; set; }
ManagedMqttClientOptions ManagedOptions;

IMqttClientOptions clientOptions;
MqttClientCredentials ClientCredentials;

private async Task Connect()
{
    var server = model.Server;
    var port = model.Port;

    Console.WriteLine($"--> Connecting to server {server}...");

    if (!string.IsNullOrEmpty(model.Username) && !string.IsNullOrEmpty(model.Password))
    {
        ClientCredentials = new MqttClientCredentials();
        ClientCredentials.Username = model.Username;
        ClientCredentials.Password = Encoding.ASCII.GetBytes(model.Password);

        clientOptions = new MqttClientOptionsBuilder()
                        .WithClientId("Client1")
                        .WithCredentials(ClientCredentials)
                        .WithCommunicationTimeout(new TimeSpan(0, 0, 5))
                        .WithKeepAlivePeriod(TimeSpan.FromSeconds(1200))
                        .WithCleanSession()
                        //.WithTls()
                        .WithWebSocketServer($"{server}:8080/mqtt")
                        .Build();
    }
    else
    {
        clientOptions = new MqttClientOptionsBuilder()
            .WithClientId("Client1")
            .WithCommunicationTimeout(new TimeSpan(0, 0, 5))
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(1200))
            .WithCleanSession()
            .WithWebSocketServer($"{server}:8080/mqtt")
            .Build();
    }

    ManagedOptions = new ManagedMqttClientOptionsBuilder()
                    .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
                    .WithClientOptions(clientOptions)
                    .Build();

    mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected);
    mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected);
    mqttClient.ConnectingFailedHandler = new ConnectingFailedHandlerDelegate(OnConnectingFailed);

    var topicFilter = new MqttTopicFilterBuilder()
    .WithTopic("YOUR_TOPIC")
    .Build();

    await mqttClient.SubscribeAsync(topicFilter);
    mqttClient.UseApplicationMessageReceivedHandler(ReceiveMessage);

    try
    {
        await mqttClient.StartAsync(ManagedOptions);
    }
    catch (Exception e)
    {
        Console.WriteLine($"--> Not connected. Error: {e.Message}");

        this.ButtonConnectText = "Connect";
    }
}

void OnConnected(MqttClientConnectedEventArgs obj)
{
    ConnectionStatus = Color.Success;
    this.ButtonConnectText = "Disconnect";
    Console.WriteLine("--> Connected!");
    StateHasChanged();
}

void OnConnectingFailed(ManagedProcessFailedEventArgs obj)
{
    ConnectionStatus = Color.Error;
    Console.WriteLine($"--> Not connected! Error: {obj.Exception}");
    this.ButtonConnectText = "Connect";
    StateHasChanged();
}

void OnDisconnected(MqttClientDisconnectedEventArgs obj)
{
    ConnectionStatus = Color.Error;
    Console.WriteLine($"--> Not connected! Error: {obj.Reason}");
    this.ButtonConnectText = "Connect";
    StateHasChanged();
}

async Task PublishMessage(string message, string topic, bool retain = false, int qos = 0)
{
    try
    {
        var MqttMessage = new MqttApplicationMessageBuilder()
                                .WithTopic(topic)
                                .WithPayload(message)
                                .WithQualityOfServiceLevel((MQTTnet.Protocol.MqttQualityOfServiceLevel)qos)
                                .WithRetainFlag(retain)
                                .Build();

        if (mqttClient.IsConnected)
        {
            var RESULT = await mqttClient.PublishAsync(MqttMessage, CancellationToken.None);

            Console.WriteLine($"--> Publish: {RESULT.ReasonString} (code {RESULT.ReasonCode}) - {JsonConvert.SerializeObject(RESULT.UserProperties)}");
        }
    }
    catch (Exception e)
    {
        Console.WriteLine($"--> Error on publish {message} on '{topic}'. Error: {e.Message}");
    }
}

void ReceiveMessage(MqttApplicationMessageReceivedEventArgs e)
{
    DeviceMessage = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
    Topic = e.ApplicationMessage.Topic;

    Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
    Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
    Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
    Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
    Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
    Console.WriteLine();
    StateHasChanged();
}
#endregion

#region Server and User/Password Card

public class ServerForm
{
    [Required]
    public string Server { get; set; }
    public string Port { get; set; }
    public string Username { get; set; } = "";
    public string Password { get; set; } = "";
}

ServerForm model = new ServerForm();

Color ConnectionStatus { get; set; }

string ButtonConnectText { get; set; }

bool Authenticated { get; set; }

string PasswordInputIcon { get; set; } = Icons.Material.Filled.VisibilityOff;

bool ShowPassword { get; set; }

InputType PasswordInput { get; set; }

void ShowPasswordButton()
{
    if (ShowPassword)
    {
        ShowPassword = false;
        PasswordInputIcon = Icons.Material.Filled.VisibilityOff;
        PasswordInput = InputType.Password;
    }
    else
    {
        ShowPassword = true;
        PasswordInputIcon = Icons.Material.Filled.Visibility;
        PasswordInput = InputType.Text;
    }
}

#endregion

#region Page Life Cycle

protected override Task OnInitializedAsync()
{
    ButtonConnectText = "Connect";
    Authenticated = false;
    ConnectionStatus = Color.Error;
    ShowPassword = false;
    PasswordInput = InputType.Password;

    var factory = new MqttFactory();
    mqttClient = (ManagedMqttClient)factory.CreateManagedMqttClient();

    return base.OnInitializedAsync();
}

async Task ConnectDisconnectButtonClick(string status)
{
    if (status == "Disconnect")
    {
        await mqttClient.StopAsync();
    }
    else if (status == "Connect")
    {
        await this.Connect();
    }
}

#endregion

}


_Imports.razor

// Other namespaces
// ...
@using MudBlazor
@using MQTTnet;
@using MQTTnet.Client;
@using MQTTnet.Client.Connecting;
@using MQTTnet.Client.Disconnecting;
@using MQTTnet.Client.Options;
@using MQTTnet.Client.Publishing;
@using MQTTnet.Client.Subscribing;
@using MQTTnet.Exceptions;
@using MQTTnet.Extensions.ManagedClient;
@using MQTTnet.Packets;
@using MQTTnet.Protocol;
@using Newtonsoft.Json;

sbroszat commented 2 years ago

I have the same bug as fabioGulfMike. I use MQTTnet with WebSockets in a Blazor .NET 5.0 WebAssembly app. Subscribing works without any problems, but publishing does not: the queue grows longer and longer, and no messages arrive at the broker.

When I use the debugger to step slowly through the Publish() code until it returns to the Blazor framework, publishing works and the broker receives the messages. So in my opinion something is wrong with TryPublishQueuedMessageAsync. Maybe it is never called, or the Task never gets compute time in this single-threaded environment.
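
The single-thread hypothesis can be illustrated without MQTTnet. Blazor WebAssembly on .NET 5 runs all tasks on one thread, so a background loop that waits for queued messages with a blocking call would keep other continuations from running. Whether that is what happens inside the managed client is not confirmed in this thread. The sketch below only shows the general shape of a queue loop that awaits instead of blocking, using System.Threading.Channels; `SendAsync` and `PumpAsync` are hypothetical names, and `SendAsync` is a placeholder for a real publish call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class AsyncQueueSketch
{
    // Hypothetical placeholder for a real network publish.
    static async Task SendAsync(string message, List<string> sent)
    {
        await Task.Yield(); // hand control back so other work can run
        sent.Add(message);
    }

    // Drains the queue. While the queue is empty the loop awaits,
    // so it never blocks the thread it runs on.
    public static async Task PumpAsync(ChannelReader<string> reader, List<string> sent)
    {
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out string message))
            {
                await SendAsync(message, sent);
            }
        }
    }

    public static async Task Main()
    {
        Channel<string> queue = Channel.CreateUnbounded<string>();
        var sent = new List<string>();

        // Start the pump without Task.Run; it yields at its first await.
        Task pump = PumpAsync(queue.Reader, sent);

        queue.Writer.TryWrite("hello");
        queue.Writer.TryWrite("world");
        queue.Writer.Complete(); // no more messages, lets the pump finish

        await pump;
        Console.WriteLine($"Sent {sent.Count} messages");
    }
}
```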

iotechFabio commented 2 years ago

@sbroszat, I switched to an MQTT client implemented in JavaScript and used JS interop to build my project. It is faster and more reliable.

mvanhil commented 2 years ago

I have the same issue and came across this thread.

sbroszat commented 2 years ago

@mvanhil, if you mean Blazor: I also switched to the Paho JS MQTT client with JS interop. Thanks for the hint, @iotechFabio. It works without problems.

mvanhil commented 2 years ago

I'm now using the default library (not the managed client) and all is working perfectly for now (just publishing).
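
For readers who want to try the same workaround, a publish-only flow with the plain (non-managed) client might look like the sketch below. It assumes MQTTnet 3.x, the API generation used in the snippets earlier in this thread; other major versions may name these types differently. The broker address, client ID, and topic are hypothetical placeholders.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;

public static class PlainClientSketch
{
    public static async Task Main()
    {
        // Hypothetical broker address and topic; replace with real values.
        const string brokerUrl = "localhost:8080/mqtt";
        const string topic = "test/topic";

        IMqttClient client = new MqttFactory().CreateMqttClient();

        IMqttClientOptions options = new MqttClientOptionsBuilder()
            .WithClientId("Client1")
            .WithWebSocketServer(brokerUrl)
            .WithCleanSession()
            .Build();

        // The connect call is awaited directly, so a failure throws here
        // instead of being retried in the background.
        await client.ConnectAsync(options, CancellationToken.None);

        MqttApplicationMessage message = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload("hello")
            .Build();

        // The plain client sends the message as part of this call
        // rather than placing it in an internal queue.
        var result = await client.PublishAsync(message, CancellationToken.None);
        Console.WriteLine($"Publish result: {result.ReasonCode}");

        await client.DisconnectAsync();
    }
}
```

The trade-off is that the plain client does not reconnect or buffer messages on its own, so the application has to handle dropped connections itself.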