fsprojects / pulsar-client-dotnet

Apache Pulsar native client for .NET (C#/F#/VB)
MIT License
301 stars 47 forks

Problem disposing consumer of partitioned topic #225

Closed jvstein closed 1 year ago

jvstein commented 1 year ago

I'm seeing an exception with the CancellationTokenSource when trying to dispose a partitioned consumer.

When this happens, my process appears to deadlock. I assume this is related to the sub-consumers for each partition.

I'm just calling `await consumer.DisposeAsync();`, nothing special. My topic has 10 partitions, and I can reproduce the problem against an empty queue. To reproduce it, the cancellation must be triggered after the consumer has logged the `mt/consumer(0, topic) created` message.

```
System.InvalidOperationException: An attempt was made to transition a task to a final state when it had already completed.
   at System.Threading.Tasks.TaskCompletionSource`1.SetException(Exception exception)
   at Pulsar.Client.Api.MultiTopicsConsumerImpl`1.stopConsumer()
   at <StartupCode$Pulsar-Client>.$MultiTopicsConsumerImpl.-ctor@606-98.MoveNext()
2023-06-29 12:57:07 "HOSTNAME" [INF] [PulsarLogger] [{}] consumer(1, topic, 0) mailbox has stopped normally
2023-06-29 12:57:07 "HOSTNAME" [FTL] [PulsarLogger] [{}] mt/consumer(0, topic) mailbox failure
System.ObjectDisposedException: The CancellationTokenSource has been disposed.
   at System.Threading.CancellationTokenSource.ThrowObjectDisposedException()
   at Pulsar.Client.Api.MultiTopicsConsumerImpl`1.stopConsumer()
   at <StartupCode$Pulsar-Client>.$MultiTopicsConsumerImpl.-ctor@606-98.MoveNext()
```
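
Both exception messages above come from standard .NET behavior rather than anything Pulsar-specific. The standalone C# sketch below reproduces them outside the library. It is not Pulsar.Client's code or its fix, and `Bug225Demo`, `FaultAfterCompletion`, and `Shutdown` are hypothetical names. It shows that `TaskCompletionSource<T>.SetException` throws `InvalidOperationException` once the task has already completed (whereas `TrySetException` just returns `false`), and that `CancellationTokenSource.Cancel` throws `ObjectDisposedException` after `Dispose`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; not part of Pulsar.Client.
public static class Bug225Demo
{
    // Completes a TaskCompletionSource once, then tries to fault it a second time.
    public static string FaultAfterCompletion(bool useTry)
    {
        var tcs = new TaskCompletionSource<bool>();
        tcs.SetResult(true); // the first completion wins

        var lateError = new Exception("late failure while closing child consumers");
        if (useTry)
        {
            // TrySetException reports "already completed" by returning false.
            return tcs.TrySetException(lateError) ? "faulted" : "ignored";
        }

        try
        {
            // SetException throws on an already-completed source.
            tcs.SetException(lateError);
            return "faulted";
        }
        catch (InvalidOperationException)
        {
            return "InvalidOperationException";
        }
    }

    // Cancels and disposes a CancellationTokenSource in either order.
    public static string Shutdown(bool disposeFirst)
    {
        var cts = new CancellationTokenSource();
        try
        {
            if (disposeFirst)
            {
                cts.Dispose();
                cts.Cancel(); // throws: the source is already disposed
            }
            else
            {
                cts.Cancel();  // signal cancellation first...
                cts.Dispose(); // ...then release the source
            }
            return "ok";
        }
        catch (ObjectDisposedException)
        {
            return "ObjectDisposedException";
        }
    }
}
```

Both traces point into `MultiTopicsConsumerImpl.stopConsumer`, which suggests that closing the partitioned consumer completes the same task twice and cancels a token source that has already been disposed. The sketch only demonstrates the framework behavior behind each message. It does not establish why the process hangs.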
Lanayx commented 1 year ago

Hi, I tried to reproduce this and couldn't. Could you please add a repro, or even a failing test? Also, please specify the library version.

jvstein commented 1 year ago

I can reproduce it with the following test program on version 2.12.2.

bug225.csproj
```xml
10.0 true Exe bug225 net48 Bug225 disable x64 true
```
Program.cs
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Pulsar.Client.Api;
using Pulsar.Client.Common;

namespace Bug225
{
    internal static class Program
    {
        // Main returns Task<int> because the method ends with "return 0;".
        private static async Task<int> Main(string[] args)
        {
            using ILoggerFactory loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
            var logger = loggerFactory.CreateLogger("Bug225");

            // Start cancellation upon CTRL-C.
            using var cts = new CancellationTokenSource();
            Console.CancelKeyPress += async (s, e) =>
            {
                cts.Cancel();
                e.Cancel = true;

                // Exit automatically after program deadlocks.
                await Task.Delay(10 * 1000);
                logger.LogError("Forcing shutdown after 10 seconds.");
                Environment.Exit(1);
            };

            PulsarClient.Logger = loggerFactory.CreateLogger("PulsarLogger");

            var client = await new PulsarClientBuilder()
                .ServiceUrl("pulsar://localhost:6650")
                .BuildAsync();

            await using var consumer = await client
                .NewConsumer(Schema.STRING())
                .Topic("persistent://public/default/my-topic")
                .ConsumerName("my-consumer")
                .ReceiverQueueSize(10)
                .SubscriptionName("my-subscription")
                .SubscriptionMode(SubscriptionMode.Durable)
                .SubscriptionType(SubscriptionType.Shared)
                .SubscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .SubscribeAsync();

            while (!cts.Token.IsCancellationRequested)
            {
                var message = await consumer.ReceiveAsync(cts.Token);
                Console.WriteLine(message);
                await consumer.AcknowledgeAsync(message.MessageId);
            }

            return 0;
        }
    }
}
```

I'm using a Docker-based cluster for testing. It is initialized like this:

```shell
$ docker run -d -it \
    -p 6650:6650 \
    -p 8080:8080 \
    -v pulsardata:/pulsar/data \
    -v pulsarconf:/pulsar/conf \
    --name pulsar-standalone \
    apachepulsar/pulsar:latest \
    bin/pulsar standalone
```

The topic is created using this command:

```shell
docker exec -it pulsar-standalone bin/pulsar-admin topics create-partitioned-topic persistent://public/default/my-topic --partitions 10
```

To reproduce:

  1. `dotnet restore`
  2. `dotnet build`
  3. `mono ./bin/Debug/net48/bug225.exe` (on Linux)
  4. Press CTRL-C to cancel the cancellation token.
output.txt
```
$ mono ./bin/Debug/net48/bug225.exe
info: PulsarLogger[0] Connecting to Broker (LogicalAddres Unspecified/localhost:6650, PhysicalAddress Unspecified/localhost:6650) with maxMessageSize: 5242880, brokerless: True
info: PulsarLogger[0] Connected to Broker (LogicalAddres Unspecified/localhost:6650, PhysicalAddress Unspecified/localhost:6650)
info: PulsarLogger[0] clientCnx(0, LogicalAddres Unspecified/localhost:6650) Connected ProtocolVersion: 19 ServerVersion: Pulsar Server3.0.0 MaxMessageSize: 5242880 Brokerless: True
info: PulsarLogger[0] Connecting to Broker (LogicalAddres Unspecified/localhost:6650, PhysicalAddress Unspecified/localhost:6650) with maxMessageSize: 5242880, brokerless: False
info: PulsarLogger[0] Connected to Broker (LogicalAddres Unspecified/localhost:6650, PhysicalAddress Unspecified/localhost:6650)
info: PulsarLogger[0] clientCnx(1, LogicalAddres Unspecified/localhost:6650) Connected ProtocolVersion: 19 ServerVersion: Pulsar Server3.0.0 MaxMessageSize: 5242880 Brokerless: False
info: PulsarLogger[0] consumer(1, my-consumer, 0) starting subscribe to topic persistent://public/default/my-topic-partition-0
info: PulsarLogger[0] consumer(2, my-consumer, 1) starting subscribe to topic persistent://public/default/my-topic-partition-1
info: PulsarLogger[0] consumer(3, my-consumer, 2) starting subscribe to topic persistent://public/default/my-topic-partition-2
info: PulsarLogger[0] consumer(4, my-consumer, 3) starting subscribe to topic persistent://public/default/my-topic-partition-3
info: PulsarLogger[0] consumer(5, my-consumer, 4) starting subscribe to topic persistent://public/default/my-topic-partition-4
info: PulsarLogger[0] consumer(6, my-consumer, 5) starting subscribe to topic persistent://public/default/my-topic-partition-5
info: PulsarLogger[0] consumer(7, my-consumer, 6) starting subscribe to topic persistent://public/default/my-topic-partition-6
info: PulsarLogger[0] consumer(8, my-consumer, 7) starting subscribe to topic persistent://public/default/my-topic-partition-7
info: PulsarLogger[0] consumer(9, my-consumer, 8) starting subscribe to topic persistent://public/default/my-topic-partition-8
info: PulsarLogger[0] consumer(10, my-consumer, 9) starting subscribe to topic persistent://public/default/my-topic-partition-9
info: PulsarLogger[0] consumer(1, my-consumer, 0) subscribed to topic persistent://public/default/my-topic-partition-0
info: PulsarLogger[0] consumer(3, my-consumer, 2) subscribed to topic persistent://public/default/my-topic-partition-2
info: PulsarLogger[0] consumer(5, my-consumer, 4) subscribed to topic persistent://public/default/my-topic-partition-4
info: PulsarLogger[0] consumer(2, my-consumer, 1) subscribed to topic persistent://public/default/my-topic-partition-1
info: PulsarLogger[0] consumer(6, my-consumer, 5) subscribed to topic persistent://public/default/my-topic-partition-5
info: PulsarLogger[0] consumer(4, my-consumer, 3) subscribed to topic persistent://public/default/my-topic-partition-3
info: PulsarLogger[0] consumer(7, my-consumer, 6) subscribed to topic persistent://public/default/my-topic-partition-6
info: PulsarLogger[0] consumer(9, my-consumer, 8) subscribed to topic persistent://public/default/my-topic-partition-8
info: PulsarLogger[0] consumer(8, my-consumer, 7) subscribed to topic persistent://public/default/my-topic-partition-7
info: PulsarLogger[0] consumer(10, my-consumer, 9) subscribed to topic persistent://public/default/my-topic-partition-9
info: PulsarLogger[0] mt/consumer(0, my-consumer) created
info: PulsarLogger[0] consumer(1, my-consumer, 0) starting close
info: PulsarLogger[0] consumer(2, my-consumer, 1) starting close
info: PulsarLogger[0] consumer(3, my-consumer, 2) starting close
info: PulsarLogger[0] consumer(4, my-consumer, 3) starting close
info: PulsarLogger[0] consumer(5, my-consumer, 4) starting close
info: PulsarLogger[0] consumer(6, my-consumer, 5) starting close
info: PulsarLogger[0] consumer(7, my-consumer, 6) starting close
info: PulsarLogger[0] consumer(8, my-consumer, 7) starting close
info: PulsarLogger[0] consumer(9, my-consumer, 8) starting close
info: PulsarLogger[0] consumer(10, my-consumer, 9) starting close
info: PulsarLogger[0] consumer(1, my-consumer, 0) GroupingTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(1, my-consumer, 0) NegativeTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(1, my-consumer, 0) ConnectionHandler ConnectionHandler mailbox has stopped normally
info: PulsarLogger[0] consumer(1, my-consumer, 0) stopped
info: PulsarLogger[0] consumer(2, my-consumer, 1) GroupingTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(2, my-consumer, 1) ConnectionHandler ConnectionHandler mailbox has stopped normally
info: PulsarLogger[0] consumer(2, my-consumer, 1) NegativeTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(2, my-consumer, 1) stopped
info: PulsarLogger[0] consumer(3, my-consumer, 2) GroupingTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(3, my-consumer, 2) NegativeTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(3, my-consumer, 2) ConnectionHandler ConnectionHandler mailbox has stopped normally
info: PulsarLogger[0] consumer(3, my-consumer, 2) stopped
info: PulsarLogger[0] consumer(4, my-consumer, 3) GroupingTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(4, my-consumer, 3) NegativeTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(4, my-consumer, 3) ConnectionHandler ConnectionHandler mailbox has stopped normally
info: PulsarLogger[0] consumer(4, my-consumer, 3) stopped
info: PulsarLogger[0] consumer(5, my-consumer, 4) GroupingTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(5, my-consumer, 4) NegativeTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(5, my-consumer, 4) ConnectionHandler ConnectionHandler mailbox has stopped normally
info: PulsarLogger[0] consumer(5, my-consumer, 4) stopped
info: PulsarLogger[0] consumer(6, my-consumer, 5) GroupingTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(6, my-consumer, 5) ConnectionHandler ConnectionHandler mailbox has stopped normally
info: PulsarLogger[0] consumer(6, my-consumer, 5) NegativeTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(6, my-consumer, 5) stopped
info: PulsarLogger[0] consumer(7, my-consumer, 6) NegativeTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(4, my-consumer, 3) mailbox has stopped normally
info: PulsarLogger[0] consumer(7, my-consumer, 6) GroupingTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(1, my-consumer, 0) mailbox has stopped normally
info: PulsarLogger[0] consumer(3, my-consumer, 2) mailbox has stopped normally
info: PulsarLogger[0] consumer(5, my-consumer, 4) mailbox has stopped normally
info: PulsarLogger[0] consumer(6, my-consumer, 5) mailbox has stopped normally
info: PulsarLogger[0] consumer(7, my-consumer, 6) stopped
info: PulsarLogger[0] consumer(2, my-consumer, 1) mailbox has stopped normally
info: PulsarLogger[0] consumer(7, my-consumer, 6) mailbox has stopped normally
info: PulsarLogger[0] consumer(8, my-consumer, 7) GroupingTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(8, my-consumer, 7) ConnectionHandler ConnectionHandler mailbox has stopped normally
info: PulsarLogger[0] consumer(8, my-consumer, 7) NegativeTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(8, my-consumer, 7) stopped
info: PulsarLogger[0] consumer(8, my-consumer, 7) mailbox has stopped normally
info: PulsarLogger[0] consumer(9, my-consumer, 8) GroupingTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(9, my-consumer, 8) NegativeTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(9, my-consumer, 8) ConnectionHandler ConnectionHandler mailbox has stopped normally
info: PulsarLogger[0] consumer(9, my-consumer, 8) stopped
info: PulsarLogger[0] consumer(7, my-consumer, 6) ConnectionHandler ConnectionHandler mailbox has stopped normally
info: PulsarLogger[0] consumer(9, my-consumer, 8) mailbox has stopped normally
info: PulsarLogger[0] consumer(10, my-consumer, 9) NegativeTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(10, my-consumer, 9) GroupingTracker mailbox has stopped normally
info: PulsarLogger[0] consumer(10, my-consumer, 9) ConnectionHandler ConnectionHandler mailbox has stopped normally
info: PulsarLogger[0] consumer(10, my-consumer, 9) stopped
fail: PulsarLogger[0] mt/consumer(0, my-consumer) could not close all child consumers properly
System.InvalidOperationException
  at System.Threading.Tasks.TaskCompletionSource`1[TResult].SetException (System.Exception exception) [0x00013] in :0
  at Pulsar.Client.Api.MultiTopicsConsumerImpl`1[T].stopConsumer () [0x0006d] in <9bb7f048a0727844fff78940832e9c89>:0
  at .$MultiTopicsConsumerImpl+-ctor@606-98[T].MoveNext () [0x01d12] in <9bb7f048a0727844fff78940832e9c89>:0
info: PulsarLogger[0] consumer(10, my-consumer, 9) mailbox has stopped normally
crit: PulsarLogger[0] mt/consumer(0, my-consumer) mailbox failure
System.ObjectDisposedException: The CancellationTokenSource has been disposed.
  at System.Threading.CancellationTokenSource.Cancel (System.Boolean throwOnFirstException) [0x00000] in :0
  at System.Threading.CancellationTokenSource.Cancel () [0x00000] in :0
  at Pulsar.Client.Api.MultiTopicsConsumerImpl`1[T].stopConsumer () [0x0003e] in <9bb7f048a0727844fff78940832e9c89>:0
  at .$MultiTopicsConsumerImpl+-ctor@606-98[T].MoveNext () [0x01dd1] in <9bb7f048a0727844fff78940832e9c89>:0
fail: Bug225[0] Forcing shutdown after 10 seconds.
```
Lanayx commented 1 year ago

@jvstein thank you for the repro. I haven't tried it myself yet, but I wanted to ask: does this reproduce on other combinations (such as net48 on Windows or net6 on Linux)?

jvstein commented 1 year ago

I can reproduce against net6 and net7 on Linux. I haven't tried reproducing on net48 on Windows.

Lanayx commented 1 year ago

OK, I was able to reproduce the issue. I'll see what I can do.

Lanayx commented 1 year ago

@jvstein I've just published version 2.12.3. Can you please check whether it fixes your issue?

jvstein commented 1 year ago

@Lanayx Just tested it out. It's working as expected now. Thanks!

Lanayx commented 1 year ago

Ok, closing it then