dotnet / MQTTnet

MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.
MIT License
4.51k stars 1.07k forks source link

Publish periodly #1053

Closed WimmerMarkus closed 3 years ago

WimmerMarkus commented 3 years ago

Hello, have build a .neCore Application and want to Publish DateTimeTopic everySecond, it works for a Time then Stops then the Task without any exception or message. the application need about 75MB of Memory (RAM) when it stops the Memory is 123MB. To reproduce the scene i put the timespan of 10ms, after ~3000Messages stops the CallBack from the Timer. Any Ideas?

public async Task StartAsync(CancellationToken cancellationToken)
        {
            await mqttClient.ConnectAsync(options);
            if (!mqttClient.IsConnected)
            {
                await mqttClient.ReconnectAsync();
            }

            new Timer(BroadcastCall, null, TimeSpan.Zero, TimeSpan.FromMilliseconds(10));
        }

 private async void BroadcastCall(object state)
        {
            messageCounter++;

            var message = new MqttApplicationMessageBuilder()
              .WithTopic("Broadcast/DateTime")
              .WithPayload(new UTF8Encoding().GetBytes(DateTime.Now.ToString("yyyy-MM-ddTHH:mm:ss")))
              .WithExactlyOnceQoS()
              .Build();

            await this.mqttClient.PublishAsync(message);
            logger.LogInformation($"Messages: {messageCounter}");
        }
chkr1011 commented 3 years ago

Maybe no solution but please try QoS 1 instead. Then we can see if the QoS 2 implementation is the root cause.

WimmerMarkus commented 3 years ago

what does you mean?

var message = new MqttApplicationMessageBuilder() .WithTopic("Broadcast/DateTime") .WithPayload(new UTF8Encoding().GetBytes(DateTime.Now.ToString("yyyy-MM-ddTHH:mm:ss"))) **.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce)** .Build(); it takes the same result...

chkr1011 commented 3 years ago

Please also put some locking around the usage of the client (or a SemaphoreSlim). The problem is that you are basically starting a task every 10 ms without waiting for finish. So after some time multiple tasks are trying to send data at the same time.

This library is NOT thread safe.

SeppPenner commented 3 years ago

I wouldn't use a timer for this either. Maybe an infinite loop (while(true)) and a Task.Delay instead of the timer could help here:

public async Task RunInBackground(CancellationToken cancellationToken)
{
    await mqttClient.ConnectAsync(options);

    if (!mqttClient.IsConnected)
    {
        await mqttClient.ReconnectAsync();
    }

    var messageCounter = 0;

    while (!cancellationToken.IsCancellationRequested)
    {
        messageCounter++;

        var message = new MqttApplicationMessageBuilder()
            .WithTopic("Broadcast/DateTime")
            .WithPayload(new UTF8Encoding().GetBytes(DateTime.Now.ToString("yyyy-MM-ddTHH:mm:ss")))
            .WithExactlyOnceQoS()
            .Build();

        await this.mqttClient.PublishAsync(message);
        logger.LogInformation($"Messages: {messageCounter}");
        await Task.Delay(1000);
    }
}
chkr1011 commented 3 years ago

I close this ticket because I assume it is fixed.