rabbitmq / rabbitmq-stream-dotnet-client

RabbitMQ client for the stream protocol
https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/htmlsingle/index.html

Multiple Producers and Consumers per connection #328

Closed Gsantomaggio closed 11 months ago

Gsantomaggio commented 1 year ago

Preface

The RabbitMQ stream protocol supports multiple producers and multiple consumers per TCP connection. We originally designed the .NET stream client to use one TCP connection per producer or consumer to keep it simple.

Pull request

This PR adds the feature without impacting the existing code too much, although some changes are required.

We added a new class: ConnectionsPool. The pool is a Dictionary keyed by client_id, with connection_info as the value.

The client_id is an internal field that uniquely identifies the connection.

The connection_info holds data such as the ActiveIds used by that connection. The actual values of the IDs don't matter; only the count does.

For example, if a connection has the producer IDs 10, 11 and 45, ActiveIds is 3.
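
As a rough sketch of that bookkeeping (illustrative only; the real ConnectionsPool has more state and different names):

// client_id -> the producer/consumer ids currently active on that connection
var activeIdsByClient = new Dictionary<string, HashSet<byte>>();

// a connection that hosts producer ids 10, 11 and 45 ...
activeIdsByClient["client-1"] = new HashSet<byte> { 10, 11, 45 };

// ... has ActiveIds == 3; the id values themselves are irrelevant
Console.WriteLine(activeIdsByClient["client-1"].Count); // prints 3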

How does it work?

Suppose I want to have at most two IDs per connection:

var pool = new ConnectionsPool(0, 2);
var metaDataInfo = await client.QueryMetadata(new[] { Stream1, Stream2 });
var p1 = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream1) { Pool = pool },
    metaDataInfo.StreamInfos[Stream1]);

var p2 = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream2) { Pool = pool },
    metaDataInfo.StreamInfos[Stream2]);

In this case, a single connection is shared by the two IDs.

Calling p1.Close() reduces the pool's reference count for that connection from two to one.

Calling p2.Close() reduces the reference count for that connection from one to zero. At this point, the connection is closed.
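
The pool therefore behaves like a per-connection reference counter. A minimal sketch of that idea (illustrative only; this is not the actual ConnectionsPool API):

// Hypothetical release step: decrement the id count for a connection
// and close the underlying TCP connection when it reaches zero.
static void Release(Dictionary<string, int> activeIdsByClient, string clientId, Action closeTcpConnection)
{
    if (--activeIdsByClient[clientId] > 0)
        return; // other producers/consumers still share this connection

    activeIdsByClient.Remove(clientId); // last id released: forget the entry
    closeTcpConnection();               // and close the TCP connection
}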

Inside the ConnectionsPoolTests, you can find the tests with comments for each use case.

How to test

I am using this script where I can tune the parameters:

using System.Net;
using System.Text;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Console;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.AMQP;
using RabbitMQ.Stream.Client.Reliable;

namespace example;

public class Pool
{
    public static async Task Start()
    {
        var serviceCollection = new ServiceCollection();
        serviceCollection.AddLogging(builder => builder
            .AddSimpleConsole(options =>
            {
                options.IncludeScopes = true;
                options.SingleLine = true;
                options.TimestampFormat = "[HH:mm:ss] ";
                options.ColorBehavior = LoggerColorBehavior.Default;
            })
            .AddFilter(level => level >= LogLevel.Information)
        );
        var loggerFactory = serviceCollection.BuildServiceProvider()
            .GetService<ILoggerFactory>();

        var lp = loggerFactory.CreateLogger<Producer>();
        var lc = loggerFactory.CreateLogger<Consumer>();
        const int streams = 10;
        const int producers = 20;
        const byte producersPerConnection = 12;
        const int messagesPerProducer = 5000;

        const int consumers = 20;
        const byte consumersPerConnection = 13;

        var system = await StreamSystem.Create(new StreamSystemConfig()
        {
            Endpoints = new List<EndPoint>()
            {
                new DnsEndPoint("node1", 5572),
                new DnsEndPoint("node0", 5562),
                new DnsEndPoint("node2", 5582),
            },

            ConnectionPoolConfig = new ConnectionPoolConfig()
            {
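                // Up to `producersPerConnection` producers and `consumersPerConnection`
                // consumers (the constants above) share a single TCP connection.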
                ProducersPerConnection = producersPerConnection,
                ConsumersPerConnection = consumersPerConnection,
            }
        });

        var streamsList = new List<string>();
        for (var i = 0; i < streams; i++)
        {
            streamsList.Add($"BenchmarkDotNet{i}");
        }

        var totalConfirmed = 0;
        var totalError = 0;
        var totalConsumed = 0;
        var totalSent = 0;
        var isRunning = true;

        _ = Task.Run(() =>
        {
            while (isRunning)
            {
                Console.WriteLine(
                    $"Conf: {totalConfirmed}, " +
                    $"Error: {totalError}, " +
                    $"Total: {totalConfirmed + totalError}, " +
                    $"Consumed: {totalConsumed}, " +
                    $"Sent: {totalSent}, " +
                    $"Sent per stream: {totalSent / streamsList.Count}");
                Thread.Sleep(1000);
            }
        });
        List<Consumer> consumersList = new();
        List<Producer> producersList = new();
        var obj = new object();
        foreach (var stream in streamsList)
        {
            if (await system.StreamExists(stream))
            {
                await system.DeleteStream(stream);
            }

            await system.CreateStream(new StreamSpec(stream)
            {
                MaxLengthBytes = 20_000_000_000,
            });

            for (var z = 0; z < consumers; z++)
            {
                consumersList.Add(await Consumer.Create(new ConsumerConfig(system, stream)
                {
                    OffsetSpec = new OffsetTypeFirst(),
                    MessageHandler = (source, ctx, _, _) =>
                    {
                        Interlocked.Increment(ref totalConsumed);
                        return Task.CompletedTask;
                    },
                }));
            }

            for (var z = 0; z < producers; z++)
            {
                _ = Task.Run(async () =>
                {
                    var producer = await Producer.Create(new ProducerConfig(system, stream)
                    {
                        ConfirmationHandler = confirmation =>
                        {
                            if (confirmation.Status != ConfirmationStatus.Confirmed)
                            {
                                Interlocked.Increment(ref totalError);
                                return Task.CompletedTask;
                            }

                            Interlocked.Increment(ref totalConfirmed);
                            return Task.CompletedTask;
                        },
                    });
                    lock (obj)
                    {
                        producersList.Add(producer);
                    }

                    for (var i = 0; i < messagesPerProducer; i++)
                    {
                        var m = new Message(Encoding.UTF8.GetBytes($"Hello World! {i}"));
                        await producer.Send(m);
                        await Task.Delay(10);
                        Interlocked.Increment(ref totalSent);
                    }
                });
            }
        }

        Console.WriteLine("Press any key to close all the consumers");
        Console.ReadKey();
        isRunning = false;
        Console.WriteLine("closing the producers ..... ");
        producersList.ForEach(async p => await p.Close());
        Console.WriteLine("closing the consumers ..... ");
        consumersList.ForEach(async c => await c.Close());
        Console.WriteLine("Closed all the consumers and producers");
    }
}

I used `make rabbitmq-ha-proxy` from the golang repo to get a cluster up and running.

Note:

We disabled the test TheProducerPoolShouldBeConsistentWhenAStreamIsDeleted because it requires changing Action<MetaDataUpdate> MetadataHandler to an event in order to support multiple handlers. I would prefer to do that in another PR.
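
For context, a handler assigned to a delegate property with `=` replaces any previous handler, while an event lets several subscribers register. A small illustration of the difference (not the client's actual API; string stands in here for the MetaDataUpdate payload):

using System;

public class MetadataSample
{
    // A delegate property assigned with '=' holds a single handler:
    // a later assignment replaces the previous one.
    public Action<string> MetadataHandler { get; set; }

    // An event lets several subscribers register with '+=',
    // and all of them are invoked.
    public event Action<string> MetadataUpdated;

    public void Notify(string stream)
    {
        MetadataHandler?.Invoke(stream);
        MetadataUpdated?.Invoke(stream);
    }
}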

codecov[bot] commented 1 year ago

Codecov Report

Attention: 34 lines in your changes are missing coverage. Please review.

Comparison is base (90a6752) 92.60% compared to head (c7c66a5) 92.94%. Report is 2 commits behind head on main.

| Files | Patch % | Lines |
|-------|---------|-------|
| RabbitMQ.Stream.Client/AbstractEntity.cs | 46.15% | 11 Missing and 3 partials :warning: |
| RabbitMQ.Stream.Client/ConnectionsPool.cs | 91.52% | 1 Missing and 9 partials :warning: |
| RabbitMQ.Stream.Client/RawConsumer.cs | 88.88% | 0 Missing and 3 partials :warning: |
| RabbitMQ.Stream.Client/Client.cs | 97.26% | 0 Missing and 2 partials :warning: |
| RabbitMQ.Stream.Client/StreamSystem.cs | 90.47% | 0 Missing and 2 partials :warning: |
| RabbitMQ.Stream.Client/MetaData.cs | 66.66% | 0 Missing and 1 partial :warning: |
| RabbitMQ.Stream.Client/RawProducer.cs | 95.65% | 0 Missing and 1 partial :warning: |
| Tests/SuperStreamConsumerTests.cs | 95.00% | 1 Missing :warning: |
Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             main     #328      +/-   ##
==========================================
+ Coverage   92.60%   92.94%   +0.34%
==========================================
  Files         113      115       +2
  Lines        9964    10701     +737
  Branches      825      863      +38
==========================================
+ Hits         9227     9946     +719
- Misses        560      567       +7
- Partials      177      188      +11
```

:umbrella: View full report in Codecov by Sentry.

Gsantomaggio commented 12 months ago

The issue was raised by @jonnepmyra with a specific use case.

With the stable implementation, we have:

| Method                 | Mean      | Error     | StdDev    | Ratio | RatioSD |
|----------------------- |----------:|----------:|----------:|------:|--------:|
| AmqpStreamConsumer     |  10.39 ms |  1.454 ms |  4.287 ms |  1.00 |    0.00 |
| StreamProtocolConsumer | 102.34 ms | 18.856 ms | 55.597 ms | 10.41 |    5.89 |

That is, 102.34 ms for StreamProtocolConsumer vs 10.39 ms for AmqpStreamConsumer.

With this PR:

| Method                 | Mean     | Error    | StdDev    | Median   | Ratio | RatioSD |
|----------------------- |---------:|---------:|----------:|---------:|------:|--------:|
| AmqpStreamConsumer     | 12.95 ms | 0.993 ms |  2.928 ms | 12.28 ms |  1.00 |    0.00 |
| StreamProtocolConsumer | 28.29 ms | 5.076 ms | 14.967 ms | 24.05 ms |  2.31 |    1.38 |

That is, 28.29 ms for StreamProtocolConsumer vs 12.95 ms for AmqpStreamConsumer.

PoolConfiguration:

ConnectionPoolConfig = new ConnectionPoolConfig()
{
  ConsumersPerConnection = 20,
  ProducersPerConnection = 1,
}

@jonnepmyra FYI: by reducing InitialCredits to 1 you can improve further:

| Method                 | Mean     | Error    | StdDev   | Ratio | RatioSD |
|----------------------- |---------:|---------:|---------:|------:|--------:|
| AmqpStreamConsumer     | 10.70 ms | 1.334 ms | 3.932 ms |  1.00 |    0.00 |
| StreamProtocolConsumer | 19.60 ms | 3.127 ms | 9.219 ms |  1.95 |    1.07 |

PoolConfiguration:

ConnectionPoolConfig = new ConnectionPoolConfig()
{
  ConsumersPerConnection = 20,
  ProducersPerConnection = 1,
}

Consumer:

var config = new RawConsumerConfig(_streamName)
{
    ClientProvidedName = "STREAM-CONSUMER",
    OffsetSpec = new OffsetTypeOffset(_offset),
    InitialCredits = 1,
    MessageHandler = async (consumer, context, arg3) => { tcs.TrySetResult(); }
};

jonnepmyra commented 12 months ago

Great! I'll aim to test it tomorrow when I have some time. The improvements in my benchmarks seem even more significant in real-world scenarios, beyond the isolated benchmarks.

This modification should significantly enhance performance for cases involving time-traveling streams, especially when dealing with a small number of messages. In such scenarios, a substantial portion of time is typically consumed in establishing the connection.

Appreciate your contribution, @Gsantomaggio!

jonnepmyra commented 11 months ago

I've tested this PR in both our integration test suites and real-workload scenarios, and it's performing really well, especially since 15fe9dc1098953696f4974e532f089a0f715b909.

Notably, the creation of consumers has shown a significant improvement, approximately 4-5 times faster than before. This enhancement is observed as long as the connection pool is not drained of active consumers.

A note, even though it falls outside the scope of this PR: currently, the underlying connection of the pool is closed when there are no active consumers, which slows down the creation of new consumers. It would be beneficial to keep the underlying connection open even when no consumers are attached. This mirrors our experience with AMQP, where a connection can exist without any active channels or consumers, which considerably speeds up consumer creation.

Thanks for fixing this issue!

Gsantomaggio commented 11 months ago

I am going to merge the PR. It requires more work to handle the metadata update, but I prefer to do that in another PR.