OK Alex, I can see what you are trying to do here. Basically the driver blocks when we lose the connection to Kafka and, in theory, waits indefinitely for a reconnect. I actually did this on purpose because I wanted the driver to handle reconnects automatically for the end user and essentially block upstream posts until a connection could be made.
However, I think the problem arises when SendAsync goes out and blocks, but the response timeout scheduler still expires the waiting response message, because we have already added the message to the queue.
I'd like to hear your opinion on this, but what I think should happen is...
SendAsync should still block waiting for a reconnect.
We should not start the response timeout until we know we have sent the message.
New produce messages should block upstream until the connection is established.
The caller can subscribe to events to get notifications of lost connections and reconnect attempts.
The lost-connection events should be used by the driver to re-query metadata to get new broker allocations.
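For the event piece, roughly this is the shape I have in mind. This is only a sketch; none of these members exist in the driver today, and the names are placeholders:

```csharp
using System;

// Sketch of the proposed event surface; these names are placeholders and
// nothing here exists in the driver yet.
public interface IConnectionEvents
{
    event Action<Uri> ServerDisconnected;        // fired when a broker connection is lost
    event Action<Uri, int> ReconnectionAttempt;  // fired per reconnect try, with the attempt count
}
```

The driver itself would also subscribe, using the lost-connection events to re-query metadata and pick up new broker allocations.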
Would this cover your use cases?
Hey James, you got it right!
That is exactly the issue we are dealing with now. I like the implementation, I mean the fact that the driver tries to reconnect automatically. The problem occurs, as you mentioned, when the request expires and the exception is set, which basically invalidates the request since the AggregateException will be thrown. And I like this approach; in fact, I think it should be thrown if the request times out. Here is the reason: say we have a service that pushes messages to Kafka, and we expect every message to take no longer than 10 seconds (the connection expiration timeout). In the current implementation, the service will block the app until the connection is reestablished, and since it is a silent process (the exception is swallowed inside), we might not even know that something is wrong with the Kafka cluster (broker).
I will give you a real-life example that we are dealing with. We have a Kafka cluster with a replication factor of 2 (two brokers with one topic and one partition), so one broker is the leader and the second acts as a replica. When we cache the leader in the BrokerRouter, that leader connection will always be used by default. Let's say the leader goes down and the second broker becomes the leader, since Kafka switches over automatically. How do we know that the first one went down? All messages are now blocked, so we cannot send anything at all and pretty much block the entire service.

Here is the scenario with new user registration. We use Kafka as a centralized email solution: multiple apps queue emails, and a background process grabs them, applies an email template, and sends them via SMTP. When a user registers, we queue a confirmation email to Kafka; the consumer reads that email and sends it so the user can activate their account. Now, if one of the brokers goes down, we are not able to queue any emails, so technically a new user should not be able to register until the queue is operational again; otherwise, they will never receive the email and will never be able to activate their account. If we do not fail, the send blocks the thread indefinitely, which results in a web page timeout or something similar. We would rather report an error to the user (something generic like "something went wrong, please try again") and let them reattempt registration. On the other side, we also have an AggregateException handler that calls BrokerRouter.RefreshTopicMetadata to refresh the metadata and pick up the new leader, now that the second broker has taken over. So we can either reattempt sending the same message using the updated leader, or log a critical error that alerts the appropriate team that messages are timing out, so we can raise an issue with the Kafka node.
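Roughly the handling we have in mind on our side, just as a sketch assuming the timeout exception actually surfaces; the "emails" topic name and the single-retry policy here are illustrative, not our real code:

```csharp
using System;
using System.Threading.Tasks;
using KafkaNet;           // Producer, BrokerRouter
using KafkaNet.Protocol;  // Message

public static class EmailQueue
{
    // Sketch: assumes SendMessageAsync faults with the scheduler's timeout
    // instead of blocking forever on a dead leader.
    public static async Task QueueEmailAsync(Producer producer, BrokerRouter router, Message email)
    {
        try
        {
            await producer.SendMessageAsync("emails", new[] { email });
        }
        catch (Exception) // await unwraps the AggregateException to its inner timeout
        {
            // Refresh metadata so the router discovers the new leader...
            router.RefreshTopicMetadata("emails");
            // ...then retry once against the updated leader; if this throws too,
            // let it bubble up so the web tier can show a generic error.
            await producer.SendMessageAsync("emails", new[] { email });
        }
    }
}
```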
There is also a similar issue with KafkaMetadataProvider.GetMetadataResponse. We loop through connections and throw ServerUnreachableException if no valid connection is found. But if the server is down, that exception will never be thrown, because `var response = conn.SendAsync(request).Result;` blocks the thread indefinitely. And if you happen to provide two connections and the first is down, we never get a chance to try the second one, which may still be operational.
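Something like a per-connection time-box would avoid that. A rough, self-contained sketch; the generic wrapper and the one-second budget are just illustrations, not driver code (in the driver this would throw ServerUnreachableException rather than TimeoutException):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class MetadataQuery
{
    // Time-boxes each connection's query so a dead first broker cannot
    // starve a live second one. The delegate stands in for
    // conn.SendAsync(request).
    public static T FirstResponsive<T>(IEnumerable<Func<Task<T>>> queries)
    {
        foreach (var query in queries)
        {
            var task = query();
            if (task.Wait(TimeSpan.FromSeconds(1))) return task.Result;
            // Timed out: treat this broker as unreachable and try the next.
        }
        throw new TimeoutException("No broker answered the metadata query.");
    }
}
```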
I think the way you did the scheduler to time out requests is awesome, and it handles its job very well. Let's let it handle the cases when Kafka is down, and allow the client that uses the connection to handle the timeouts. We set response timeouts for a good reason: so we can take action when something goes wrong on the client side. And that is exactly why we want to know if Kafka is down or went down for whatever reason.
Thanks, Alex.
Alex, just to give you an update: I have been working on this issue, and I have redesigned the KafkaTcpSocket to fix this bug. Your issue post here really highlighted a corner case where the design made it hard to fix the problem properly. Also, the design had too much thread contention and didn't perform as well as I would like. I'll have an update to the code soon.
Hey James, thanks for the update. I look forward to seeing the changes.
Alex.
Hi James and Alex, I'm investigating Kafka for use at my company and have also seen the issue where connections are blocked indefinitely when the leader goes down. I was curious if any progress has been made since this summer?
Hi,
I am currently investigating the use of Kafka and have also run into this issue.
I have come across the ConsumerOptions property TopicPartitionQueryTimeMs, which is implemented but not used anywhere.
Has there been any progress on this issue, or are there any plans to address this in the near future?
Thanks, Hannah
Hi James, I am in the same boat as Alex. I need to get confirmation when the Kafka endpoint is down, so I can decide what to do with the messages. Currently, when the endpoint is down, I don't get any exception. I am pulling the NuGet package, version 0.9.0.65. Do you know if you have a fix for this yet?
Thank you.
Trying to implement some fire-and-forget messaging (I'm sure a common use case). I can't afford to have my whole web server come to a halt if Kafka is down.
Currently I'm implementing a timeout manually and busting out of your code, but this feels wrong.
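For reference, this is roughly the manual timeout I mean, as a generic wrapper around the send (the budget is arbitrary):

```csharp
using System;
using System.Threading.Tasks;

public static class SendTimeout
{
    // Unblocks the caller as soon as either the send or the delay completes.
    // Note the underlying send task keeps running; this only frees the web
    // request, it does not cancel the driver's reconnect loop.
    public static async Task<T> WithTimeout<T>(Task<T> task, TimeSpan timeout)
    {
        var completed = await Task.WhenAny(task, Task.Delay(timeout));
        if (completed != task)
            throw new TimeoutException($"Kafka send did not complete within {timeout}.");
        return await task; // propagates the send's own result or exception
    }
}
```

Usage is something like `var result = await SendTimeout.WithTimeout(producer.SendMessageAsync(topic, messages), TimeSpan.FromSeconds(10));`.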
This is to address the issues with requests not timing out when the Kafka endpoint is not available. Since we indefinitely try to reconnect to the endpoint, the awaited SendAsync never returns, and we never get to observe the timeout exception on asyncRequest.RecieveTask, which could have timed out already. In a real-life scenario, the caller should be made aware of the endpoint being down via a timeout exception, not just await it indefinitely.