Open lhotari opened 3 weeks ago
@lhotari I tried to implement this: https://github.com/phil-cd/pulsar/commit/e241cff3aa6f0273138b05eb61bac14e0153a7d6 When the policy of a new consumer is incompatible with the existing consumers of the subscription it is rejected.
I also wrote a test but it works only when disabling the transaction coordinator in the setup()
method. I could not find out how to make the test work with the transaction coordinator being enabled. When calling addConsumer()
on the subscription the pendingAckHandleFuture
seems to block.
Search before asking
Read release policy
Version
all released versions including master branch
Minimal reproduce step
The conclusion is currently based on the source code: https://github.com/apache/pulsar/blob/766d2a407196533832184447c25498c6a82f7a86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L287-L309
What did you expect to see?
When multiple consumers are using different policies, this should be properly handled. One possibility is to keep the policy of the connected consumers and reject any other consumers and return a proper error message.
What did you see instead?
Based on the source code, it looks like the solution cannot work:
Anything else?
No response
Are you willing to submit a PR?