dashbitco / broadway_kafka

A Broadway connector for Kafka

Offsets accumulating in the producer ack state #100

Closed: danmarcab closed this issue 1 year ago

danmarcab commented 2 years ago

Hi all! 👋

First of all, thanks for the great libraries!

We are running into a strange issue where every now and then (roughly once a day) we start seeing offsets accumulating in the ack state of a producer (for one or more partitions).

From our debugging, we understand that the values in the ack state have the shape:

{pending_list, last, seen_list}

Looking into the affected producers, we can see that the pending and seen lists keep growing indefinitely until the VM is OOM-killed.

The producer and processors seem to keep fetching and processing messages, as the growing seen list shows. The problem seems to be that a small number of messages were never acked, so they remain at the front of the pending list.
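The following model shows why one missing ack makes both lists grow without bound. It is a minimal, hypothetical sketch of sequential (contiguous-offset) acking, not BroadwayKafka's actual code. It assumes only what is described above: fetched offsets sit in a pending list, acked offsets sit in a seen list, and an offset can be committed only after every offset before it has been acked. For simplicity it omits the `last` field, and the `AckSketch` module name is invented for illustration.

```elixir
defmodule AckSketch do
  # Hypothetical model of sequential acking; NOT BroadwayKafka's implementation.
  # State is {pending, seen}:
  #   pending - offsets fetched but not yet committed, in fetch order
  #   seen    - offsets already acked but not yet committable

  # Registers newly fetched offsets as pending.
  def fetch({pending, seen}, offsets), do: {pending ++ offsets, seen}

  # Records acks, then commits the longest prefix of `pending` whose offsets
  # have all been acked. Returns {committed_offsets, new_state}.
  def ack({pending, seen}, offsets) do
    seen = Enum.uniq(seen ++ offsets)
    {committed, rest} = Enum.split_while(pending, &(&1 in seen))
    {committed, {rest, seen -- committed}}
  end
end

state = AckSketch.fetch({[], []}, [0, 1, 2, 3])

# Offset 0 is never acked (for example, because its batcher crashed), so
# nothing can be committed even though 1, 2 and 3 were processed.
{committed, state} = AckSketch.ack(state, [1, 2, 3])
IO.inspect({committed, state})
```

As long as offset 0 is never acked, every newly fetched offset is appended to `pending`, and every ack is appended to `seen`. Neither list can shrink, which matches the unbounded growth described above.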

We've been digging through the source code of Broadway and BroadwayKafka, and we cannot find any point where messages or acks could get lost without a trace (we are not seeing any error logs or crash reports).

As a very hacky workaround, we are considering periodically checking the offset lag. If the lag is too big, we would manually send the producer an ack message for the missing offset IDs at the front of the pending list:

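```elixir
# Hacky workaround: relies on BroadwayKafka's private producer state,
# which may change between versions.
for producer <- Broadway.producer_names(MyBroadway) do
  acks = :sys.get_state(producer).state.module_state.acks

  Enum.each(acks, fn {key, {pending, _last, seen}} ->
    # Pending offsets that were never acked (absent from the seen list).
    missing_ack = MapSet.difference(MapSet.new(pending), MapSet.new(seen))

    # Only the unacked offsets at the front of the pending list block progress.
    missing_from_front = Enum.take_while(pending, &MapSet.member?(missing_ack, &1))
    missing_count = length(missing_from_front)

    # Only intervene once the backlog of acked-but-uncommitted offsets is large.
    if length(seen) > 5000 do
      IO.puts("#{missing_count} missing acks at the front for partition #{elem(key, 2)}")
      send(producer, {:ack, key, missing_from_front})
    end
  end)
end
```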

This could work for us, since we mostly care about staying up to date with the topic and can tolerate missing a few messages, but it's far from ideal.

In case it's useful, our Broadway pipeline is very simple. It looks roughly like this:

```elixir
defmodule MyBroadway do
  use Broadway
  require Logger

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayKafka.Producer,
           [
             hosts: [{"hostname", 9092}],
             group_id: "group_1",
             topics: ["topic"]
           ]},
        concurrency: 8
      ],
      processors: [
        default: [concurrency: 32]
      ],
      batchers: [
        publish: [concurrency: 32, batch_size: 50],
        notify_errors: [concurrency: 10, batch_size: 50],
        ignore: [concurrency: 10, batch_size: 500]
      ]
    )
  end

  @impl true
  # interested_in?/1 and parse_data/1 are application helpers not shown here.
  def handle_message(_, message, _) do
    if interested_in?(Map.new(message.metadata.headers)) do
      case parse_data(message.data) do
        {:ok, parsed_data} ->
          message
          |> Broadway.Message.put_data(parsed_data)
          |> Broadway.Message.put_batcher(:publish)

        {:error, reason} ->
          message
          |> Broadway.Message.put_data(reason)
          |> Broadway.Message.put_batcher(:notify_errors)
      end
    else
      Broadway.Message.put_batcher(message, :ignore)
    end
  end

  @impl true
  def handle_batch(:publish, messages, _batch_info, _context) do
    Enum.each(messages, fn _data ->
      # Publishing logic was omitted in the original report.
      :ok
    end)

    messages
  end

  # The :ignore batcher does nothing except hand the messages back to be acked.
  def handle_batch(:ignore, messages, _batch_info, _context) do
    messages
  end

  def handle_batch(:notify_errors, messages, _batch_info, _context) do
    Enum.each(messages, fn message ->
      Logger.error("Error processing message", reason: message.data)
    end)

    messages
  end
end
```

The issue happens even when interested_in?/1 returns false. In that case no processing is done at all: the message is simply forwarded to the :ignore batcher, which does nothing.

Is there anything obvious we are missing?

josevalim commented 2 years ago

Nothing obvious comes to mind, unfortunately.

danmarcab commented 2 years ago

Thank you for having a look @josevalim, really appreciated!

After debugging a while longer, we have found one possible cause: the batcher dies because of an unknown timer. (I say "one" because we have also seen offsets accumulating without this error message.)

```
GenServer MyService.Broadway.Broadway.Batcher_ignore terminating
    ** (RuntimeError) unknown timer #Reference<0.1181658914.3223060481.95258>
        (broadway 1.0.3) lib/broadway/topology/batcher_stage.ex:207: Broadway.Topology.BatcherStage.cancel_batch_timeout/1
        (broadway 1.0.3) lib/broadway/topology/batcher_stage.ex:148: Broadway.Topology.BatcherStage.deliver_batch/6
        (broadway 1.0.3) lib/broadway/topology/batcher_stage.ex:118: Broadway.Topology.BatcherStage.handle_events_per_batch_key/3
        (broadway 1.0.3) lib/broadway/topology/batcher_stage.ex:64: anonymous fn/2 in Broadway.Topology.BatcherStage.handle_events/3
        (telemetry 1.1.0) /build/deps/telemetry/src/telemetry.erl:320: :telemetry.span/3
        (broadway 1.0.3) lib/broadway/topology/batcher_stage.ex:54: Broadway.Topology.BatcherStage.handle_events/3
        (gen_stage 1.1.2) lib/gen_stage.ex:2471: GenStage.consumer_dispatch/6
        (gen_stage 1.1.2) lib/gen_stage.ex:2660: GenStage.take_pc_events/3
```

It seems that Broadway already accounts for the case where cancelling the timer returns false because the timeout message has already been received (https://github.com/dashbitco/broadway/blob/main/lib/broadway/topology/batcher_stage.ex#L201-L213). Perhaps there is an edge case it misses?

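My guess is that crashing the batcher is harmless for other producers, where acks are not sequential. With BroadwayKafka, though, it seems to cause trouble: the messages lost with the crashed batcher are presumably never acked, so their offsets stay at the front of the pending list.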

josevalim commented 2 years ago

I am looking at the code and I cannot see a code path that would make the error message above happen. Every time we cancel the timer, we delete the batch, which means it is impossible to recover the timer again.

josevalim commented 1 year ago

This has been fixed in Broadway. There was an assumption that the timer message would be delivered automatically, but that was not always the case.
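This matches the Erlang documentation for `:erlang.cancel_timer/1`. A `false` return means the timer has already expired, but it does not tell you whether the timeout message has reached the mailbox yet. The helper below is a minimal sketch of cancelling a timer without assuming immediate delivery. It illustrates the general pattern and is not Broadway's actual fix. The `TimerSketch` module and its function names are hypothetical; the timer is started with `:erlang.start_timer/3`, which delivers `{:timeout, ref, msg}`.

```elixir
defmodule TimerSketch do
  # Hypothetical helper; NOT Broadway's code.

  # Starts a timer that delivers {:timeout, ref, msg} to the calling process.
  def start(ms, msg), do: :erlang.start_timer(ms, self(), msg)

  # Cancels a timer started by start/2 so that its message is neither left in
  # the mailbox nor delivered later. Precondition: called once per timer, and
  # only while its timeout message has not been consumed yet.
  def cancel(ref) do
    case :erlang.cancel_timer(ref) do
      false ->
        # The timer already expired, but its message may still be in flight,
        # so wait for it instead of giving up when it is not there yet.
        receive do
          {:timeout, ^ref, _msg} -> :ok
        end

      _remaining_ms ->
        # Cancelled before expiry: no timeout message will be sent.
        :ok
    end
  end
end

ref = TimerSketch.start(0, :flush_batch)
Process.sleep(10)
IO.inspect(TimerSketch.cancel(ref))
```

The blocking `receive` is safe only under the stated precondition; if it is violated, the process hangs. An alternative design leaves stale timeout messages in the mailbox and has the timeout handler ignore any reference it no longer tracks.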

danmarcab commented 1 year ago


This is great news! Thank you very much for all your work @josevalim ❤️