Here is a simple repro. I am using package version 2.8.0.
private static async Task TestPulsarContainer(string connectionString)
{
    string topicName = "testTopic";
    PulsarClient pulsarClient = await new PulsarClientBuilder().ServiceUrl(connectionString).BuildAsync();
    IProducer<byte[]> producer = await pulsarClient.NewProducer().Topic(topicName).CreateAsync();
    IConsumer<byte[]> consumer = await pulsarClient.NewConsumer().Topic(topicName).SubscriptionName("testSubscription").SubscribeAsync();

    string testMessage = "Test message";
    await producer.SendAsync(Encoding.UTF8.GetBytes(testMessage));

    // Terminate topic with rest api

    Message<byte[]> message = await consumer.ReceiveAsync();
    await consumer.AcknowledgeAsync(message.MessageId);

    // This call should not block because topic is terminated and no other messages are in the queue.
    await consumer.ReceiveAsync();

    // This should be true because topic is terminated and no other messages are in the queue.
    Assert.IsTrue(consumer.HasReachedEndOfTopic);
}
When I terminate a topic through the REST API on a standalone Pulsar container (2.8.0), calling `consumer.ReceiveAsync()` does not behave as expected. I expect [this line of code](https://github.com/fsharplang-ru/pulsar-client-dotnet/blob/develop/src/Pulsar.Client/Internal/ConsumerImpl.fs#L1079) to be hit and end the wait on the receive. Instead, the call blocks forever. Am I doing something wrong or missing something?
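
For completeness, the "Terminate topic with rest api" step in the repro can be sketched roughly as below. This is only an illustration under assumptions: the `PulsarAdmin` class and its method names are hypothetical helpers, the admin endpoint is assumed to be the standalone default `http://localhost:8080`, and `testTopic` is assumed to resolve to the `public/default` tenant and namespace. The path follows the Pulsar admin REST route for terminating a persistent topic.

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;

// Hypothetical helper, not part of Pulsar.Client.
public static class PulsarAdmin
{
    // Builds the admin REST URL that terminates a persistent topic.
    public static Uri BuildTerminateUri(string adminBaseUrl, string tenant, string ns, string topic)
    {
        string path = "admin/v2/persistent/"
            + Uri.EscapeDataString(tenant) + "/"
            + Uri.EscapeDataString(ns) + "/"
            + Uri.EscapeDataString(topic) + "/terminate";
        return new Uri(new Uri(adminBaseUrl.TrimEnd('/') + "/"), path);
    }

    // Sends the POST request and throws if the broker returns a non-success status.
    public static async Task TerminateTopicAsync(
        HttpClient http, string adminBaseUrl, string tenant, string ns, string topic)
    {
        Uri uri = BuildTerminateUri(adminBaseUrl, tenant, ns, topic);
        using HttpResponseMessage response = await http.PostAsync(uri, null);
        response.EnsureSuccessStatusCode();
    }
}
```

In the repro, this would be called in place of the comment, for example `await PulsarAdmin.TerminateTopicAsync(new HttpClient(), "http://localhost:8080", "public", "default", topicName);`, assuming the container exposes the admin port on localhost.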