dashbitco / broadway

Concurrent and multi-stage data ingestion and data processing with Elixir
https://elixir-broadway.org
Apache License 2.0

Expected Behavior on Startup? #311

Closed: zbruhnke closed this issue 2 years ago

zbruhnke commented 2 years ago

Hi Team,

I'm trying to understand something we are seeing: we have a Broadway consumer group consuming events from a Kafka topic.

Most events are consumed as expected. However, we moved into production yesterday, and we can see an event in the Kafka topic that we never picked up. We also have NO logs, errors, or any other trace of this message being processed.

Our staging instance uses the same prod.exs file, the only difference being the secrets it pulls from our HashiCorp Vault instance, so this feels less like an application code issue and more like some sort of configuration issue.

There's some chance the event was published before we started the client, though we're not certain of that.

So our current question: if the offset reset setting in the options block is offset_reset_policy: :latest, is it the EXPECTED behavior that the consumer will NOT process events that were already in the topic when it starts?
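For context, a pipeline configured along these lines might look like the sketch below, following the usual Broadway setup with BroadwayKafka.Producer. It is a hypothetical example, not the poster's code: the module name, broker host and port, and group id are placeholders, client_config: [ssl: true] is an assumption meant to mirror kcat's security.protocol=ssl, and the policy is set to :earliest. Per the BroadwayKafka.Producer documentation, offset_reset_policy is only consulted when the group has no committed offset for a partition (or that offset has expired).

```elixir
defmodule MyApp.EventsPipeline do
  # Hypothetical pipeline; requires the :broadway and :broadway_kafka deps.
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayKafka.Producer,
           [
             # Placeholder broker address and port.
             hosts: [{"kafka.example.com", 9092}],
             # Hypothetical group id; committed offsets are tracked per group.
             group_id: "operational_things_consumer",
             topics: ["integration.some_api.operational_things.response"],
             # Only used when this group has no committed offset for a
             # partition, or the committed offset has expired.
             offset_reset_policy: :earliest,
             # Assumed equivalent of kcat's -X security.protocol=ssl.
             client_config: [ssl: true]
           ]},
        concurrency: 1
      ],
      processors: [default: [concurrency: 10]]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # Log every payload so a skipped event would be noticeable.
    IO.inspect(message.data, label: "received")
    message
  end
end
```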

If we start a consumer on the server with kcat, we can see the event that was already in Kafka, as expected, so we do not believe it is a networking issue.

Sample kcat command:

kcat \
    -X security.protocol=ssl \
    -C \
    -b kafka.kafka-v2.some-url.staging.saas.someurl.com \
    -t integration.some_api.operational_things.response

Does this make sense? If we started the app while the message was already in the topic, should it NOT process that message and only process future events?

We have yet to process another event for this, because we do not want to turn it on for more customers in production until we understand why this happened.

Thanks in advance for the help!

josevalim commented 2 years ago

latest will start from the latest entry as long as no consumer group offset has been committed. If you want to consume what was already there before the consumer group started, you want earliest. :)
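The rule above, combined with the standard Kafka semantics that the reset policy applies only when there is no usable committed offset, can be modelled in a few lines of plain Elixir. This is a hypothetical sketch, not BroadwayKafka's or brod's code: OffsetResetSketch and its arguments are invented for illustration, log_start is the partition's earliest retained offset, and log_end is the offset the next produced message will receive.

```elixir
defmodule OffsetResetSketch do
  @moduledoc """
  Hypothetical model of how a consumer group member chooses the offset it
  starts reading from on one partition. Illustration only.
  """

  @doc """
  `committed` is the group's last committed offset (nil if none),
  `policy` is :earliest or :latest, and `log_start`/`log_end` are the
  partition's earliest retained offset and next offset to be written.
  """
  def starting_offset(committed, policy, log_start, log_end)

  # A committed offset that is still in the log wins; the policy is ignored.
  def starting_offset(committed, _policy, log_start, log_end)
      when is_integer(committed) and committed >= log_start and committed <= log_end,
      do: committed

  # No usable committed offset (never committed, or removed by retention):
  # fall back to the reset policy.
  def starting_offset(_committed, :earliest, log_start, _log_end), do: log_start
  def starting_offset(_committed, :latest, _log_start, log_end), do: log_end
end
```

For example, with :latest, a group starting for the first time after a message landed at offset 4 (so log_end is 5) begins at offset 5 and never sees that message, which would match the original report if the event predated the consumer.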

yordis commented 2 years ago

Hey @josevalim, in this video from the Confluent folks, https://youtu.be/ovdSOIXSyzI, the speaker says the following at around 6:48:

6:47: If the consumer group instance is restarted,
6:48: the first thing it can do is to make a request
6:52: to the coordinator to retrieve its last committed offset.
6:55: And once it has the offset,
6:56: it can resume the consumption
6:59: from that particular offset.
7:00: If this consumer instance is started for the very first time
7:03: and there's no saved position for this consumer group,
7:07: then you can either start consuming
7:10: from the very beginning of this topic partition
7:13: or from the very latest offset.

In particular, this part (I am highlighting a few keywords with asterisks):

7:00 If this consumer instance is started for the **very first time**
7:03 and there's **no saved position for this consumer group**,
7:07 then you can either start consuming
7:10 from the very beginning of this topic partition
7:13 or from the very latest offset.

In your comment, you said the following:

If you want to consume what was there before the consumer group started

Did you specifically mean "started", as opposed to "restarted"?

I am trying to pin down the expected behavior here. Based on the video, my understanding is that once a consumer group offset has been committed, the value of offset_reset_policy no longer matters, because a committed offset exists. It would only matter the very first time the consumer group starts.

Right?

yordis commented 2 years ago

This seems related to https://github.com/dashbitco/broadway_kafka/issues/27, which was reported as fixed.

We are observing the following:

  1. We started the consumer with :latest, processed messages, and committed the offset.
  2. We changed the policy to :earliest.
  3. We consumed the same messages again.

I would expect the offset_reset_policy to be ignored once some consumer group offset is committed.
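The three reported steps can be replayed against a toy model of the group's committed offsets. In this hypothetical sketch, PolicySwitchSketch is an invented name, a plain map stands in for the group coordinator's offset storage, there is a single partition, and nothing has been deleted by retention, so the earliest offset is 0. Under the expected semantics, the restart with :earliest finds the offset committed earlier and re-reads nothing.

```elixir
defmodule PolicySwitchSketch do
  # Hypothetical single-partition simulation; `store` maps group id to the
  # committed offset, standing in for the group coordinator.

  # A committed offset wins; otherwise fall back to the reset policy.
  def start_offset(store, group, policy, log_end) do
    case Map.fetch(store, group) do
      {:ok, committed} -> committed
      :error when policy == :earliest -> 0
      :error when policy == :latest -> log_end
    end
  end

  # Consume from the starting offset up to log_end, then commit log_end.
  # Returns {offsets_processed, new_store}. The //1 step needs Elixir 1.12+.
  def consume(store, group, policy, log_end) do
    from = start_offset(store, group, policy, log_end)
    {Enum.to_list(from..(log_end - 1)//1), Map.put(store, group, log_end)}
  end
end

# Step 1: first start with :latest while offsets 0..4 already exist.
{[], store} = PolicySwitchSketch.consume(%{}, "g", :latest, 5)
# New offsets 5..7 arrive and are processed, committing 8.
{[5, 6, 7], store} = PolicySwitchSketch.consume(store, "g", :latest, 8)
# Steps 2-3: restart with :earliest; the committed offset 8 is reused.
{[], _store} = PolicySwitchSketch.consume(store, "g", :earliest, 8)
```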

josevalim commented 2 years ago

I would expect the offset_reset_policy to be ignored once some consumer group offset is committed.

Me too.