rabbitmq / rabbitmq-stream-dotnet-client

RabbitMQ client for the stream protocol
https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/htmlsingle/index.html
Other
122 stars 41 forks source link

Consumer timeout and pending connection #310

Closed Gsantomaggio closed 1 year ago

Gsantomaggio commented 1 year ago

Describe the bug

When the consumer.close() goes in timeout, the client does not close the connection in the right way. the consumer is removed from the list but the connection is still pending

Reproduction steps

this code sometimes raise the connection timeout:

public static async Task Start()
    {
        var serviceCollection = new ServiceCollection();
        serviceCollection.AddLogging(builder => builder
            .AddSimpleConsole(options =>
            {
                options.IncludeScopes = true;
                options.SingleLine = true;
                options.TimestampFormat = "[HH:mm:ss] ";
                options.ColorBehavior = LoggerColorBehavior.Default;
            })
            .AddFilter(level => level >= LogLevel.Debug)
        );
        var loggerFactory = serviceCollection.BuildServiceProvider()
            .GetService<ILoggerFactory>();

        var consumerLogger = loggerFactory.CreateLogger<Consumer>();
        var streamLogger = loggerFactory.CreateLogger<StreamSystem>();

        var workOnStreamIsComplete = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var closeConsumerTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

        var system = await StreamSystem.Create(new StreamSystemConfig()
        {
          });
  //      await system.DeleteStream("StreamSpec");
        await system.CreateStream(new StreamSpec("StreamSpec"));
        var producer = await Producer.Create(new ProducerConfig(system, "StreamSpec"));

        _ = Task.Run(async () =>
        {
            for (var i = 0; i < 1000000; i++)
            {
                await producer.Send(new Message(new byte[1024 * 2])
                {
                    ApplicationProperties = new ApplicationProperties()
                    {
                        {"key", "value"},
                        {"key1", "value1"},
                        {"key2", "value2"},
                    }
                });
            }

            streamLogger.LogInformation("Sent 1000000 messages to the stream");
        });

        var consumed = 0;
        var consumer = await Consumer.Create(new ConsumerConfig(system, "StreamSpec")
        {
            OffsetSpec = new OffsetTypeFirst(),
            InitialCredits = 1000,
            MessageHandler = async (s, rawConsumer, arg3, arg4) =>
            {
                if (!workOnStreamIsComplete.Task.IsCompleted)
                {
                    consumed++;
                    if (consumed % 4000 == 0)
                    {
                        streamLogger.LogInformation("Received {Consumed} messages from the stream", consumed);
                    }
                }
                else
                {
                    await closeConsumerTcs.Task;
                }
            },
        }, consumerLogger);

/       await Task.Delay(1_000);

        _ = Task.Run(async () =>
        {
            //The reason to Close() the consumer in a non awaited Task.Run() is
            //because the Close() sometimes timeout, and we have to wait for a long
            //time before it actually times out..

            await consumer.Close();
            streamLogger.LogInformation("Closed consumer");
            closeConsumerTcs.TrySetResult();

        });

        await workOnStreamIsComplete.Task;
    }

The problem is this WaitAsync:_client.Unsubscribe(_subscriberId).WaitAsync(Consts.MidWait) that does not remove the the consumer from the consumer list in time to be removed

Expected behavior

Close the connection

Additional context

No response