spreedly / kaffe

An opinionated Elixir wrapper around brod, the Erlang Kafka client, that supports encrypted connections to Heroku Kafka out of the box.
https://hex.pm/packages/kaffe
MIT License

Kaffe doesn't resume at last offset on consumer restart(?) #11

Closed rwdaigle closed 7 years ago

rwdaigle commented 7 years ago

I witnessed the following behavior on Heroku, which seems to imply that the last successfully acknowledged offset can trail behind the actual consumption position. Observe the following offset output before and after a re-deploy:

2017-02-03T18:25:15.799201+00:00 heroku[consumer.1]: count#kafkacat-consumer.messages.received.count=1 topic=whitelist source=whitelist offset=2236722 partition=0 key=VSURL1bFrPV45mDXndn7nQLUGcY
2017-02-03T18:25:15.799273+00:00 heroku[consumer.1]: Stopping all processes with SIGTERM
2017-02-03T18:25:15.847917+00:00 heroku[consumer.1]: Process exited with status 143
2017-02-03T18:25:15.463461+00:00 heroku[consumer.1]: Restarting
2017-02-03T18:25:15.464027+00:00 heroku[consumer.1]: State changed from up to starting
2017-02-03T18:25:17.731609+00:00 heroku[slug-compiler]: Slug compilation started
2017-02-03T18:25:17.731619+00:00 heroku[slug-compiler]: Slug compilation finished
2017-02-03T18:25:17.985935+00:00 heroku[consumer.1]: Restarting
2017-02-03T18:25:21.324378+00:00 heroku[consumer.1]: Starting process with command `mix run --no-halt`
2017-02-03T18:25:21.996317+00:00 heroku[consumer.1]: State changed from starting to up
...
2017-02-03T18:25:26.087713+00:00 app[consumer.1]: 18:25:26.087 [info]  count#kafkacat-consumer.messages.received.count=1 topic=whitelist source=whitelist offset=2236610 partition=0 key=JIvYocCuHfVHPZpxJab0SE8NNex
2017-02-03T18:25:26.088097+00:00 app[consumer.1]: 18:25:26.087 [info]  count#kafkacat-consumer.messages.received.count=1 topic=whitelist source=whitelist offset=2236611 partition=0 key=VC6mAFv1qouc3cGjtMDe7ocd0zK

The offset goes from 2236722 before the restart back to 2236611 afterwards, essentially regressing.

Now, the log stream isn't guaranteed to be sequential, so it's quite possible the lines got interleaved across dynos. I just wanted to put this on the radar for further confirmation/investigation: just because the sequence looks off in the output I captured doesn't mean 2236722 was actually processed more than once.

@spreedly/systems-dev

sdball commented 7 years ago

@rwdaigle That is definitely the case. Offsets are committed back to Kafka periodically; they are not tracked in realtime. Whenever a group consumer that uses Kafka to track its offsets restarts, it's essentially guaranteed that at least some messages will be consumed again.

Kafka clients allow the offset commit interval to be configured; it defaults to (iirc) the Kafka-recommended value of 5 seconds. That strikes a balance between limiting the amount of repeat message consumption and overwhelming Kafka with a flood of offset updates.
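
For reference, a rough sketch of what that might look like in a Kaffe consumer config. The endpoint/topic/group/handler keys follow the Kaffe README; the `offset_commit_interval_seconds` line is an assumption that Kaffe passes brod's group option of that name through, so check your Kaffe version before relying on it:

```elixir
# config/config.exs -- sketch only; verify option names against your Kaffe version
config :kaffe,
  consumer: [
    endpoints: [kafka: 9092],                # your broker(s)
    topics: ["whitelist"],                   # topic from this issue
    consumer_group: "kafkacat-consumer",     # group whose offsets Kafka tracks
    message_handler: MessageProcessor,       # module implementing handle_message/1
    # Assumed passthrough of brod's group option: how often acknowledged offsets
    # are committed back to Kafka. Smaller = fewer replays on restart, more commit traffic.
    offset_commit_interval_seconds: 5
  ]
```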

sdball commented 7 years ago

From the Kafka O'Reilly guide:

Automatic Commit

The easiest way to commit offsets is to allow the consumer to do it for you. If you configure enable.auto.commit = true, then every five seconds the consumer will commit the largest offset your client received from poll(). The five-second interval is the default and is controlled by setting auto.commit.interval.ms. Just like everything else in the consumer, the automatic commits are driven by the poll loop. Whenever you poll, the consumer checks if it is time to commit, and if it is, it will commit the offsets it returned in the last poll.

Before using this convenient option, however, it is important to understand the consequences.

Consider that, by default, automatic commits occur every five seconds. Suppose that we are three seconds after the most recent commit and a rebalance is triggered. After the rebalancing, all consumers will start consuming from the last offset committed. In this case the offset is three seconds old, so all the events that arrived in those three seconds will be processed twice. It is possible to configure the commit interval to commit more frequently and reduce the window in which records will be duplicated, but it is impossible to completely eliminate them.

Note that with auto-commit enabled, a call to poll will always commit the last offset returned by the previous poll. It doesn't know which events were actually processed, so it is critical to always process all the events returned by poll before calling poll again (or before calling close(); it will also automatically commit offsets). This is usually not an issue, but pay attention when you handle exceptions or otherwise exit the poll loop prematurely.

Automatic commits are convenient, but they don’t give developers enough control to avoid duplicate messages.
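
To make that trade-off concrete, here's a back-of-the-envelope sketch; the message rate is made up for illustration and is not from this issue:

```elixir
# Worst-case replay window after a restart/rebalance, per the quoted passage.
commit_interval_ms = 5_000   # auto.commit.interval.ms default
msgs_per_sec = 200           # assumed consumer throughput (illustrative)
max_replayed = div(commit_interval_ms, 1_000) * msgs_per_sec
# => up to 1000 messages may be processed twice; shrinking the interval shrinks
#    this window but can never make it zero.
```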

sdball commented 7 years ago

In our case, brod does an excellent job of following the spec and controlling the poll loop. Kaffe uses acknowledged synchronous consumption, which is why the handle_message function is required to return an :ok response. Kaffe blocks waiting for that required response and then acknowledges the offset to brod as successfully processed so that it can be committed back to Kafka.
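
As a concrete sketch of that contract (the handler shape follows the Kaffe README; the body is illustrative):

```elixir
defmodule MessageProcessor do
  # Kaffe calls this for each message and blocks until it returns :ok;
  # only then is the offset acknowledged to brod for commit back to Kafka.
  def handle_message(%{topic: topic, partition: partition, offset: offset} = message) do
    # ... do the real work with `message` here ...
    IO.inspect(message, label: "#{topic}/#{partition}@#{offset}")
    :ok
  end
end
```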

rwdaigle commented 7 years ago

Great info 👍, thanks!

Yet another reason why Kafka consumers should be implemented to be idempotent: Kafka can only guarantee at-least-once delivery!
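
For anyone landing here later, one hypothetical shape for that. `AlreadyProcessed` is an imagined dedup store (ETS, Redis, your database, ...) and is not part of Kaffe:

```elixir
defmodule IdempotentProcessor do
  # Dedup by {topic, partition, offset} so replays after a restart/rebalance
  # are harmless. AlreadyProcessed stands in for whatever store you use.
  def handle_message(%{topic: t, partition: p, offset: o} = message) do
    if AlreadyProcessed.seen?({t, p, o}) do
      :ok
    else
      :ok = do_work(message)
      AlreadyProcessed.mark({t, p, o})
      :ok
    end
  end

  defp do_work(_message), do: :ok
end
```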