Jroland / kafka-net

Native C# client for Kafka queue servers.
Apache License 2.0
483 stars 232 forks source link

Neither producer or consumer fail over to new partition leader when original goes away #61

Open Foo42 opened 9 years ago

Foo42 commented 9 years ago

If you are reading or writing from/to a partition leader when that node goes down, the tcp code appears to continuously try to reconnect to that node without attempting a metadata refresh to confirm whether that node is still the partition leader. This means that an application cannot take advantage of a successful election of a new partition leader without restarting (at which point the metadata is freshly retrieved).

Foo42 commented 9 years ago

Is this something you might consider a pull request on - don't want to dive in without checking incase it's functionality covered by some upcoming change, or something you've already got thoughts on.

I notice that there is already provision for a metadata refresh when there is an InvalidMetaDataException caught in the consumer https://github.com/Jroland/kafka-net/blob/56c4e2c8c186b20ed28fe037ec15b131614a3dae/src/kafka-net/Consumer.cs#L194 however that only appears to be thrown when there is a functioning tcp connection and the node replies that it is not the leader.

Jroland commented 9 years ago

I definitely take pull requests. So please by all means have a go. There is an open issue #10 which has some discussion on this exact topic. It is a major deficiency in the driver and I just have not been able to tackle it yet. So yes if you want to jump in, that would be fantastic.

As you mention I have slowly added the ability to respond to metadata exceptions, but it does not cover everything we need. Also you will see some event passing when a TCP connection is lost. The basic idea was to react to these types of events and do proper metadata re-queries. But definitely have a look and let me know what you think. And if you want me to take a look early on, just hit me with a message and I can take a look at your branch if you want.

apryiomka commented 9 years ago

James, i am not sure if you remember, but, I actually, raised that issue about a month and a half ago. Here is the original pull request for it: https://github.com/Jroland/kafka-net/pull/52. I proposed to let the request to time out if the node goes down...

Jroland commented 9 years ago

Thanks @apryiomka I did forget about that one. Sorry about that. If @Foo42 wants to take a look at #52 and help test it out and put it through its passes that would be great.

Foo42 commented 9 years ago

While considering how we could go about failing over ongoing send operations to a new partition leader I came to the conclusion that a lot of the complexity would arise in ensuring message ordering was preserved. However, in looking into this, I think we may already have an issue with ordering in the face of net glitches.

Consider the following stripped down scenario:

In this scenario messages were pushed in as A, B, C, but arrive as C, B, A. In our application this could have serious consequences.

While messages are still in the concurrent collection (queue) they maintain their order, if pushed into a functioning TCP connection they maintain their order, even if the transport has to resend packets as TCP maintains ordering. The problem comes when the TCP connection is out and we end up retrying in library code while continuing to pop items off the queue and thus lose their ordering. It seems to me that the only way to maintain order is to only pop items off the concurrent queue after successfully sending the previous. Essentially I think the comment found here: https://github.com/Jroland/kafka-net/blob/ed49ce4d892783d4a315392f30032a80cda696ad/src/kafka-net/Producer.cs#L199 represents an understandable, but probably unachievable goal.

This might seem like a big loss in potentially parallelisation but I suspect it is not as big as it seems - after all we are constrained by capacity of the TCP connection - pushing multiple messages in simultaneously will just make each go through slower. Where we will lose out, is that there will be additional latency waiting for the ACK to come back while we wait to send the next during which time the connection will not be being uses. However, message ordering seems to important to sacrifice for this. I think we just have to let people tune the batching parameters to get the best network saturation / latency trade off they can within these constraints.

I wasn't sure whether to open a new issue for this, or just comment here, since it seems an issue in its own right, but closely related to this problem too.

sqlBender commented 9 years ago

Might be worth reviewing how the other libraries handle this.

tc

On Jun 4, 2015, at 2:09 AM, Julian Haeger notifications@github.com wrote:

While considering how we could go about failing over ongoing send operations to a new partition leader I came to the option that a lot of the complexity would arise in ensuring message ordering was preserved. However, in looking into this, I think we may already have an issue with ordering in the face of net glitches.

Consider the following stripped down scenario:

Producer code pushes in a message (A) which is popped off the concurrent collection and handed to a kafka tcp connection to send. However, while trying to send it, the connection is dropped. The kafka tcp connection then dutifully tries to resend this batch with increasing back-offs. Some time later, the producer pushes on a second message (B) which is similarly popped off the queue and given to a kafka tcp connection to send. As with the first it encounters the connection issue and begins retrying with increasing back off. While both A and B are waiting to try again the network connection is restored. The producer pushes in a message (C) which is popped off the concurrent collection and handed to the kafka tcp connection to send which successfully sends it. The retry loop for B (which started later and is thus still using smaller back-offs) retries sending message B and succeeds. Eventually the retry loop for A tries again and successfully sends A. In this scenario messages were pushed in as A, B, C, but arrive as C, B, A. In our application this could have serious consequences.

While messages are still in the concurrent collection (queue) they maintain their order, if pushed into a functioning TCP connection they maintain their order, even if the transport has to resend packets as TCP maintains ordering. The problem comes when the TCP connection is out and we end up retrying in library code while continuing to pop items off the queue and thus lose their ordering. It seems to me that the only way to maintain order is to only pop items off the concurrent queue after successfully sending the previous. Essentially I think the comment found here: https://github.com/Jroland/kafka-net/blob/ed49ce4d892783d4a315392f30032a80cda696ad/src/kafka-net/Producer.cs#L199 represents an understandable, but probably unachievable goal.

This might seem like a big loss in potentially parallelisation but I suspect it is not as big as it seems - after all we are constrained by capacity of the TCP connection - pushing multiple messages in simultaneously will just make each go through slower. Where we will lose out, is that there will be additional latency waiting for the ACK to come back while we wait to send the next during which time the connection will not be being uses. However, message ordering seems to important to sacrifice for this. I think we just have to let people tune the batching parameters to get the best network saturation / latency trade off they can within these constraints.

I wasn't sure whether to open a new issue for this, or just comment here, since it seems an issue in its own right, but closely related to this problem too.

— Reply to this email directly or view it on GitHub.