apache / pulsar-dotpulsar

The official .NET client library for Apache Pulsar
https://pulsar.apache.org/
Apache License 2.0

Multi-threaded message consumer issue #211

Closed. savan-rangrej closed this issue 7 months ago

savan-rangrej commented 7 months ago

Description

Hello everyone. Let's say I have 10 parallel threads trying to receive messages in parallel from a single consumer: I get an index out of range exception.

Please find the code below. I have created a sample console program to reproduce the issue.

System.IndexOutOfRangeException: Index was outside the bounds of the array.
   at DotPulsar.Internal.Consumer`1.Receive(CancellationToken cancellationToken)
   at DotPulsar.Internal.Consumer`1.d__34.MoveNext()

Reproduction Steps

using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;
using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Extensions;

internal class Program
{
    // Pending Receive calls; up to 10 are kept outstanding at the same time.
    private static readonly ConcurrentQueue<ValueTask<IMessage<ReadOnlySequence<byte>>>> _messageConcurrentQueue = new();

    static async Task Main(string[] args)
    {
        var client = PulsarClient.Builder().ServiceUrl(new Uri("pulsar://X.X.X.X:6650/")).Build();
        string topic = "persistent://Mytenant/MySubscription/MyTopic";
        var consumer = client.NewConsumer()
            .Topic(topic)
            .SubscriptionName("AutomationSubscription")
            .SubscriptionType(SubscriptionType.Shared)
            .Create();

        while (true)
        {
            // Start another Receive without awaiting it, so several calls overlap on one consumer.
            if (_messageConcurrentQueue.Count < 10)
            {
                _messageConcurrentQueue.Enqueue(consumer.Receive());
            }
            if (_messageConcurrentQueue.Count > 0)
            {
                _messageConcurrentQueue.TryDequeue(out var messageResult);
                if (messageResult.IsCompleted)
                {
                    if (!messageResult.IsCanceled && !messageResult.IsFaulted)
                    {
                        // Completed successfully: acknowledge the message and print its payload.
                        var result = messageResult.Result;
                        if (result != null && result.Data.Length > 0)
                        {
                            await consumer.Acknowledge(result);
                            Console.WriteLine(Encoding.UTF8.GetString(result.Data.ToArray()));
                        }
                    }
                    else if (messageResult.IsFaulted)
                    {
                        // This is where the IndexOutOfRangeException is printed.
                        Console.WriteLine(messageResult.AsTask().Exception?.InnerException);
                    }
                }
                else
                {
                    // Not finished yet: put it back and poll again on a later iteration.
                    _messageConcurrentQueue.Enqueue(messageResult);
                }
            }
        }
    }
}

Expected behavior

I should be able to receive multiple messages in parallel.

Actual behavior

I'm getting System.IndexOutOfRangeException: Index was outside the bounds of the array.

Regression?

It is reproducible in 6.0 and 8.0.

Known Workarounds

No response

Configuration

No response

Other information

No response

entvex commented 7 months ago

Dear @savan-rangrej. Thanks for the bug report! When you run into this issue, are you using a partitioned topic?

entvex commented 7 months ago

Dear @savan-rangrej. I have verified the issue and am working on a solution 👍
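
For readers hitting the same exception, one possible way to sidestep overlapping Receive calls is to let a single loop await one Receive at a time and hand each message to several workers. The thread does not confirm the root cause, so this is only a minimal sketch under the assumption that concurrent Receive calls on one consumer are the trigger. `SerializedReceivePump`, `Run`, and the `receive`/`handle` delegates are hypothetical names introduced for illustration and are not part of DotPulsar; the fan-out uses the standard `System.Threading.Channels` API.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper (not part of DotPulsar): receives are serialized,
// message handling runs in parallel on a fixed number of workers.
public static class SerializedReceivePump
{
    public static async Task Run<T>(
        Func<CancellationToken, ValueTask<T>> receive, // e.g. ct => consumer.Receive(ct)
        Func<T, Task> handle,                          // per-message work, may run concurrently
        int workerCount,
        CancellationToken cancellationToken)
    {
        // Bounded so the receive loop waits when all workers are busy.
        var channel = Channel.CreateBounded<T>(workerCount);

        // Each worker drains the channel until the writer is completed.
        var workers = Enumerable.Range(0, workerCount).Select(_ => Task.Run(async () =>
        {
            await foreach (var message in channel.Reader.ReadAllAsync())
            {
                await handle(message);
            }
        })).ToArray();

        try
        {
            // Only this loop calls receive, so at most one call is in flight.
            while (!cancellationToken.IsCancellationRequested)
            {
                var message = await receive(cancellationToken);
                await channel.Writer.WriteAsync(message, cancellationToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the expected way to stop the pump.
        }
        finally
        {
            // Let the workers finish the messages already queued, then exit.
            channel.Writer.Complete();
            await Task.WhenAll(workers);
        }
    }
}
```

With the consumer from the report, the call could look like `await SerializedReceivePump.Run(ct => consumer.Receive(ct), ProcessMessage, 10, cts.Token);`, where `ProcessMessage` and `cts` are placeholders for the application's own handler and `CancellationTokenSource`. Whether `Acknowledge` may safely be called from several workers at once is not established in this thread, so acknowledging from a single place is the more cautious choice.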