SOHU-Co / kafka-node

Node.js client for Apache Kafka 0.8 and later.
MIT License

Clearing queues on longpolling (and non-longpolling) sockets #969

Open thomaslee opened 6 years ago

thomaslee commented 6 years ago

While doing some more testing on the SASL PLAIN stuff I noticed two things that seem like probable bugs:

  1. We skip clearing queues for longpolling sockets. This means in-flight fetch requests may never be cancelled & timeouts never get cancelled. This can cause Mocha tests to linger if you're trying to be a good citizen & avoid --exit.
  2. For non-longpolling sockets, we won't call callbacks unless an error has been explicitly set on the socket. So you can end up in a situation where user callbacks never get invoked which can lead to odd/surprising behavior.

For me, number 1 above regularly causes a ~30s delay in my tests exiting, due to lingering fetch timeouts. Number 2 is theoretical, but it may explain some other odd behavior I'm seeing. Still investigating.
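For a concrete picture of number 1, here's a minimal Mocha repro sketch (the broker address, group id, and topic are placeholders; it assumes a locally reachable broker):

```js
const { ConsumerGroup } = require('kafka-node');

describe('shutdown', function () {
  it('lets the process exit after close()', function (done) {
    const consumer = new ConsumerGroup(
      { kafkaHost: 'localhost:9092', groupId: 'repro-group' }, // placeholders
      ['repro-topic']
    );
    // The test itself passes, but without --exit the Mocha process then
    // lingers ~30s: the fetch timeout on the longpolling socket is never
    // cleared, so the event loop stays alive.
    consumer.on('connect', () => consumer.close(done));
  });
});
```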

thomaslee commented 6 years ago

~~On second thought, number 2 may not be a problem unless we can somehow queue up calls that don't have a timeoutId set on the callback function. I don't think that's typically something we do -- so that's probably a non-issue.~~

Number 1 is definitely something I'm seeing in 2.6.1 though.

EDIT: Number 2 is totally a problem too. Imagine a user callback has been wrapped before being placed into the callback queue. It should really, really be called, even if an explicit error hasn't occurred -- probably just with an error indicating that the socket has been closed. Then we should also clear the timeout. I think. It's late. :)
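To make the wrapping concrete, the shape I'm imagining is roughly this (a sketch of the pattern, not the literal kafka-node internals; all names here are made up):

```js
// Sketch: the user callback is wrapped with a request timeout before
// being queued against its correlation id (assumed shape).
function queueCallback (queue, correlationId, userCb, timeoutMs) {
  const wrapped = (err, result) => {
    clearTimeout(wrapped.timeoutId);
    userCb(err, result);
  };
  wrapped.timeoutId = setTimeout(
    () => wrapped(new Error('request timed out')),
    timeoutMs
  );
  queue[correlationId] = wrapped;
}

// If the socket then closes cleanly and cleanup only does
// clearTimeout(wrapped.timeoutId) without ever calling wrapped(...),
// userCb is silently dropped.
```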

hyperlink commented 6 years ago

Hi, @thomaslee catching up on kafka-node notifications.

I agree we should not skip cleaning up long polling sockets; I'm not sure why we skip those. Keep in mind that fetch requests traditionally did not take callbacks: results, as well as any errors, are emitted as events. I believe it was designed this way to simplify the fact that a single fetch call ends up being distributed across many broker sockets. Recently a callback param was introduced to KafkaClient's fetch to be consistent with the other requests, but I'm not sure how useful it is since the API still works in the way described.
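For anyone following along, the event-based surface looks like this (documented kafka-node API; the broker and topic names are placeholders):

```js
const { KafkaClient, Consumer } = require('kafka-node');

const client = new KafkaClient({ kafkaHost: 'localhost:9092' }); // placeholder
const consumer = new Consumer(client, [{ topic: 'example-topic' }], {});

// Fetch results fan in from however many broker sockets are involved
// and surface as events rather than through a per-call callback:
consumer.on('message', (message) => console.log(message));
consumer.on('error', (err) => console.error(err));
```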

I don't think 2 is a problem, since when the request does time out the callback is called with a timeout error and is then removed from the callback queue. And the timeout is wrapped with the callback and added to the queue synchronously, so I don't see any race condition issues there.

Once the skip is removed, all callbacks should be called unless a user explicitly closes the client.

thomaslee commented 6 years ago

> catching up on kafka-node notifications.

Appreciate it!

> I agree we should not skip cleaning up long polling sockets; I'm not sure why we skip those. Recently a callback param was introduced to KafkaClient's fetch to be consistent with the other requests, but I'm not sure how useful it is since the API still works in the way described.

Don't think I have enough history to understand the details here yet, but I'll try to wrap my brain around it later tonight. Thanks for the context, though.

> I don't think 2 is a problem, since when the request does time out the callback is called with a timeout error

Not quite what I'm talking about: the problem is that we clear the timeout when the socket is closed without an error, so the user callback never gets called at all. We should at the very least invoke those callbacks with something like new Error('socket closed'), in addition to clearing the timeout. Make sense?

Looks like client.js is doing the right thing:

https://github.com/SOHU-Co/kafka-node/blob/b2e3593373dd6b2b1bfcb448982c461b4a414fe1/lib/client.js#L723-L728

but kafkaClient.js is subtly different (and I think this is incorrect):

https://github.com/SOHU-Co/kafka-node/blob/b2e3593373dd6b2b1bfcb448982c461b4a414fe1/lib/kafkaClient.js#L622-L626

Off-hand, I think if we simply always pass in an error & remove the explicit clearTimeout branch, the problem is basically solved.
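Something like this, maybe (a sketch of the shape I mean, not a literal patch; the queue layout and the error message are assumptions):

```js
// Sketch: always surface an error to queued callbacks instead of
// branching into a clearTimeout-only path (made-up names).
function clearCallbackQueue (queue, error) {
  // If the socket closed without a transport error, synthesize one.
  const err = error || new Error('socket closed');
  Object.keys(queue).forEach((correlationId) => {
    const cb = queue[correlationId];
    if (cb.timeoutId != null) {
      clearTimeout(cb.timeoutId);
    }
    cb(err);
  });
}
```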

hyperlink commented 6 years ago

I think I understand what you mean now. Yes, the queue and timeouts are cleared without calling the callback in the case where the user explicitly closes the client (the self.closing check). The idea is that a user who is explicitly closing the client does not care about, or want to get, any error events. A user's code could also remove its error handlers during this cleanup process, so any errors that are emitted would likely cause an uncaught exception in their application. The intermittent failures in our tests motivated this change, since request timeout errors were hitting previous tests that had already closed their clients.
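To spell out that hazard (this is standard Node.js EventEmitter behavior; the identifiers are placeholders):

```js
const { KafkaClient } = require('kafka-node');

const client = new KafkaClient({ kafkaHost: 'localhost:9092' }); // placeholder

function shutdown () {
  client.removeAllListeners('error'); // typical user teardown
  client.close(() => {});
  // If a straggling request-timeout error were emitted on the client
  // after this point, EventEmitter would throw it as an uncaught
  // exception, because no 'error' listener remains.
}
```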

thomaslee commented 6 years ago

Hm, I can understand some of the motivation -- certainly, bubbling up events seems like a bad idea -- but it gets really messy upstream if callbacks don't get called, because it means that any kafka-node call that takes a callback actually needs to consider three cases instead of the usual two: the error case, the happy path, and the "this callback was never called" path.

For example, if you have something like this:

```js
console.log('sending!')
producer.send(..., err => {
  if (err) {
    console.log('send failed!', err)
    return
  }
  console.log('sent!')
})

// imagine this spontaneously happens on some
// other unrelated code path due to a SIGTERM or
// whatever
setImmediate(() => producer.close())
```

As a user, you will see "sending!", but you may not see either "sent!" or "send failed!" and -- without knowing intimate details of kafka-node -- you won't know why.

> The intermittent failures in our tests motivated this change, since request timeout errors were hitting previous tests that had already closed their clients.

If your point here is that this is a hard thing to change right now, I don't doubt it -- I'm just sort of politely disagreeing that the current semantics (as I understand them!) make sense.

hyperlink commented 6 years ago

OK, I see your point about callback expectations, and I'm not opposed to changing the current behavior. It's a good opportunity now, since the next release is a major version change.

thomaslee commented 6 years ago

Great -- holler out if you'd like one of my ham-fisted PRs, but you may be in a better position to address this given your understanding of the issues you ran into last time around. Let me know either way.

Just out of curiosity, what else are you trying to wrap up into the major release?

hyperlink commented 6 years ago

There are some warts in the API that I'd like to change. And of course, I'm just treading water trying to get through a backlog of PRs. Maintaining this module isn't my day job, so I can only get time here and there. Appreciate the help so far. 👍

aikar commented 6 years ago

I've been working on some close-handling work that may resolve the number 2 concerns:

https://github.com/aikar/kafka-node/commit/7d1a02b6471732afe13944e5a3f1b8720bb3daab

I'm hoping to get back to turning this into a proper PR soon.

badeball commented 5 years ago

> We skip clearing queues for longpolling sockets. This means in-flight fetch requests may never be cancelled & timeouts never get cancelled. This can cause Mocha tests to linger if you're trying to be a good citizen & avoid --exit.

Can confirm, and it is highly annoying if you want to test anything involving the library. Adding to the annoyance is that commit() and autoCommit() behave identically for a HighLevelConsumer, which also adds a timeout of 5 seconds by default to any test.
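A test-side mitigation sketch (this uses the documented autoCommit options; whether shrinking the interval fully avoids the lingering timer is an assumption on my part):

```js
const kafka = require('kafka-node');

// HighLevelConsumer predates KafkaClient, so this uses the old
// zookeeper-based Client (placeholder connection string).
const client = new kafka.Client('localhost:2181');
const consumer = new kafka.HighLevelConsumer(client, [{ topic: 'test-topic' }], {
  autoCommit: true,
  autoCommitIntervalMs: 100 // the 5000ms default is what delays test exit
});
```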

badeball commented 5 years ago

There is also a method called clearPendingFetches() on ConsumerGroup, which doesn't seem to do what its name suggests.

badeball commented 5 years ago

> Adding to the annoyance is that commit() and autoCommit() behave identically for a HighLevelConsumer, which also adds a timeout of 5 seconds by default to any test.

This specifically has been solved with #797 and the release of v4.0.0.