fsprojects / pulsar-client-dotnet

Apache Pulsar native client for .NET (C#/F#/VB)
MIT License

Consumer not reading messages when batching is disabled #271

Open chriscameron-vertexinc opened 3 months ago

chriscameron-vertexinc commented 3 months ago

Hello!

I'm using a Key_Shared subscription to ensure that no two consumers receive messages with the same key.

To avoid collisions when scaling consumers up or down, I'm trying to set my receiver queue size to 0. I don't want any prefetched messages; I'd like to make sure I'm only ever consuming one message at a time.

When I produce messages to my topic the consumers throw the following exceptions:

[20:18:24 Error]
consumer(0, b4e1c, -1) Closing consumer due to unsupported received batch-message with zero receiver queue size
[20:18:24 Error]
consumer(0, b4e1c, -1) Batch reading exception 42:0:-1
Pulsar.Client.Api.InvalidMessageException: Unsupported Batch message with 0 size receiver queue for [CNR.ExampleSubscriber.API]-[]
   at Pulsar.Client.Api.ZeroQueueConsumerImpl`1.ReceiveIndividualMessagesFromBatch(RawMessage _arg2, FSharpFunc`2 _arg1) in C:\Workspace\pulsar-client-dotnet\src\Pulsar.Client\Internal\ConsumerImpl.fs:line 1699
   at <StartupCode$Pulsar-Client>.$ConsumerImpl.clo@727-101.Invoke(Unit unitVar0) in C:\Workspace\pulsar-client-dotnet\src\Pulsar.Client\Internal\ConsumerImpl.fs:line 728
   at Pulsar.Client.Common.Tools.wrapException[a](FSharpFunc`2 f) in C:\Workspace\pulsar-client-dotnet\src\Pulsar.Client\Common\Tools.fs:line 171
[20:18:25 Information]
consumer(0, b4e1c, -1) Message 42:0:-1 was discarded due to BatchDeSerializeError
[20:18:25 Information]
consumer(0, b4e1c, -1) starting close
[20:18:25 Information]
consumer(0, b4e1c, -1) UnackedTracker mailbox has stopped normally
[20:18:25 Information]
consumer(0, b4e1c, -1) GroupingTracker mailbox has stopped normally
[20:18:25 Information]
consumer(0, b4e1c, -1) NegativeTracker mailbox has stopped normally
[20:18:25 Information]
consumer(0, b4e1c, -1) ConnectionHandler ConnectionHandler mailbox has stopped normally
[20:18:25 Error]
Failed to receive message
Pulsar.Client.Api.AlreadyClosedException: Consumer is already closed
   at <StartupCode$Pulsar-Client>.$ConsumerImpl.ReceiveWrappedAsync@1437.MoveNext() in C:\Workspace\pulsar-client-dotnet\src\Pulsar.Client\Internal\ConsumerImpl.fs:line 1439
   at Pulsar.Client.Common.Tools.reraize[a](Exception ex) in C:\Workspace\pulsar-client-dotnet\src\Pulsar.Client\Common\Tools.fs:line 67
   at <StartupCode$Pulsar-Client>.$ConsumerImpl.Pulsar-Client-Api-IConsumer<'T>-ReceiveAsync@1707-3.MoveNext() in C:\Workspace\pulsar-client-dotnet\src\Pulsar.Client\Internal\ConsumerImpl.fs:line 1712
   at CNR.Common.Eventing.Pulsar.Subscribers.PulsarEventSubscriber`1.ExecuteAsync(CancellationToken stoppingToken) in C:\Workspace\cnr-common-eventing\src\CNR.Common.Eventing.Pulsar\Subscribers\PulsarEventSubscriber.cs:line 76
[20:18:25 Information]
consumer(0, b4e1c, -1) stopped
[20:18:25 Information]
consumer(0, b4e1c, -1) mailbox has stopped normally

Ok, so we can't read batched messages when only handling 1 message at a time.

When I disable batching on the producer side, my consumer no longer receives any messages at all. When I check pulsar-admin, I can see msgInCounter incrementing, but msgOutCounter never goes up.

As a workaround, I've had to enable batching on the producer and set the receiver queue size to 1. Is this going to be adequate for my use case?
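
For reference, the setup described above could be sketched roughly as follows. This is a minimal, unverified sketch rather than the actual application code: the service URL, topic, and subscription name are hypothetical, it needs a running broker, and the builder calls (ReceiverQueueSize, EnableBatching, SubscriptionType.KeyShared) are used as I understand the Pulsar.Client API. No message key is set, to keep the sketch short.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Pulsar.Client.Api;
using Pulsar.Client.Common;

public static class ZeroQueueRepro
{
    public static async Task Main()
    {
        // Hypothetical values: adjust to your own broker and topic.
        const string serviceUrl = "pulsar://localhost:6650";
        const string topic = "persistent://public/default/zero-queue-repro";

        var client = await new PulsarClientBuilder()
            .ServiceUrl(serviceUrl)
            .BuildAsync();

        // Consumer with no prefetch: receiver queue size 0 on a Key_Shared subscription.
        // The workaround from this issue would use ReceiverQueueSize(1) instead.
        var consumer = await client.NewConsumer()
            .Topic(topic)
            .SubscriptionName("repro-subscription")
            .SubscriptionType(SubscriptionType.KeyShared)
            .ReceiverQueueSize(0)
            .SubscribeAsync();

        // Producer with batching disabled, since a zero-queue consumer rejects batched messages.
        var producer = await client.NewProducer()
            .Topic(topic)
            .EnableBatching(false)
            .CreateAsync();

        await producer.SendAsync(Encoding.UTF8.GetBytes("hello"));

        // Wait up to 10 seconds so the sketch does not hang if nothing is delivered.
        var receive = consumer.ReceiveAsync();
        var finished = await Task.WhenAny(receive, Task.Delay(TimeSpan.FromSeconds(10)));

        if (finished == receive)
        {
            var message = await receive;
            Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.GetValue()));
            await consumer.AcknowledgeAsync(message.MessageId);
        }
        else
        {
            Console.WriteLine("No message received within 10 seconds.");
        }

        await client.CloseAsync();
    }
}
```

If the behavior described above reproduces, the timeout branch would be the one that runs, while msgInCounter still increments on the broker.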

Lanayx commented 3 months ago

Reading messages with a zero-size receiver queue is an explicitly supported case (although batched messages are not supported, following the Java client's behavior). Pulsar.Client even has a set of tests specifically for this type of consumer. You are welcome to create a PR with a failing test so that it can be fixed.