oded-zahavi opened this issue 5 years ago (status: Open)
I did see some cases where Kafka returns empty metadata. Does it actually stop in your case, and does waiting for metadata_max_age_ms help?
Hi,
In my understanding, metadata_max_age_ms determines the interval at which metadata is refreshed from Kafka (default 5 minutes). This is not what I'm facing. Let me elaborate further:
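(For reference, this is the setting I'm referring to; a minimal sketch, with the topic name and broker address as placeholders.)

```python
from aiokafka import AIOKafkaConsumer

# metadata_max_age_ms only controls how often aiokafka *periodically*
# refreshes cluster metadata; it is not a retry/backoff for bad responses.
consumer = AIOKafkaConsumer(
    "my-topic",                        # placeholder topic
    bootstrap_servers="localhost:9092",
    metadata_max_age_ms=300_000,       # default: 5 minutes
)
```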
I'm using a setup of Kafka and Zookeeper containers running in Docker (pretty standard so far). From one client I'm publishing messages, and from another client I'm consuming the topic.
At a certain point in time I restart the Kafka container to simulate a Kafka cluster failure. Both the consumer and the producer wait for the container to come back up and for the connection to be re-established. However, whereas the producer recovers and continues publishing messages to the topic, the consumer fails to reconnect.
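For reference, the consumer side is essentially the following (a stripped-down sketch of our setup; topic, group id, and broker address are placeholders). Restarting the Kafka container while this loop is running is what triggers the problem for us:

```python
import asyncio
from aiokafka import AIOKafkaConsumer

async def consume():
    consumer = AIOKafkaConsumer(
        "my-topic",                        # placeholder topic
        bootstrap_servers="localhost:9092",
        group_id="my-group",
    )
    await consumer.start()
    try:
        # Restarting the Kafka container while this loop is running is
        # what triggers the hang: the producer recovers, but this
        # consumer never sees new messages again.
        async for msg in consumer:
            print(msg.topic, msg.partition, msg.offset, msg.value)
    finally:
        await consumer.stop()

asyncio.run(consume())
```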
After further analysis, I figured that this probably happens because when Kafka comes up it first queries Zookeeper for metadata, but in the meantime it already accepts new connections. So aiokafka connects, queries Kafka for metadata, and gets back an empty topic list (which leads to a failure). By adding a sleep before querying Kafka for metadata, we give Kafka time to finish querying Zookeeper and to have the data available for aiokafka. However, just adding a sleep here is not deterministic, and we are looking for a more robust solution.
Thanks
Any thoughts, @tvoinarovskyi?
@tvoinarovskyi ⬆️
Sorry for not getting back to you 😭 I failed to reproduce it locally on my MacBook and could not figure it out before for lack of time (maybe my machine is slower to respond...). If you could provide any kind of reproducible snippet, it would help a bunch.
Yes, you are correct, Kafka does actually respond with empty metadata at startup, which is why we have this code: https://github.com/aio-libs/aiokafka/blob/master/aiokafka/client.py#L213. The problem is that we only do that on bootstrap, not on metadata updates. There we only log a warning: https://github.com/aio-libs/aiokafka/blob/master/aiokafka/cluster.py#L43.
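Very roughly, the difference looks like this (a simplified sketch of the logic, not the actual client code; request_metadata is a hypothetical stand-in for sending a MetadataRequest to one broker):

```python
import asyncio
import logging

log = logging.getLogger(__name__)

# Hypothetical stand-in for sending a MetadataRequest to one broker and
# returning the parsed response; the real client goes through a connection.
async def request_metadata(host):
    ...

async def bootstrap(hosts, retry_backoff=1.0):
    # Bootstrap already copes with "broker up but not ready": if the
    # response has no brokers, try the next host and keep retrying.
    while True:
        for host in hosts:
            metadata = await request_metadata(host)
            if metadata and metadata.brokers:
                return metadata
        await asyncio.sleep(retry_backoff)

async def metadata_update(node, cluster):
    # Periodic updates do NOT retry: an empty response is only logged,
    # and the next attempt happens a full metadata_max_age_ms later.
    metadata = await request_metadata(node)
    if not metadata or not metadata.brokers:
        log.warning("No brokers or topics in metadata; skipping update")
        return False
    cluster.update_metadata(metadata)
    return True
```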
What I was asking is whether waiting for 5 minutes (the default metadata update period) actually helps the consumer recover.
> So aiokafka connects, queries Kafka for metadata and gets empty topic list (which leads to a failure).
What failure do you see? Just everything stops?
Hmm, I looked at https://github.com/dpkp/kafka-python/blob/master/kafka/cluster.py#L244 and it seems we do a similar thing here: https://github.com/aio-libs/aiokafka/blob/master/aiokafka/client.py#L304. But what we don't do is set a proper timer for the retry. If we know that the response was invalid, we should retry after a backoff, not after another metadata_max_age_ms.
Thus a proper fix would be to look at ret here, https://github.com/aio-libs/aiokafka/blob/master/aiokafka/client.py#L249, and schedule a proper timeout for the next retry. Not sure if that is the best solution; it may start spamming the broker with metadata requests, but if we apply a backoff we will be fine.
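Something along these lines is what I have in mind (a sketch only; update_metadata is a hypothetical stand-in, and the True/False return value mirrors what ret is checked for in the linked code):

```python
import asyncio

# Hypothetical stand-in for the client's metadata update; assumed to return
# True on a usable response and False on an empty/invalid one.
async def update_metadata() -> bool:
    ...

async def md_synchronizer(metadata_max_age: float, retry_backoff: float):
    # Background task that keeps cluster metadata fresh.  The proposed change:
    # when the update fails (e.g. an empty broker list right after a broker
    # restart), retry after a short backoff instead of sleeping for the full
    # metadata_max_age_ms.
    while True:
        ok = await update_metadata()
        await asyncio.sleep(metadata_max_age if ok else retry_backoff)
```

The backoff keeps us from hammering a broker that is still syncing with Zookeeper, while still letting the consumer recover in seconds rather than minutes.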
Hi @tvoinarovskyi, any update on this? Should I open a pull request?
@oded-zahavi If you can that would be awesome. I would be glad to review.
I ran into the same issue, though not locally but rather while the test suite was running on Travis. The Kafka broker comes up, a metadata request is made, and the broker list comes back empty, which pretty much puts the test session in an unusable state.
I worked around it by writing my own fixture that keeps issuing metadata requests to the one broker until it gets back a populated list. I realise this is not an actual fix for the issue, but I'm a bit fuzzy on the desired behaviour, since getting an empty broker list sounds like it should at least generate a warning, and it also seems like a server-side bug (how can the broker reply to a request and claim in that same reply that there are no brokers?).
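Roughly what my fixture does is the following (a sketch from memory, assuming AIOKafkaClient exposes bootstrap(), force_metadata_update(), cluster.brokers(), and close() as in the current code):

```python
import asyncio
from aiokafka.client import AIOKafkaClient

async def wait_for_brokers(bootstrap_servers, timeout=30.0, interval=0.5):
    # Keep asking the (single) broker for metadata until it reports a
    # non-empty broker list, i.e. until it has finished syncing with ZK.
    client = AIOKafkaClient(bootstrap_servers=bootstrap_servers)
    await client.bootstrap()
    try:
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while not client.cluster.brokers():
            if loop.time() > deadline:
                raise RuntimeError("broker never returned a populated broker list")
            await client.force_metadata_update()
            await asyncio.sleep(interval)
    finally:
        await client.close()
```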
To make this more robust, I was wondering whether the bootstrap code that deals with this logic could implement some sort of retry mechanism until it gets a populated broker list. If you're OK with it, I could have a PR ready, @tvoinarovskyi.
@gabriel-tincu Sorry for not getting back to you... If you have the time, sure, I would love to get help on those details. Yes, a retry mechanism seems very appropriate, but maybe scan through all of the bootstrap hosts before retrying.
Hi,
We're facing an issue where we send and receive messages using aiokafka. When the Kafka Docker container is restarted, the connection is established, but Kafka seems not to be ready yet and returns empty metadata, causing the system to hang. After further debugging I figured out that adding a sleep of a few ms before this line solves the problem. I am not sure this is the right solution. If it is, I can open a pull request with the fix. Otherwise, I'd appreciate it if you could come up with a more appropriate one. @progovoy
Thanks,