Closed mnorkin closed 5 years ago
I don't like it. This can cause many identical updateMetadata
calls running for the same topic. I think we should do updateMetadata()
(for all Kafka server topics) in Client.init()
and leave the current behaviour untouched. In the very rare case that the publisher publishes into several new topics during runtime, any failed payloads will be retried and will succeed (there is, however, a bug that currently prevents this; the fix is coming).
Ok, we'll wait for your fix, thanks
I see that we're now back to square one https://github.com/oleksiyk/kafka/commit/13abf50d1817d8cbd6299c82e5730a2bad030f0b#diff-50cfa59973c04321b5da0c6da0fdf4feR138
In this case we cannot use your library any more, sorry.
Nothing to be sorry about, I'm not selling it. However, I'm not sure what the problem is with fetching the full metadata only once?
I have a story to tell.
In production we have 100k topics. Before the app subscribes to a topic, it reads all the metadata and then synchronously loops over the received data, which in our case blocks everything.
For a single-instance app being initialized this is bad, because for a while our app is basically down. For a multi-instance app one might think it's not so bad, because while one instance fetches data, the others keep processing requests. However, we discovered that when a consumer restarts, the topology changes, so the metadata has to be re-read. Because of this, all app instances need to re-fetch and synchronously re-loop over everything again. In our case this makes the app unavailable for 6 minutes, which is not acceptable for us.
Well, it's not my business, but you definitely did that wrong with 100k topics. I can only imagine how overloaded your Zookeeper instances are. You should have been using a GroupConsumer with a custom assignment strategy and a matching custom partitioner in the Producer to split the load. In my current project the Kafka cluster handles roughly 150,000,000 messages daily with just a few topics and load-balanced group consumers. Works perfectly.
Thanks for your time
@oleksiyk we've investigated how it works in the official clients. There is a needMetadataForAllTopics
property in the client settings which is false by default (https://github.com/apache/kafka/blob/d71cb54672e63d2f0f4b999668336642a9a63a1d/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L111) and becomes true
only when the subscription is by pattern matching (https://github.com/apache/kafka/blob/c3e7d6252c41d48e74379810595c978efada9efb/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L997).
So that's how a Kafka client should behave by default.
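The behaviour described above (fetch metadata for all topics only when subscribing by pattern) can be sketched like this. This is an illustrative JavaScript rendering of the Java client's logic, not code from either library; the function and parameter names are made up.

```javascript
// Illustrative: which topics a consumer needs metadata for.
// A pattern subscription must see every cluster topic to match
// against it; an explicit subscription only needs the named topics.
function metadataTopicsToFetch(subscription, allClusterTopics) {
  // subscription: { topics?: string[], pattern?: RegExp }
  const needMetadataForAllTopics = Boolean(subscription.pattern);
  if (needMetadataForAllTopics) {
    return allClusterTopics;
  }
  return subscription.topics || [];
}
```

With 100k topics, the difference between the two branches is exactly the difference between fetching everything at startup and fetching only what the app subscribes to.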
I've checked my changes in production and it works great now: less memory usage and a lot less CPU usage.
Currently the client does not support multiple publishers from the same instance.
This is due to the single promise on updateMetadata.
If two publishers publish at the same time, on initialization neither has any information in
self.topicMetadata
, so updateMetadata
happens: the first publish acquires the request, while the second waits for the promise to resolve, thinking it's its own promise. The first publisher then receives the promise resolution, searches self.topicMetadata
for its topic and finds it. The second publisher receives the same promise, searches self.topicMetadata
for its topic and fails, because the request was made for the first publisher's topic, not the second's. This change introduces a new promise for every updateMetadata request when topics are involved, so no race condition happens. This race condition appeared in one of your tests.
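The race described above, and the per-request-promise fix, can be sketched as follows. This is a minimal standalone illustration, not the library's internals: `MetadataCache`, `fetchFn`, and `pending` are invented names. The key point is that each caller gets a promise tied to the topics *it* requested, instead of latching onto another caller's in-flight request.

```javascript
// Sketch: one in-flight promise per topic instead of a single
// shared updateMetadata promise, so two publishers racing on
// different topics each wait for a request that covers their topic.
class MetadataCache {
  constructor(fetchFn) {
    this.fetch = fetchFn;      // (topics) => Promise<{ topic: meta }>
    this.topicMetadata = {};   // resolved metadata per topic
    this.pending = new Map();  // topic -> in-flight promise
  }

  updateMetadata(topics) {
    // Only request topics that are not already being fetched.
    const missing = topics.filter(t => !this.pending.has(t));
    if (missing.length) {
      const p = this.fetch(missing).then(meta => {
        Object.assign(this.topicMetadata, meta);
        missing.forEach(t => this.pending.delete(t));
        return meta;
      });
      missing.forEach(t => this.pending.set(t, p));
    }
    // Wait for every requested topic, whether its request was
    // started by this caller or an earlier one.
    return Promise.all(topics.map(t =>
      this.pending.get(t) ||
      Promise.resolve({ [t]: this.topicMetadata[t] })));
  }
}
```

With the single-promise scheme, the second publisher would have awaited the first publisher's request and then failed to find its own topic; here its topic always has a request of its own.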
@oleksiyk what do you think?