dotnet / MQTTnet

MQTTnet is a high-performance .NET library for MQTT-based communication. It provides an MQTT client and an MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.
MIT License

Bad performance #923

Closed by tomwimmenhove 4 years ago

tomwimmenhove commented 4 years ago

Describe the bug

I read issue #31, which addressed bad performance. It looked like it had been fixed, but I'm still not able to squeeze much more than 500 req/sec out of it. I'm connecting from a Windows VM to a Linux server running mosquitto; both run on the same hardware. Mosquitto is practically idle, while the .NET application is running at full tilt. I've tried it both with and without async calls.

To Reproduce

Steps to reproduce the behavior:

  1. Connect to server
  2. Send messages in an infinite loop
  3. Observe sending rate

Additional context / logging

KeepAlive timespan is set to 1 day

Code example

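    // Namespaces needed by this snippet (the same ones used with MQTTnet 3.0.x later in this thread).
    using System;
    using System.Text;
    using System.Threading;
    using MQTTnet;
    using MQTTnet.Client.Options;

    static void Test(int total)
    {
        var factory = new MqttFactory();
        var mqttClient = factory.CreateMqttClient();
        var options = new MqttClientOptionsBuilder()
            .WithTcpServer("192.168.0.7")
            .WithKeepAlivePeriod(new TimeSpan(1, 0, 0, 0)) // keep-alive of 1 day
            .Build();

        // Async variant (tried as well):
        //await mqttClient.ConnectAsync(options, CancellationToken.None);
        mqttClient.ConnectAsync(options, CancellationToken.None).Wait();

        var message = new MqttApplicationMessageBuilder()
            .WithTopic("SomeTopic")
            .Build();

        // Publish one message at a time, blocking until each publish completes.
        for (var n = 0; n < total; n++)
        {
            message.Payload = Encoding.UTF8.GetBytes($"test {n}");

            // Async variant (tried as well):
            //await mqttClient.PublishAsync(message, CancellationToken.None);
            mqttClient.PublishAsync(message, CancellationToken.None).Wait();

            // Progress output every 1000 messages.
            if (n % 1000 == 0)
            {
                Console.WriteLine($"n: {n}");
            }
        }
    }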

tomwimmenhove commented 4 years ago

I just did a quick check using M2Mqtt, which seems to do a little under 10k req/sec under the same circumstances. So it doesn't seem to be an issue with my setup.

chkr1011 commented 4 years ago

Hi, please answer the following questions so that I can try to analyze this issue:

  1. Please avoid setting a keep-alive interval while testing this issue.
  2. Which version do you use? 3.0.10-rc1?
  3. Which platform do you use? .NET 4.5.2, 4.6, netstandard2.0?
  4. Do you use the NuGet package or just some DLLs?
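
The version and platform questions above can be answered from inside the test application. The following is a minimal sketch, not part of MQTTnet: `EnvironmentReport` and `DescribeAssembly` are hypothetical names, and whether the library's informational version carries a pre-release suffix such as `-rc1` is an assumption, so the code falls back to the plain assembly version.

```csharp
using System;
using System.Reflection;
using System.Runtime.InteropServices;

static class EnvironmentReport
{
    // Returns "<assembly name> <version>" for the assembly that defines the given type.
    // Prefers the informational version (which may include a pre-release tag)
    // and falls back to the four-part assembly version.
    public static string DescribeAssembly(Type type)
    {
        Assembly assembly = type.Assembly;
        AssemblyName name = assembly.GetName();
        string informational = assembly
            .GetCustomAttribute<AssemblyInformationalVersionAttribute>()
            ?.InformationalVersion;

        return $"{name.Name} {informational ?? name.Version?.ToString()}";
    }

    static void Main()
    {
        // Runtime and OS the process is actually executing on.
        Console.WriteLine($"Runtime: {RuntimeInformation.FrameworkDescription}");
        Console.WriteLine($"OS:      {RuntimeInformation.OSDescription}");

        // typeof(string) is only a placeholder so the sketch compiles on its own.
        // With MQTTnet referenced, typeof(MQTTnet.MqttFactory) would report the library instead.
        Console.WriteLine($"Library: {DescribeAssembly(typeof(string))}");
    }
}
```

Pasting the three printed lines into the issue covers questions 2 and 3; question 4 still has to be answered by hand.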

chkr1011 commented 4 years ago

Please try 3.0.10-rc2 in the meantime.

JanEggers commented 4 years ago

Use the async version and run it in release mode. I didn't try your code, but when running the benchmarks I'm above 200K msg/sec.
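
One way to guard against accidentally measuring a debug build is to check at startup. This is a minimal sketch using only standard .NET APIs; `BuildCheck` is a hypothetical helper, not something MQTTnet provides, and it assumes the default Debug configuration, which disables JIT optimizations.

```csharp
using System;
using System.Diagnostics;
using System.Reflection;

static class BuildCheck
{
    // True when the assembly was compiled with JIT optimizations disabled,
    // which is what a default Debug configuration does.
    public static bool IsJitOptimizerDisabled(Assembly assembly)
    {
        var attribute = assembly.GetCustomAttribute<DebuggableAttribute>();
        return attribute != null && attribute.IsJITOptimizerDisabled;
    }

    static void Main()
    {
        // GetEntryAssembly can return null in some hosting scenarios, hence the fallback.
        Assembly entry = Assembly.GetEntryAssembly() ?? typeof(BuildCheck).Assembly;

        if (IsJitOptimizerDisabled(entry))
        {
            Console.WriteLine("Warning: built without optimizations (Debug?); measured throughput is likely too low.");
        }

        if (Debugger.IsAttached)
        {
            Console.WriteLine("Warning: a debugger is attached; detach it before measuring.");
        }
    }
}
```

Calling such a check before the publish loop makes it obvious in the console output when the numbers come from an unoptimized run.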

JanEggers commented 4 years ago

If I run your code slightly modified, I get 50K msg/sec with MQTTnet, and 300K msg/sec with Kestrel as the server and the pipelines-based transport in the client:

    using Microsoft.AspNetCore;
    using Microsoft.AspNetCore.Builder;
    using Microsoft.AspNetCore.Hosting;
    using Microsoft.Extensions.DependencyInjection;
    using MQTTnet;
    using MQTTnet.AspNetCore;
    using MQTTnet.AspNetCore.Client;
    using MQTTnet.Client.Options;
    using MQTTnet.Server;
    using System;
    using System.Diagnostics;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    namespace ConsoleApp3
    {
        class Program
        {
            static async Task Main()
            {
                await MqttNet();
                await MqttNetAspnetCore();
                Console.ReadLine();
            }

            static async Task MqttNet()
            {
                var factory = new MqttFactory();

                var server = factory.CreateMqttServer();

                var mqttClient = factory.CreateMqttClient();
                var options = new MqttClientOptionsBuilder()
                    .WithTcpServer("127.0.0.1")
                    .WithKeepAlivePeriod(new TimeSpan(1, 0, 0, 0))
                    .Build();

                await server.StartAsync(new MqttServerOptionsBuilder().Build());

                await mqttClient.ConnectAsync(options, CancellationToken.None);

                var message = new MqttApplicationMessageBuilder()
                    .WithTopic("SomeTopic")
                    .Build();

                var sw = Stopwatch.StartNew();
                var total = 100000;

                for (var n = 0; n < total; n++)
                {
                    message.Payload = Encoding.UTF8.GetBytes($"test {n}");

                    await mqttClient.PublishAsync(message, CancellationToken.None);

                    if (n % 1000 == 0)
                    {
                        //Console.WriteLine($"n: {n}");
                    }
                }

                sw.Stop();

                Console.WriteLine($"msg/sec {total / sw.Elapsed.TotalSeconds}");
            }

            class Startup
            {
                public void ConfigureServices(IServiceCollection services)
                {
                    services
                        .AddHostedMqttServer(mqttServer => mqttServer.WithoutDefaultEndpoint())
                        .AddMqttConnectionHandler()
                        .AddConnections();
                }

                public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
                {
                }
            }

            static async Task MqttNetAspnetCore()
            {
                var factory = new MqttFactory();

                var host = WebHost.CreateDefaultBuilder()
                    .UseKestrel(o =>
                    {
                        o.ListenAnyIP(1883, l => l.UseMqtt());
                    })
                    .UseStartup<Startup>()
                    .Build();

                var mqttClient = factory.CreateMqttClient(new MqttClientConnectionContextFactory());
                var options = new MqttClientOptionsBuilder()
                    .WithTcpServer("127.0.0.1")
                    .WithKeepAlivePeriod(new TimeSpan(1, 0, 0, 0))
                    .Build();

                await host.StartAsync();

                await mqttClient.ConnectAsync(options, CancellationToken.None);

                var message = new MqttApplicationMessageBuilder()
                    .WithTopic("SomeTopic")
                    .Build();

                var sw = Stopwatch.StartNew();
                var total = 100000;

                for (var n = 0; n < total; n++)
                {
                    message.Payload = Encoding.UTF8.GetBytes($"test {n}");

                    await mqttClient.PublishAsync(message, CancellationToken.None);

                    if (n % 1000 == 0)
                    {
                        //Console.WriteLine($"n: {n}");
                    }
                }

                sw.Stop();

                Console.WriteLine($"msg/sec {total / sw.Elapsed.TotalSeconds}");
            }
        }
    }
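
Both benchmark methods above use the same timing loop, so the measurement could be factored into one helper that takes the publish call as a delegate. This is a minimal sketch with no MQTTnet dependency: `Throughput.MeasureAsync` is a hypothetical name, and the no-op delegate in `Main` only stands in for a real publish so the block runs on its own.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

static class Throughput
{
    // Awaits publish(n) for n = 0 .. total - 1, one call at a time,
    // and returns the observed rate in messages per second.
    public static async Task<double> MeasureAsync(Func<int, Task> publish, int total)
    {
        if (publish == null) throw new ArgumentNullException(nameof(publish));
        if (total <= 0) throw new ArgumentOutOfRangeException(nameof(total));

        var sw = Stopwatch.StartNew();

        for (var n = 0; n < total; n++)
        {
            await publish(n);
        }

        sw.Stop();

        return total / sw.Elapsed.TotalSeconds;
    }

    static async Task Main()
    {
        // Stand-in for a real publish: completes immediately, so this only
        // measures the overhead of the loop itself.
        double rate = await MeasureAsync(n => Task.CompletedTask, 100000);

        Console.WriteLine($"msg/sec {rate}");
    }
}
```

In the benchmark above, the delegate would set `message.Payload` and return `mqttClient.PublishAsync(message, CancellationToken.None)`, which keeps the two transports comparable because they share one measurement path.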

SeppPenner commented 4 years ago

Can this be closed? It seems like we're not able to reproduce your issue, @tomwimmenhove...

tomwimmenhove commented 4 years ago

Strange. I'll look into it again if/when I have the time. I'll close it and maybe re-open if I find out more. Thanks for your trouble!