fsprojects / pulsar-client-dotnet

Apache Pulsar native client for .NET (C#/F#/VB)
MIT License

feat: dlq initial subscription support #273

Closed adrianotr closed 1 month ago

adrianotr commented 2 months ago

Adds support for https://github.com/apache/pulsar/issues/13408

Lanayx commented 2 months ago

@adrianotr Thanks for the updates. Can you please also merge in the latest develop, which has the CI fix?

Lanayx commented 2 months ago

@adrianotr Instead of using the admin API in the integration test, is it possible to actually send some messages to the DLQ, then connect with a consumer and verify that the messages are indeed there? This will make the implementation more robust and serve as an example of how to use it in real life. Also, right now the checks fail anyway.

adrianotr commented 2 months ago

> @adrianotr Instead of using the admin API in the integration test, is it possible to actually send some messages to the DLQ, then connect with a consumer and verify that the messages are indeed there? This will make the implementation more robust and serve as an example of how to use it in real life. Also, right now the checks fail anyway.

It works if the Task.Delay amount is high enough, which I agree is pretty unreliable. Unfortunately, the DLQ publishing seems to happen asynchronously and we don't have an easy way to wait for it. Maybe a better approach would be to try getting the subscription stats in a loop and fail after a few attempts?

My understanding is that without the admin API the test results would not be reliable, since there's no guarantee that messages will be dropped if there are no subscriptions. The problem above applies in that case too.
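The "poll in a loop and fail after a few attempts" idea could look roughly like the sketch below. This is only an illustration, not code from the PR: pollUntil and fakeProbe are hypothetical names, and the real probe (for example, a subscription stats lookup through the admin API) is replaced by a stand-in so the snippet is self-contained.

```fsharp
open System
open System.Threading.Tasks

/// Calls `probe` up to `attemptsLeft` times, waiting `interval` between tries.
/// Returns true as soon as the probe succeeds, false if every attempt fails.
let rec pollUntil (attemptsLeft: int) (interval: TimeSpan) (probe: unit -> Task<bool>) : Task<bool> =
    task {
        if attemptsLeft <= 0 then
            return false
        else
            let! ok = probe ()
            if ok then
                return true
            elif attemptsLeft = 1 then
                // Last attempt failed: give up without waiting again.
                return false
            else
                do! Task.Delay(interval)
                return! pollUntil (attemptsLeft - 1) interval probe
    }

// Hypothetical stand-in for "does the DLQ subscription exist yet?".
// It reports success on the third call.
let calls = ref 0
let fakeProbe () : Task<bool> =
    calls.Value <- calls.Value + 1
    Task.FromResult(calls.Value >= 3)

let found = (pollUntil 5 (TimeSpan.FromMilliseconds 10.0) fakeProbe).Result
```

Compared with a single fixed Task.Delay, a bounded poll like this returns as soon as the condition holds and only fails after the full budget of attempts is spent, which tends to be friendlier to slow CI machines.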

Lanayx commented 2 months ago

> My understanding is that without the admin API the test results would not be reliable, since there's no guarantee that messages will be dropped if there are no subscriptions.

Isn't that the whole idea of this particular feature: to create a special subscription for the DLQ to prevent messages from being dropped?

adrianotr commented 2 months ago

> > My understanding is that without the admin API the test results would not be reliable, since there's no guarantee that messages will be dropped if there are no subscriptions.

> Isn't that the whole idea of this particular feature: to create a special subscription for the DLQ to prevent messages from being dropped?

Yes. However, when there aren't any subscriptions, I don't know if messages are 100% guaranteed to be dropped right away. I know they will eventually be dropped. Thus, testing based on the assumption that messages are always dropped immediately when there are no subscriptions might not be correct.

adrianotr commented 2 months ago

So, I made some changes here to address both issues.

First, to ensure the messages have already landed in the DLQ without any sort of Task.Delay, I'm subscribing to the DLQ with a different subscription (not the initial one): https://github.com/fsprojects/pulsar-client-dotnet/blob/3e23b7e629440be9ef4d898065698cf7d0e28236/tests/IntegrationTests/DeadLetters.fs#L503-L507

Finally, when we know for sure all messages are in the DLQ, we can subscribe to the initial subscription with SubscriptionInitialPosition.Latest. That means that if the initial subscription had not existed beforehand, it would not contain the messages that were sent earlier. https://github.com/fsprojects/pulsar-client-dotnet/blob/3e23b7e629440be9ef4d898065698cf7d0e28236/tests/IntegrationTests/DeadLetters.fs#L525

Edit: changing the line below to let config = getTestConfig() confirms that the test does fail if there is no initial subscription: https://github.com/fsprojects/pulsar-client-dotnet/blob/3e23b7e629440be9ef4d898065698cf7d0e28236/tests/IntegrationTests/DeadLetters.fs#L458
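The reasoning behind the Latest check can be illustrated with a toy in-memory model. This is a sketch under the assumptions stated in this thread (a message published while no subscription exists is not kept, and a subscription created with the Latest position only sees messages published afterwards). ToyTopic and the names below are hypothetical and are not part of Pulsar.Client or the broker.

```fsharp
open System.Collections.Generic

/// Toy stand-in for a topic: every subscription keeps its own backlog.
type ToyTopic() =
    let backlogs = Dictionary<string, ResizeArray<string>>()

    /// Creates the subscription if it is missing. A new subscription starts
    /// empty, mimicking SubscriptionInitialPosition.Latest. Subscribing to an
    /// existing subscription leaves its backlog untouched.
    member _.Subscribe(name: string) =
        if not (backlogs.ContainsKey name) then
            backlogs.[name] <- ResizeArray<string>()

    /// Appends the message to every existing subscription's backlog.
    /// With no subscriptions, the message is simply lost (assumed behavior).
    member _.Publish(message: string) =
        for backlog in backlogs.Values do
            backlog.Add message

    /// Returns the messages currently waiting on the given subscription.
    member _.Backlog(name: string) : string list =
        match backlogs.TryGetValue name with
        | true, backlog -> List.ofSeq backlog
        | _ -> []

// Case 1: no initial subscription. The message reaches the DLQ first and the
// subscription is only created when the verifying consumer connects.
let plainDlq = ToyTopic()
plainDlq.Publish "m1"
plainDlq.Subscribe "init-sub"
let withoutInitial = plainDlq.Backlog "init-sub"

// Case 2: the initial subscription is created before the first publish,
// so a consumer connecting later with Latest still finds the message.
let dlq = ToyTopic()
dlq.Subscribe "init-sub"
dlq.Publish "m1"
dlq.Subscribe "init-sub"
let withInitial = dlq.Backlog "init-sub"
```

In this model, withoutInitial is empty and withInitial contains "m1", which is the distinction the integration test relies on.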

Lanayx commented 1 month ago

> Yes. However, when there aren't any subscriptions, I don't know if messages are 100% guaranteed to be dropped right away. I know they will eventually be dropped.

You can test it, but as far as I remember they will be dropped 100% if there are no active subscriptions and no retention is set at the namespace level.

> Thus, testing based on the assumption that messages are always dropped immediately when there are no subscriptions might not be correct.

With the new feature we only need to test that messages are never dropped, without creating an additional consumer beforehand, so whether they are dropped without it doesn't matter for this particular case.

> First, to ensure the messages have already landed in the DLQ without any sort of Task.Delay, I'm subscribing to the DLQ with a different subscription (not the initial one)

This goes against the purpose of the feature, because the whole idea is to make sure messages are not dropped even without creating an additional subscription beforehand, so it shouldn't be included in the integration test. Task.Delay is required for a different reason: the consumer itself doesn't send messages to the DLQ (and the producer with the initial subscription is not created) until the NegativeAckRedeliveryDelay timer triggers. So if the nack delay is set to 100ms, then doing a Task.Delay of 1 second in the test before connecting with a new consumer seems like a fine solution to me.

UPDATE: Actually, it sounds good to use two consumers for verification, just like you did: one with a new subscription name (to verify that the messages are in the DLQ) and one with the initialSubscription name and SubscriptionInitialPosition.Latest (to verify that initialSubscription works as expected). But they should connect after nacking all messages and Task.Delay(1000), not before.

adrianotr commented 1 month ago

Hi, can you please check the last commit?

> UPDATE: Actually, it sounds good to use two consumers for verification, just like you did: one with a new subscription name (to verify that the messages are in the DLQ) and one with the initialSubscription name and SubscriptionInitialPosition.Latest (to verify that initialSubscription works as expected). But they should connect after nacking all messages and Task.Delay(1000), not before.

So, the whole purpose of having two DLQ consumers was to make sure the messages were persisted in the DLQ without having to await the nacks with a hardcoded delay. Anyway, I changed it back to using a one-second delay instead of using two consumers.

adrianotr commented 1 month ago

It seems the CI is failing while waiting for the DLQ messages. 100% of my local test runs work with the one-second delay, but this is a beefier machine compared to the CI. If I lower that delay, it fails sometimes. That was the reason I wanted to avoid using a delay, although I have no way to confirm whether that is the actual issue happening in the CI.

Lanayx commented 1 month ago

Do you know which server version is required for this feature? I suspect that it's 2.10.0, while CI is running on 2.9.5.

Lanayx commented 1 month ago

OK, I've tested it on 2.11.4 locally and it worked, so I'm merging it in. However, I had to disable this test until somebody updates CI to 2.11.4 (it looks like there were some breaking changes in 2.10.0).