dashbitco / broadway_kafka

A Broadway connector for Kafka

Offsets accumulating in the producer ack state (take 2) #139

Open · danmarcab opened this issue 2 months ago

danmarcab commented 2 months ago

Good morning/afternoon/evening all 👋

We are experiencing an issue that looks pretty much the same as https://github.com/dashbitco/broadway_kafka/issues/100 (we cannot reopen that one).

We can see some messages stuck in the ack state:

iex> :sys.get_state(MyBroadway.Producer_13).state.module_state.acks
%{
  {3, "my_topic", 4} => {[276144535, 276144536, 276144537,
    276144538, 276144539, 276144540, 276144541, 276144542, 276144543, 276144544,
    276144545, 276144546, 276144547, 276144548, 276144549, 276144550, 276144551,
    276144552, 276144553, 276144554, 276144555, 276144556, 276144557, 276144558,
    276144559, 276144560, 276144561, 276144562, 276144563, 276144564, 276144565,
    276144566, 276144567, 276144568, 276144569, 276144570, 276144571, 276144572,
    276144573, 276144574, 276144575, 276144576, 276144577, 276144578, 276144579,
    276144580, 276144581, 276144582, ...], 276621000,
   [276144537, 276144538, 276144539, 276144540, 276144541, 276144542, 276144543,
    276144544, 276144545, 276144546, 276144547, 276144548, 276144549, 276144550,
    276144551, 276144552, 276144553, 276144554, 276144555, 276144556, 276144557,
    276144558, 276144559, 276144560, 276144561, 276144562, 276144563, 276144564,
    276144565, 276144566, 276144567, 276144568, 276144569, 276144570, 276144571,
    276144572, 276144573, 276144574, 276144575, 276144576, 276144577, 276144578,
    276144579, 276144580, 276144581, 276144582, ...]}
}

The processor itself keeps processing; it just never acks these messages.
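
For anyone wanting to check their own pipelines, a small diagnostic sketch along these lines walks the pipeline's producers and counts the offsets still sitting in that internal acks map. It reads internal state (the same shape as the dump above), so treat it purely as a debugging aid; `MyBroadway` and `AckDebug` are placeholder names:

    # Diagnostic sketch only: reads the producer's internal state (the same
    # shape as the dump above), which is not a public API and may change.
    defmodule AckDebug do
      # `pipeline` is the Broadway module, e.g. MyBroadway (placeholder name)
      def pending_acks(pipeline) do
        for producer <- Broadway.producer_names(pipeline), into: %{} do
          acks = :sys.get_state(producer).state.module_state.acks

          counts =
            # Key shape taken from the dump above: {generation?, topic, partition}
            Map.new(acks, fn {{_gen, topic, partition}, ack} ->
              # The value is a 3-tuple holding two offset lists; we count the
              # first one as "offsets still tracked for this partition".
              {{topic, partition}, length(elem(ack, 0))}
            end)

          {producer, counts}
        end
      end
    end

Something like `AckDebug.pending_acks(MyBroadway)` from a remote iex session then gives a per-partition count; a count that only ever grows is the symptom shown above.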

That original issue was fixed by https://github.com/dashbitco/broadway/commit/602931c981d194c6e07e699ab45b6e3747cf44a0, which addressed an obscure timer issue. I wonder if there is another edge case we are missing.

We are running the latest versions of broadway and broadway_kafka.

A couple of things that are different in our code/env:

Any ideas on where to look?

josevalim commented 2 months ago

If you recently changed the ack option, I would first investigate if it is related to that. :)

danmarcab commented 2 months ago

> If you recently changed the ack option, I would first investigate if it is related to that. :)

Sounds like a sensible place to start, thanks

A bit more context: we have 2 pipelines with the exact same configuration; one of them is affected, the other one isn't. Of course this could be a coincidence 🤷

slashmili commented 2 months ago

@danmarcab is it something that you can reproduce in a small application?

We are running version 0.4.1 in our prod without any problem; our setup is quite similar:

  • offset_commit_on_ack: false
  • Elixir 1.15.7 (compiled with Erlang/OTP 26)
  • We are running more than one Broadway pipeline; our application's supervision tree looks like this:

    children = [
      MyApp.ConsumerBroadway,
      MyApp.ConsumerBroadwayV2,
      MyApp.ConsumerBroadwayV3,
      MyApp.Telemetry
      # Starts a worker by calling: MyApp.Worker.start_link(arg)
      # {MyApp.Worker, arg}
    ]

I'm asking because I'd like to see the issue in a small project and avoid updating until we can find the root cause.
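
For completeness, the options such a pipeline passes to `BroadwayKafka.Producer` look roughly like this (host/group/topic names and the variable name `kafka_opts` are placeholders):

    kafka_opts = [
      hosts: [localhost: 9092],
      group_id: "my_consumer_group",
      topics: ["my_topic"],
      # With this set to false, offsets are committed periodically by the
      # group coordinator rather than right after each ack (as we understand
      # the option).
      offset_commit_on_ack: false
    ]

Each module in that `children` list then wires these options into the `producer:` section of its `Broadway.start_link/2` call.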

danmarcab commented 2 months ago

> @danmarcab is it something that you can reproduce in a small application?
>
> We are running version 0.4.1 in our prod without any problem; our setup is quite similar:
>
>   • offset_commit_on_ack: false
>   • Elixir 1.15.7 (compiled with Erlang/OTP 26)
>   • We are running more than one Broadway pipeline; our application's supervision tree looks like this:
>
>     children = [
>       MyApp.ConsumerBroadway,
>       MyApp.ConsumerBroadwayV2,
>       MyApp.ConsumerBroadwayV3,
>       MyApp.Telemetry
>       # Starts a worker by calling: MyApp.Worker.start_link(arg)
>       # {MyApp.Worker, arg}
>     ]
>
> I'm asking because I'd like to see the issue in a small project and avoid updating until we can find the root cause.

No, we haven't. This only happens occasionally, after a few hours in our main app, so it's very hard to reproduce. I'll keep the issue updated when we find anything.

danmarcab commented 2 months ago

Another data point: we are not using batchers. I'm mentioning it because the fix from https://github.com/dashbitco/broadway/commit/602931c981d194c6e07e699ab45b6e3747cf44a0 was for the batcher stage.
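
For illustration, a no-batchers topology looks roughly like this (`kafka_opts` and the concurrency numbers are placeholders); without a `:batchers` key, acking should happen right after the processors rather than in the batcher stage that the linked fix touched:

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {BroadwayKafka.Producer, kafka_opts},
        concurrency: 2
      ],
      processors: [
        default: [concurrency: 10]
      ]
      # no :batchers key, so messages never pass through a batcher stage
    )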

josevalim commented 2 months ago

We don't cancel timers in Broadway Kafka, so I can't see that being the issue. We do use timers in Broadway's rate limiter, and it could have the same bug, but there it would raise (and not accumulate messages forever). So it may not be related to this.