dashbitco / broadway_kafka

A Broadway connector for Kafka

When a new node joins, two consumers never go to a balancing state #92

Closed slashmili closed 2 years ago

slashmili commented 2 years ago

Issue

I ran into a problem in production: when we add a new node (NodeB) while the existing consumers on NodeA are busy, NodeB joining the consumer group causes NodeA to go into rebalancing mode, but by the time NodeA is ready to rejoin, the Kafka coordinator has timed out and taken NodeA out of the group. When NodeA tries to join again, it causes NodeB to go into rebalancing mode, and this continues, sometimes for 40 minutes, until they settle!

This screen recording shows the case: https://github.com/slashmili/talks/blob/tmp-screen-recording/rec/kafka-nodes.mov

When watching the video pay attention to the app1 console, it says:

re-joining group, reason::rebalance_in_progress

and after a while the Kafka log says:

[2022-05-13 09:07:43,401] INFO [GroupCoordinator 1001]: Group foo_group_id2 remove dynamic members who haven't joined: Set('app1@Milads-MBP'/<0.474.0>-d94ec1de-a5b9-4e37-955d-d5d2c8a71e14) (kafka.coordinator.group.GroupCoordinator)

Why?

So you might ask: why does this happen? What takes NodeA so long to join again?

I've debugged brod and it all comes down to this part:

https://github.com/kafka4beam/brod/blob/master/src/brod_group_coordinator.erl#L499-L502

brod wants to start rebalancing but it's waiting for line 502 which takes a long time to finish.

This calls BroadwayKafka, which makes a call to the process and waits until all the ack messages are received.

https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L485-L488

How to fix this

Solution

The fix that seems to work consistently is to add a rebalance_timeout_seconds option. In my local environment, setting it to 200 did the trick. I suppose you won't mind if I create a PR adding that?
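
As a rough illustration of where such an option could sit, here is a minimal sketch of the producer configuration. It assumes the option is exposed under group_config, next to the other consumer group settings; the module name MyApp.KafkaConfig is hypothetical, and 200 is simply the value that worked in the local environment described above.

```elixir
defmodule MyApp.KafkaConfig do
  # Hypothetical helper: returns the tuple passed as the producer :module option.
  def producer_module do
    {BroadwayKafka.Producer,
     [
       hosts: "localhost:9092",
       group_id: "foo_group_id",
       topics: ["event.foo"],
       # Assumption: the rebalance timeout lives under :group_config.
       group_config: [rebalance_timeout_seconds: 200]
     ]}
  end
end
```

The value should be at least as long as the slowest expected drain, otherwise the coordinator may still remove the member before it rejoins.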

Extra

Since I spent almost a week chasing this unexpected behaviour, I'd like to propose making it easier for future users of this library. What I'd like to do is: after the process receives the :drain_after_revoke message, it stores the time it received the message. Elsewhere in the code, if state.revoke_caller exists and more than rebalance_timeout_seconds seconds have passed, it emits a warning hinting that users should look into the rebalance_timeout_seconds option.
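
The warning idea could look roughly like the following. This is only a sketch under stated assumptions: the module DrainWatch, its function names, and the :revoked_at key are hypothetical and not part of BroadwayKafka; only state.revoke_caller and rebalance_timeout_seconds come from the description above.

```elixir
defmodule DrainWatch do
  require Logger

  # Store the (monotonic) time at which :drain_after_revoke was received.
  def mark_revoked(state, now_ms \\ System.monotonic_time(:millisecond)) do
    Map.put(state, :revoked_at, now_ms)
  end

  # True when a revoke is pending and has been pending longer than the timeout.
  def overdue?(%{revoke_caller: caller, revoked_at: revoked_at}, timeout_seconds, now_ms)
      when not is_nil(caller) and is_integer(revoked_at) do
    now_ms - revoked_at > timeout_seconds * 1000
  end

  def overdue?(_state, _timeout_seconds, _now_ms), do: false

  # Log a hint (Logger.warning/1 needs Elixir 1.11+) and return the state unchanged.
  def maybe_warn(state, timeout_seconds, now_ms \\ System.monotonic_time(:millisecond)) do
    if overdue?(state, timeout_seconds, now_ms) do
      Logger.warning(
        "draining after revoke has taken longer than #{timeout_seconds}s; " <>
          "consider increasing rebalance_timeout_seconds"
      )
    end

    state
  end
end
```

Using monotonic time rather than wall-clock time keeps the elapsed-time check unaffected by system clock adjustments.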


Another case but related

So I started looking at this issue in a different environment. We have 100 partitions for a topic and we started seeing these rounds of rebalancing. I think adding rebalance_timeout_seconds solves part of the problem.

For another case, imagine we have 2 nodes running and consuming 100 partitions, each polling from 50 partitions. When there is a new rebalance (a new node comes online or a broker has some maintenance), brod triggers assignments_revoked, which triggers this call.

We saw that this also takes a long time and causes rebalancing. Looking at the process's mailbox shows that there are ~50 scheduled poll messages, and at the end of the mailbox there is the :drain_after_revoke message.

This means all the poll messages are going to be executed and hand events over to the GenStage consumers without knowing that the coordinator has asked to drain because the assignments were revoked. This adds waiting time because, by the time :drain_after_revoke is processed, we have to wait for the acks of all the new messages we handed over to the consumers.

I thought of a solution that keeps the state of the revoked assignments in another process, but I'm not really happy with it and would like your feedback on how to handle this case.

josevalim commented 2 years ago

Solution and extra: yes, PRs welcome!

Another case: you could create a public ETS table on the producer init. When drain after revoke is called, you store it in the ETS table. If the table has an entry during poll, we skip the poll altogether. PR also welcome!
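
A minimal sketch of the ETS idea, to make the flow concrete. The module RevokeFlag and its function names are hypothetical, and this is not the library's implementation; it only shows a public table being created once, marked when draining starts, and consulted before each poll.

```elixir
defmodule RevokeFlag do
  # Create the table once, e.g. in the producer's init. It is public so that
  # the process handling the revoke can write to it directly.
  def new do
    :ets.new(:revoke_flag, [:set, :public, read_concurrency: true])
  end

  # Called when drain after revoke starts.
  def set(table), do: :ets.insert(table, {:draining, true})

  # Called once new assignments have been received.
  def clear(table), do: :ets.delete(table, :draining)

  def draining?(table), do: :ets.member(table, :draining)

  # Skip the poll altogether while the flag is present.
  def maybe_poll(table, poll_fun) do
    if draining?(table), do: :skipped, else: poll_fun.()
  end
end
```

Because the flag is read from ETS rather than from the producer's own state, poll messages that are already sitting in the mailbox ahead of :drain_after_revoke would see it as soon as it is set.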


Great findings and great report!

slashmili commented 2 years ago

All right, thanks for your feedback. I'll work on them.

I wish I could take all the credit. My colleague and I have been debugging rebalancing issues for the past two weeks, and without him I couldn't have gotten this far!

victorolinasc commented 2 years ago

This is superb! Thanks @slashmili for probing into this. Just one question though: which Kafka version are you using? We've been struggling with this behaviour and had to tune the session timeout, with varying success, in our cluster.

We've been trying an update to a more recent Kafka (2.8+) that does not do eager rebalancing. So I wanted to check which version of Kafka you are using.

slashmili commented 2 years ago

@victorolinasc our problem is not completely resolved, but it has gotten much better. We're still looking into possible tweaks to brod and our settings...

We are using Kafka 2.6

slashmili commented 2 years ago

Right now I'm running 1eea41ebb559e5dab7749be7da98b24ac9c7dd09 but still have a problem. Not sure if it's something we can change in this library or related to how GenStage works.

These are my Broadway settings:


defmodule MyApp.Broadway do
  use Broadway

  require Logger

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: producer_module(),
        concurrency: 1
      ],
      processors: [
        default: [
          concurrency: 5
        ]
      ]
    )
  end

  defp producer_module do
    {BroadwayKafka.Producer,
     [
       hosts: "localhost:9092",
       group_id: "foo_group_id",
       topics: ["event.foo"]
     ]}
  end

  @impl true
  def prepare_messages(messages, _context) do
    last_msg = List.last(messages)

    Logger.info(
      "Processing #{length(messages)} msgs for partition: #{last_msg.metadata.partition} with last offset: #{last_msg.metadata.offset}"
    )

    messages
  end

  @impl true
  def handle_message(_, message, _) do
    Process.sleep(200)
    message
  end
end

21:45:11.332 [info] Group member (foo_group_id,coor=#PID<0.522.0>,cb=#PID<0.517.0>,generation=42):
assignments received:
  event.foo:
    partition=0 begin_offset=688
    partition=1 begin_offset=110
    partition=2 begin_offset=110
    partition=3 begin_offset=111
    partition=4 begin_offset=102
    partition=5 begin_offset=5
    partition=6 begin_offset=1
    partition=7 begin_offset=3
    partition=8 begin_offset=2
    partition=9 begin_offset=1
    partition=10 begin_offset=2
    partition=11 begin_offset=2
    partition=12 begin_offset=1
    partition=13 begin_offset=2
    partition=14 begin_offset=2
    partition=15 begin_offset=6
    partition=16 begin_offset=3
    partition=17 begin_offset=3
    partition=18 begin_offset=4
    partition=19 begin_offset=7
21:45:15.327 [info] client MyApp.Broadway.Broadway.Producer_0.Client connected to localhost:9092

21:45:15.378 [info] Processing 5 msgs for partition: 0 with last offset: 692
21:45:15.434 [info] Processing 5 msgs for partition: 1 with last offset: 114
21:45:15.511 [info] Processing 5 msgs for partition: 2 with last offset: 114
21:45:15.592 [info] Processing 5 msgs for partition: 3 with last offset: 115
21:45:15.663 [info] Processing 5 msgs for partition: 4 with last offset: 106
21:45:16.384 [info] Processing 5 msgs for partition: 0 with last offset: 697
21:45:16.439 [info] Processing 5 msgs for partition: 1 with last offset: 119
21:45:16.516 [info] Processing 5 msgs for partition: 2 with last offset: 119
21:45:16.597 [info] Processing 5 msgs for partition: 3 with last offset: 120
21:45:16.668 [info] Processing 5 msgs for partition: 4 with last offset: 111
21:45:17.389 [info] Processing 5 msgs for partition: 0 with last offset: 702
21:45:18.394 [info] Processing 5 msgs for partition: 0 with last offset: 707
21:45:19.399 [info] Processing 5 msgs for partition: 0 with last offset: 712
21:45:20.404 [info] Processing 5 msgs for partition: 0 with last offset: 717
21:45:21.409 [info] Processing 5 msgs for partition: 0 with last offset: 722
21:45:22.414 [info] Processing 5 msgs for partition: 0 with last offset: 727
21:45:23.419 [info] Processing 5 msgs for partition: 0 with last offset: 732
21:45:24.424 [info] Processing 5 msgs for partition: 0 with last offset: 737
21:45:25.429 [info] Processing 5 msgs for partition: 0 with last offset: 742
21:45:26.314 [info] Group member (foo_group_id,coor=#PID<0.522.0>,cb=#PID<0.517.0>,generation=42):
re-joining group, reason::rebalance_in_progress
21:45:26.434 [info] Processing 5 msgs for partition: 0 with last offset: 747
21:45:31.435 [info] Processing 5 msgs for partition: 0 with last offset: 752
21:45:36.436 [info] Processing 5 msgs for partition: 0 with last offset: 757
21:45:41.437 [info] Processing 5 msgs for partition: 0 with last offset: 762
21:45:46.438 [info] Processing 5 msgs for partition: 0 with last offset: 767
21:45:51.439 [info] Processing 5 msgs for partition: 0 with last offset: 772
21:45:56.440 [info] Processing 5 msgs for partition: 0 with last offset: 777
21:46:01.441 [info] Processing 5 msgs for partition: 0 with last offset: 782
21:46:06.442 [info] Processing 5 msgs for partition: 0 with last offset: 787
21:46:16.449 [info] Group member (foo_group_id,coor=#PID<0.522.0>,cb=#PID<0.517.0>,generation=42):
failed to join group
reason: :unknown_member_id
21:46:16.450 [info] Group member (foo_group_id2,coor=#PID<0.522.0>,cb=#PID<0.517.0>,generation=42):

At 21:45:26.314 I started another node, which triggered rebalancing. I expected that Broadway's processors would not get any new jobs because we are in the drain_after_revoke state. Instead, the Broadway producer not only has to wait for the previous acks, but new acks are also added to its state, and it has to wait for those tasks to complete so the producer can ack those offsets.

At first I thought I had missed a check for the state.draining_after_revoke_flag flag, so I created #97. But after adding debugging and testing more, I found out that the events have already been returned to GenStage at 531.

This is the part that gets a bit blurry for me, since I don't know GenStage's internals. What I did was look at BroadwayKafka.Producer to see what's in its mailbox. After the rebalancing started, I saw this in the mailbox:

{:messages,
 [
   {:"$gen_producer",
    {#PID<0.553.0>, #Reference<0.1499299295.2310275082.236707>}, {:ask, 5}},
   {:ack, {57, "event.foo", 0}, [1223, 1224, 1225, 1226, 1227]},
   {:"$gen_producer",
    {#PID<0.553.0>, #Reference<0.1499299295.2310275082.236707>}, {:ask, 5}}
 ]}

It looks like the consumers have finished the tasks at hand and are asking for more, which triggers {:ask, 5}. Since BroadwayKafka uses GenStage.PartitionDispatcher as its dispatcher, GenStage.PartitionDispatcher.ask/3 is called, but it has no knowledge of the revoked assignments and hands the consumer more events from its queue.

One possible fix is to use a custom dispatcher that knows about the drain_after_revoke flag, but I'd only do that as a last resort! Are there any other suggestions?

slashmili commented 2 years ago

I think I've found the problem. I added a log here: https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/producer.ex#L670

+     Logger.debug("dequeue_many: acks: #{inspect(acks, charlists: :as_lists)}")
iex(2)> 00:59:16.956 [info] Processing 5 at #PID<0.548.0> msgs for partition: 1 with last offset: 415
00:59:16.959 [debug] dequeue_many: acks: %{{56, "event.foo", 0} => {[], 2277, []}, {56, "event.foo", 1} => {[411, 412, 413, 414, 415, 416, 417, 418, 419, 420, 421, 422, 423, 424, 425, 426, 427, 428, 429, 430, 431, 432, 433, 434, 435], 436, []}, {56, "event.foo", 2} => {[179, 180, 181, 182, 183], 184, []}, {56, "event.foo", 3} => {[184, 185, 186, 187, 188], 189, []}, {56, "event.foo", 4} => {[174, 175, 176, 177, 178], 179, []}, {56, "event.foo", 5} => {[200, 201, 202, 203, 204, 205, 206, 207, 208, 209], 210, []}, {56, "event.foo", 6} => {[], 116, []}, {56, "event.foo", 7} => {[], 108, []}, {56, "event.foo", 8} => {[], 84, []}, {56, "event.foo", 9} => {[], 119, []}, {56, "event.foo", 10} => {[], 114, []}, {56, "event.foo", 11} => {[], 108, []}, {56, "event.foo", 12} => {[], 119, []}, {56, "event.foo", 13} => {[], 102, []}, {56, "event.foo", 14} => {[], 112, []}, {56, "event.foo", 15} => {[], 118, []}, {56, "event.foo", 16} => {[], 78, []}, {56, "event.foo", 17} => {[], 104, []}, {56, "event.foo", 18} => {[], 45, []}, {56, "event.foo", 19} => {[], 34, []}}
00:59:16.968 [debug]  dequeue_many: acks: %{{56, "event.foo", 0} => {[], 2277, []}, {56, "event.foo", 1} => {[411, 412, 413, 414, 415, 416, 417, 418, 419, 420, 421, 422, 423, 424, 425, 426, 427, 428, 429, 430, 431, 432, 433, 434, 435, 436, 437, 438, 439, 440], 441, []}, {56, "event.foo", 2} => {[], 184, []}, {56, "event.foo", 3} => {[184, 185, 186, 187, 188], 189, []}, {56, "event.foo", 4} => {[174, 175, 176, 177, 178], 179, []}, {56, "event.foo", 5} => {[200, 201, 202, 203, 204, 205, 206, 207, 208, 209], 210, []}, {56, "event.foo", 6} => {[], 116, []}, {56, "event.foo", 7} => {[], 108, []}, {56, "event.foo", 8} => {[], 84, []}, {56, "event.foo", 9} => {[], 119, []}, {56, "event.foo", 10} => {[], 114, []}, {56, "event.foo", 11} => {[], 108, []}, {56, "event.foo", 12} => {[], 119, []}, {56, "event.foo", 13} => {[], 102, []}, {56, "event.foo", 14} => {[], 112, []}, {56, "event.foo", 15} => {[], 118, []}, {56, "event.foo", 16} => {[], 78, []}, {56, "event.foo", 17} => {[], 104, []}, {56, "event.foo", 18} => {[], 45, []}, {56, "event.foo", 19} => {[], 34, []}}
00:59:16.984 [debug] dequeue_many: acks: %{{56, "event.foo", 0} => {[], 2277, []}, {56, "event.foo", 1} => {[411, 412, 413, 414, 415, 416, 417, 418, 419, 420, 421, 422, 423, 424, 425, 426, 427, 428, 429, 430, 431, 432, 433, 434, 435, 436, 437, 438, 439, 440, 441, 442, 443, 444, 445], 446, []}, {56, "event.foo", 2} => {[], 184, []}, {56, "event.foo", 3} => {[], 189, []}, {56, "event.foo", 4} => {[174, 175, 176, 177, 178], 179, []}, {56, "event.foo", 5} => {[200, 201, 202, 203, 204, 205, 206, 207, 208, 209], 210, []}, {56, "event.foo", 6} => {[], 116, []}, {56, "event.foo", 7} => {[], 108, []}, {56, "event.foo", 8} => {[], 84, []}, {56, "event.foo", 9} => {[], 119, []}, {56, "event.foo", 10} => {[], 114, []}, {56, "event.foo", 11} => {[], 108, []}, {56, "event.foo", 12} => {[], 119, []}, {56, "event.foo", 13} => {[], 102, []}, {56, "event.foo", 14} => {[], 112, []}, {56, "event.foo", 15} => {[], 118, []}, {56, "event.foo", 16} => {[], 78, []}, {56, "event.foo", 17} => {[], 104, []}, {56, "event.foo", 18} => {[], 45, []}, {56, "event.foo", 19} => {[], 34, []}}
00:59:16.997 [info] Processing 5 at #PID<0.551.0> msgs for partition: 5 with last offset: 204
00:59:16.998 [debug] dequeue_many: acks: %{{56, "event.foo", 0} => {[], 2277, []}, {56, "event.foo", 1} => {[411, 412, 413, 414, 415, 416, 417, 418, 419, 420, 421, 422, 423, 424, 425, 426, 427, 428, 429, 430, 431, 432, 433, 434, 435, 436, 437, 438, 439, 440, 441, 442, 443, 444, 445, 446, 447, 448, 449, 450], 451, []}, {56, "event.foo", 2} => {[], 184, []}, {56, "event.foo", 3} => {[], 189, []}, {56, "event.foo", 4} => {[], 179, []}, {56, "event.foo", 5} => {[200, 201, 202, 203, 204, 205, 206, 207, 208, 209], 210, []}, {56, "event.foo", 6} => {[], 116, []}, {56, "event.foo", 7} => {[], 108, []}, {56, "event.foo", 8} => {[], 84, []}, {56, "event.foo", 9} => {[], 119, []}, {56, "event.foo", 10} => {[], 114, []}, {56, "event.foo", 11} => {[], 108, []}, {56, "event.foo", 12} => {[], 119, []}, {56, "event.foo", 13} => {[], 102, []}, {56, "event.foo", 14} => {[], 112, []}, {56, "event.foo", 15} => {[], 118, []}, {56, "event.foo", 16} => {[], 78, []}, {56, "event.foo", 17} => {[], 104, []}, {56, "event.foo", 18} => {[], 45, []}, {56, "event.foo", 19} => {[], 34, []}}
00:59:17.961 [info] Processing 5 at #PID<0.548.0> msgs for partition: 1 with last offset: 420
00:59:17.963 [debug] dequeue_many: acks:  %{{56, "event.foo", 0} => {[], 2277, []}, {56, "event.foo", 1} => {[416, 417, 418, 419, 420, 421, 422, 423, 424, 425, 426, 427, 428, 429, 430, 431, 432, 433, 434, 435, 436, 437, 438, 439, 440, 441, 442, 443, 444, 445, 446, 447, 448, 449, 450, 451, 452, 453, 454, 455], 456, []}, {56, "event.foo", 2} => {[], 184, []}, {56, "event.foo", 3} => {[], 189, []}, {56, "event.foo", 4} => {[], 179, []}, {56, "event.foo", 5} => {[200, 201, 202, 203, 204, 205, 206, 207, 208, 209], 210, []}, {56, "event.foo", 6} => {[], 116, []}, {56, "event.foo", 7} => {[], 108, []}, {56, "event.foo", 8} => {[], 84, []}, {56, "event.foo", 9} => {[], 119, []}, {56, "event.foo", 10} => {[], 114, []}, {56, "event.foo", 11} => {[], 108, []}, {56, "event.foo", 12} => {[], 119, []}, {56, "event.foo", 13} => {[], 102, []}, {56, "event.foo", 14} => {[], 112, []}, {56, "event.foo", 15} => {[], 118, []}, {56, "event.foo", 16} => {[], 78, []}, {56, "event.foo", 17} => {[], 104, []}, {56, "event.foo", 18} => {[], 45, []}, {56, "event.foo", 19} => {[], 34, []}}
00:59:18.002 [info] Processing 5 at #PID<0.551.0> msgs for partition: 5 with last offset: 209
00:59:18.004 [debug]  dequeue_many: acks: %{{56, "event.foo", 0} => {[], 2277, []}, {56, "event.foo", 1} => {[416, 417, 418, 419, 420, 421, 422, 423, 424, 425, 426, 427, 428, 429, 430, 431, 432, 433, 434, 435, 436, 437, 438, 439, 440, 441, 442, 443, 444, 445, 446, 447, 448, 449, 450, 451, 452, 453, 454, 455, 456, 457, 458, 459, 460], 461, []}, {56, "event.foo", 2} => {[], 184, []}, {56, "event.foo", 3} => {[], 189, []}, {56, "event.foo", 4} => {[], 179, []}, {56, "event.foo", 5} => {[205, 206, 207, 208, 209], 210, []}, {56, "event.foo", 6} => {[], 116, []}, {56, "event.foo", 7} => {[], 108, []}, {56, "event.foo", 8} => {[], 84, []}, {56, "event.foo", 9} => {[], 119, []}, {56, "event.foo", 10} => {[], 114, []}, {56, "event.foo", 11} => {[], 108, []}, {56, "event.foo", 12} => {[], 119, []}, {56, "event.foo", 13} => {[], 102, []}, {56, "event.foo", 14} => {[], 112, []}, {56, "event.foo", 15} => {[], 118, []}, {56, "event.foo", 16} => {[], 78, []}, {56, "event.foo", 17} => {[], 104, []}, {56, "event.foo", 18} => {[], 45, []}, {56, "event.foo", 19} => {[], 34, []}}
00:59:18.966 [info] Processing 5 at #PID<0.548.0> msgs for partition: 1 with last offset: 425
00:59:18.967 [debug]  dequeue_many: acks:  %{{56, "event.foo", 0} => {[], 2277, []}, {56, "event.foo", 1} => {[421, 422, 423, 424, 425, 426, 427, 428, 429, 430, 431, 432, 433, 434, 435, 436, 437, 438, 439, 440, 441, 442, 443, 444, 445, 446, 447, 448, 449, 450, 451, 452, 453, 454, 455, 456, 457, 458, 459, 460, 461, 462, 463, 464, 465], 466, []}, {56, "event.foo", 2} => {[], 184, []}, {56, "event.foo", 3} => {[], 189, []}, {56, "event.foo", 4} => {[], 179, []}, {56, "event.foo", 5} => {[205, 206, 207, 208, 209], 210, []}, {56, "event.foo", 6} => {[], 116, []}, {56, "event.foo", 7} => {[], 108, []}, {56, "event.foo", 8} => {[], 84, []}, {56, "event.foo", 9} => {[], 119, []}, {56, "event.foo", 10} => {[], 114, []}, {56, "event.foo", 11} => {[], 108, []}, {56, "event.foo", 12} => {[], 119, []}, {56, "event.foo", 13} => {[], 102, []}, {56, "event.foo", 14} => {[], 112, []}, {56, "event.foo", 15} => {[], 118, []}, {56, "event.foo", 16} => {[], 78, []}, {56, "event.foo", 17} => {[], 104, []}, {56, "event.foo", 18} => {[], 45, []}, {56, "event.foo", 19} => {[], 34, []}}
00:59:19.008 [debug] dequeue_many: acks:  %{{56, "event.foo", 0} => {[], 2277, []}, {56, "event.foo", 1} => {[421, 422, 423, 424, 425, 426, 427, 428, 429, 430, 431, 432, 433, 434, 435, 436, 437, 438, 439, 440, 441, 442, 443, 444, 445, 446, 447, 448, 449, 450, 451, 452, 453, 454, 455, 456, 457, 458, 459, 460, 461, 462, 463, 464, 465, 466, 467, ...], 471, []}, {56, "event.foo", 2} => {[], 184, []}, {56, "event.foo", 3} => {[], 189, []}, {56, "event.foo", 4} => {[], 179, []}, {56, "event.foo", 5} => {[], 210, []}, {56, "event.foo", 6} => {[], 116, []}, {56, "event.foo", 7} => {[], 108, []}, {56, "event.foo", 8} => {[], 84, []}, {56, "event.foo", 9} => {[], 119, []}, {56, "event.foo", 10} => {[], 114, []}, {56, "event.foo", 11} => {[], 108, []}, {56, "event.foo", 12} => {[], 119, []}, {56, "event.foo", 13} => {[], 102, []}, {56, "event.foo", 14} => {[], 112, []}, {56, "event.foo", 15} => {[], 118, []}, {56, "event.foo", 16} => {[], 78, []}, {56, "event.foo", 17} => {[], 104, []}, {56, "event.foo", 18} => {[], 45, []}, {56, "event.foo", 19} => {[], 34, []}}
00:59:19.970 [info] Processing 5 at #PID<0.548.0> msgs for partition: 1 with last offset: 430
00:59:19.989 [debug]  dequeue_many: acks:  %{{56, "event.foo", 0} => {[], 2277, []}, {56, "event.foo", 1} => {[426, 427, 428, 429, 430, 431, 432, 433, 434, 435, 436, 437, 438, 439, 440, 441, 442, 443, 444, 445, 446, 447, 448, 449, 450, 451, 452, 453, 454, 455, 456, 457, 458, 459, 460, 461, 462, 463, 464, 465, 466, 467, 468, 469, 470, 471, 472, ...], 476, []}, {56, "event.foo", 2} => {[], 184, []}, {56, "event.foo", 3} => {[], 189, []}, {56, "event.foo", 4} => {[], 179, []}, {56, "event.foo", 5} => {[], 210, []}, {56, "event.foo", 6} => {[], 116, []}, {56, "event.foo", 7} => {[], 108, []}, {56, "event.foo", 8} => {[], 84, []}, {56, "event.foo", 9} => {[], 119, []}, {56, "event.foo", 10} => {[], 114, []}, {56, "event.foo", 11} => {[], 108, []}, {56, "event.foo", 12} => {[], 119, []}, {56, "event.foo", 13} => {[], 102, []}, {56, "event.foo", 14} => {[], 112, []}, {56, "event.foo", 15} => {[], 118, []}, {56, "event.foo", 16} => {[], 78, []}, {56, "event.foo", 17} => {[], 104, []}, {56, "event.foo", 18} => {[], 45, []}, {56, "event.foo", 19} => {[], 34, []}}

I have 5 processors, and I suppose that because we are using GenStage.PartitionDispatcher, data from a single partition only gets delivered to the same processor. But when we pop data from the queue, we have no way to say which partitions to pop from. As you can see, dequeue_many keeps popping data for partition 1.

By the time I stopped the app, it was processing up to offset 430 at #PID<0.548.0>:

00:59:19.970 [info] Processing 5 at #PID<0.548.0> msgs for partition: 1 with last offset: 430

However, based on the internal acks state, it has already sent up to offset 476 to other GenStage internals and is waiting for those messages to be executed and acked.

So the problem is not GenStage.PartitionDispatcher.ask/3, but BroadwayKafka.Producer not dequeuing data from the correct partitions.

I wonder how it could know which partitions it should pop data from 🤔

  1. If, when we get the handle_demand callback, we knew which partition the demand is coming from, we could use that to pop from the right partition. This requires a change in GenStage and I don't think it's feasible, unless you've seen such a pattern before.
  2. We could potentially keep a separate queue (buffer) for each partition and pop from the queues whose pending acks are fewer than max_demand. Not sure if it makes sense.
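
The second option could be sketched as follows. This is an illustration only, assuming the producer tracks how many messages per partition are handed out but not yet acked; the module PartitionQueues and its data layout are hypothetical and do not reflect BroadwayKafka's actual queue.

```elixir
defmodule PartitionQueues do
  # queues:    %{partition => :queue of buffered messages}
  # in_flight: %{partition => number of messages handed out but not yet acked}
  # Returns {events, updated_queues, updated_in_flight}.
  def dequeue(queues, in_flight, max_demand) do
    Enum.reduce(queues, {[], queues, in_flight}, fn {partition, queue}, {events, qs, inf} ->
      # Only hand out as many messages as this partition still has room for.
      room = max_demand - Map.get(inf, partition, 0)
      {taken, rest} = take(queue, room, [])
      count = length(taken)

      {events ++ taken, Map.put(qs, partition, rest),
       Map.update(inf, partition, count, &(&1 + count))}
    end)
  end

  # Pop up to n items from the front of the queue, preserving order.
  defp take(queue, n, acc) when n <= 0, do: {Enum.reverse(acc), queue}

  defp take(queue, n, acc) do
    case :queue.out(queue) do
      {{:value, item}, rest} -> take(rest, n - 1, [item | acc])
      {:empty, rest} -> {Enum.reverse(acc), rest}
    end
  end
end
```

With this shape, a partition that already has max_demand unacked messages contributes nothing to the next dequeue, so a busy partition cannot keep filling the dispatcher's buffer while the others wait.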

josevalim commented 2 years ago

Great analysis! In GenStage, you indeed do not know who asked for data. However, I am not sure I understand the issue.

If you are rebalancing, it means no new data is arriving, correct? This means that we should empty each partition, one by one. So even though we are getting data only from one partition, it should eventually empty out and we will move on to the next partition. So the concurrency is reduced (which we could improve) but it should terminate.

What am I missing? :)

slashmili commented 2 years ago

Thanks! I've learned a lot about Broadway and GenStage over the past few weeks. It's an amazing building block, and I appreciate it even more now 🙏🏼

If you are rebalancing, it means no new data is arriving, correct?

Yes, but each fetch retrieves up to max_bytes of data, and processing that data takes time.

When assignments_revoked is triggered, we don't have to process all the messages in our buffer. We just need to wait for the events that have been handed over to the consumers and ack the last offset. Because we popped messages from the queue, which in my case has lots of data for partition 1, they go to GenStage.PartitionDispatcher's internal queue and our producer waits for them to be acked.

If we only hand over the correct amount of data per partition, we can quickly finish assignments_revoked and let brod join the new consumer group generation (generation is a Kafka term).

josevalim commented 2 years ago

Sorry, I still don't get it. All of the messages in the queue still have to be processed, right? Because there is no new data. Or do you mean that there is so much data for partition 1 that everything is lagging behind? In this case, I think your best course of action is to reduce max_bytes.

slashmili commented 2 years ago

All of the messages in the queue still have to be processed, right?

This is a good strategy when we are shutting down a node, but not when assignments_revoked is called.

Why do you think all the messages have to be processed? Rejoining the new consumer group generation in a timely manner is an important job of the producer.

I don't mind if we drop the messages that are in the queue (and have not been handed over to consumers), because Kafka works with message offsets and the unconsumed messages will be fetched again after the new assignments are received.

In this case, I think your best course of action is to reduce max_bytes.

We have been playing around with max_bytes but it never worked. Today I found the reason! I'll create a PR to fix the bug.

This is indeed a solution, but it means every user with high throughput has to go through the same problem.

If we want to keep the code the same (and not introduce a queue per partition), it's better to set max_bytes to a lower value. What do you think?

josevalim commented 2 years ago

Why do you think all the messages have to be processed? Rejoining the new consumer group generation in a timely manner is an important job of the producer.

It doesn't. I am fine with having an option to drop messages from the queue when assignments are revoked. 👍

We have been playing around with max_bytes but it never worked. Today I found the reason! I'll create a PR to fix the bug.

Perfect!

slashmili commented 2 years ago

We have been playing around with max_bytes but it never worked. Today I found the reason! I'll create a PR to fix the bug.

Sorry my bad I had a typo in the config name 🤦🏼

Just FYI, using max_bytes is complicated: I had to measure the average payload size and keep the data loaded into the queue no larger than 1 message so that assignments_revoked finishes quickly.


I am fine with having an option to drop messages from the queue when assignments are revoked. 👍

How do you propose to do that? Right now, having a single queue means we might pop messages for partitions that are already overloaded. These messages go to GenStage.PartitionDispatcher's internal queue and wait for the related partition's processor to handle them. By the time drain_after_revoke is called, we cannot ask GenStage.PartitionDispatcher to reset its internal queue. Or can we? I've seen that PartitionDispatcher has cancel, but I'm not sure whether that's what I'm looking for or whether it's possible to call it from the producer.

josevalim commented 2 years ago

Oh, I see, the data has already reached the partition dispatcher. There are ways to cancel it, by storing the subscription information and using GenStage.cancel/2, but that's not trivial either.

However, if it is already in the dispatcher, there is only a limited amount of data the dispatcher will buffer. So I go back to the scenario where you are either loading too much data at once (max_bytes) or over-provisioning your pipeline so it asks for too much data (max_demand * concurrency in processors is too high).

slashmili commented 2 years ago

All right thanks for your help and guidance 🙇🏼

In case someone in the future has the same issue (more than one partition and a consumer group with a huge lag): the only way I could add a second node with successful rebalancing was to set max_bytes as low as a single message. I'll update this comment if there is any update.

josevalim commented 2 years ago

Actually, changing max_bytes does not help, does it? because the issue is the data already in the partition dispatcher. Once you revoke assignments, the data that is in the producer (but not in the partition dispatcher) is cancelled, no?

In any case, the time depends on how long the messages take to complete. But if you have a single partition, the time to process all messages in processors will roughly be: time_per_message * max_concurrency * max_demand. So you can tweak all three parameters.
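
To make the estimate concrete, here is the arithmetic with numbers from the test pipeline earlier in the thread: 200 ms per message (the Process.sleep(200) call) and 5 processors. The max_demand of 10 is an assumption, since the posted configuration does not set it, and the module name DrainEstimate is hypothetical.

```elixir
defmodule DrainEstimate do
  # Rough upper bound, in milliseconds, on the time needed to work through
  # everything already handed to the processors, using the formula above.
  def drain_time_ms(time_per_message_ms, max_concurrency, max_demand) do
    time_per_message_ms * max_concurrency * max_demand
  end
end

# 200 ms * 5 * 10 = 10_000 ms, i.e. about 10 seconds of draining.
estimate_ms = DrainEstimate.drain_time_ms(200, 5, 10)
```

Dropping max_demand to 1 under the same assumptions gives 200 * 5 * 1 = 1_000 ms, which is why lowering it shortens the drain so much.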

slashmili commented 2 years ago

Actually, changing max_bytes does not help, does it? because the issue is the data already in the partition dispatcher. Once you revoke assignments, the data that is in the producer (but not in the partition dispatcher) is cancelled, no?

It does if it's as low as 1 message: this config never overloads the partition dispatcher with data from one busy partition. At least, the repeated tests I ran show that it works.

If max_bytes is larger than 1 message, what you mentioned is correct, and changing max_bytes doesn't help because of the queue in the partition dispatcher.

Just to be clear, I think the producer's queue should be per partition. It increases concurrency when a partition is overloaded with data (letting other partitions be consumed) and also allows rebalancing to occur faster.

However, I think I couldn't convince you, either because I didn't explain it well or because I'm wrong.

I think I'll get back to you again 😉

anoskov commented 2 years ago

@slashmili we have the same issue in production. We have a very busy node, and when we add new instances, the consumer group goes into endless rebalancing. Could you please say what decision you ended up with? Decreasing max_bytes? Will it work if we have some events larger than max_bytes?

slashmili commented 2 years ago

@anoskov setting max_bytes to the size of a single message did the trick in my local test setup.

But that's not optimal, because after each message is processed it fetches only one message from Kafka.

Will it work if we have some events larger than max_bytes?

In my local test it didn't work.

Could you please say what decision you ended up with

polvalente commented 2 years ago

@anoskov setting max_bytes to the size of a single message did the trick in my local test setup.

But that's not optimal, because after each message is processed it fetches only one message from Kafka.

Will it work if we have some events larger than max_bytes?

In my local test it didn't work.

This is worrisome for payloads of variable size, though. I've been following this discussion and, given this, I wouldn't consider setting max_bytes a solution to the problem.

anoskov commented 2 years ago

@slashmili @polvalente thanks for the reply! We have been using Kafka for a long time; we can't replace it quickly, and it fits our use cases well. But we are stuck on the node scaling issue. We used pure brod before and Broadway now, and neither supports scaling with our workloads. Please let me know if you find a solution.

victorolinasc commented 2 years ago

@anoskov so far, in my use cases, what really helped is a higher session timeout. As suggested by many of the parties we contacted, we tried tweaking the session timeout and heartbeat parameters to something like:

      group_config: [
        session_timeout_seconds: session_timeout,
        heartbeat_rate_seconds: max(div(session_timeout, 3), 3)
      ]

This helped us tremendously. Also, we've noticed that a higher processing time in the handle_* callbacks contributes a lot to rebalancing issues. It might make no sense, but when we removed the processing from the loop, it got a lot better. I haven't had the time to dig into this as much as @slashmili, but these are the things we've done to minimize rebalancing.

Hope this helps people! Unfortunately time is not helping me out here...

anoskov commented 2 years ago

@victorolinasc thanks for the answer! We set session_timeout_seconds to 200 and heartbeat_rate_seconds to 5, but the consumer still fails to join. And yes, some of our events can take 30+ seconds to process. Unfortunately, at the moment we can't speed this up or run it in tasks because of ordering requirements.

josevalim commented 2 years ago

Folks, if your events can take more than 1 second, you should really be setting max_demand to 1 and reducing max_bytes. Getting the data from Kafka won't be the bottleneck, so there is likely no reason to download a bunch of data upfront.
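A minimal sketch of what that advice could look like as Broadway options, assuming the usual BroadwayKafka.Producer option layout where max_bytes sits under fetch_config and max_demand under the processor. The module name, host, and the max_bytes value are placeholders I made up; the topic and group id are borrowed from the logs in this thread. The function only returns the keyword list, so it can be inspected without a running Kafka.

```elixir
defmodule SlowEventsSketch do
  # Hypothetical module for illustration; returns options that could be
  # passed to Broadway.start_link/2 for slow (1s+) events.
  def broadway_options do
    [
      name: MyApp.Broadway0,
      producer: [
        module:
          {BroadwayKafka.Producer,
           [
             hosts: [localhost: 9092],
             group_id: "foo_group_id",
             topics: ["event.foo"],
             # Assumed value: fetch less data upfront than the default.
             fetch_config: [max_bytes: 100_000]
           ]},
        concurrency: 1
      ],
      processors: [
        # Each processor asks for one message at a time.
        default: [concurrency: 1, max_demand: 1]
      ]
    ]
  end
end
```

The idea is that fewer messages are in flight when a rebalance starts, so there is less to drain before the member can rejoin.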

slashmili commented 2 years ago

@anoskov I was also going to reply to your comment. 30+ seconds is a lot. You'll face multiple problems along the way. If you can't reduce the 30-second consumption time, I suggest looking for different approaches, for example set process concurrency and also max_demand to 1.

For us processing each message is taking ~10ms.

BTW, there is a new rebalance_timeout_seconds option in the main branch of this repo. Maybe it helps you.

slashmili commented 2 years ago

I managed to make my local environment work, meaning rebalancing happened in a timely manner:

The topic has 20 partitions.

I ran 5 Broadway pipelines:

    children = [
      MyApp.Broadway0,
      MyApp.Broadway1,
      MyApp.Broadway2,
      MyApp.Broadway3,
      MyApp.Broadway4
    ]

Each of them with a config like:

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: producer_module(),
        concurrency: 1
      ],
      processors: [
        default: [
          concurrency: 1
        ]
      ]
    )

When the new node joined, the rebalancing happened in about 10 seconds:

20:35:34.882 [info] Processing 5 at #PID<0.555.0> msgs for partition: 3 with last offset: 673
20:35:34.955 [info] Group member (foo_group_id,coor=#PID<0.501.0>,cb=#PID<0.496.0>,generation=20):
re-joining group, reason::rebalance_in_progress
20:35:34.964 [info] Group member (foo_group_id,coor=#PID<0.525.0>,cb=#PID<0.522.0>,generation=20):
re-joining group, reason::rebalance_in_progress
20:35:34.964 [info] Group member (foo_group_id,coor=#PID<0.514.0>,cb=#PID<0.511.0>,generation=20):
re-joining group, reason::rebalance_in_progress
20:35:34.965 [info] Group member (foo_group_id,coor=#PID<0.539.0>,cb=#PID<0.533.0>,generation=20):
re-joining group, reason::rebalance_in_progress
20:35:34.965 [info] Group member (foo_group_id,coor=#PID<0.553.0>,cb=#PID<0.547.0>,generation=20):
re-joining group, reason::rebalance_in_progress
20:35:35.137 [info] Processing 5 at #PID<0.504.0> msgs for partition: 0 with last offset: 1014
20:35:35.137 [info] Processing 5 at #PID<0.527.0> msgs for partition: 2 with last offset: 718
20:35:35.137 [info] Processing 5 at #PID<0.516.0> msgs for partition: 4 with last offset: 537
20:35:35.137 [info] Processing 5 at #PID<0.541.0> msgs for partition: 1 with last offset: 759
20:35:35.137 [info] Processing 5 at #PID<0.555.0> msgs for partition: 3 with last offset: 678
20:35:45.148 [info] Group member (foo_group_id,coor=#PID<0.539.0>,cb=#PID<0.533.0>,generation=21):
elected=false
20:35:45.148 [info] Group member (foo_group_id,coor=#PID<0.525.0>,cb=#PID<0.522.0>,generation=21):
elected=false
20:35:45.148 [info] Group member (foo_group_id,coor=#PID<0.553.0>,cb=#PID<0.547.0>,generation=21):
elected=false
20:35:45.148 [info] Group member (foo_group_id,coor=#PID<0.514.0>,cb=#PID<0.511.0>,generation=21):
elected=false
20:35:45.148 [info] Group member (foo_group_id,coor=#PID<0.501.0>,cb=#PID<0.496.0>,generation=21):
elected=true
20:35:45.152 [info] Group member (foo_group_id,coor=#PID<0.539.0>,cb=#PID<0.533.0>,generation=21):
assignments received:
  event.foo:
    partition=1 begin_offset=760
    partition=11 begin_offset=208
20:35:45.152 [info] Group member (foo_group_id,coor=#PID<0.514.0>,cb=#PID<0.511.0>,generation=21):
assignments received:
  event.foo:
    partition=9 begin_offset=311
    partition=19 begin_offset=208
20:35:45.152 [info] Group member (foo_group_id,coor=#PID<0.501.0>,cb=#PID<0.496.0>,generation=21):
assignments received:
  event.foo:
    partition=0 begin_offset=1015
    partition=10 begin_offset=225
20:35:45.152 [info] Group member (foo_group_id,coor=#PID<0.525.0>,cb=#PID<0.522.0>,generation=21):
assignments received:
  event.foo:
    partition=5 begin_offset=319
    partition=15 begin_offset=213
20:35:45.153 [info] Group member (foo_group_id,coor=#PID<0.553.0>,cb=#PID<0.547.0>,generation=21):
assignments received:
  event.foo:
    partition=6 begin_offset=326
    partition=16 begin_offset=215

The number of Broadway pipelines to run varies depending on your topic settings.
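To illustrate how the pipeline count relates to the partition count: in the logs above each group member received two partitions, p and p + 10, which is what you would expect if 20 partitions were spread round-robin over 10 members (assuming two nodes with five pipelines each). The sketch below only reproduces that arithmetic; it is a hypothetical illustration and not brod's actual assignment code.

```elixir
defmodule AssignmentSketch do
  # Hypothetical illustration only; not brod's implementation.
  # Spreads partitions 0..partition_count-1 over members 0..member_count-1
  # round-robin and returns a map of member index => list of partitions.
  def round_robin(partition_count, member_count)
      when partition_count > 0 and member_count > 0 do
    Enum.group_by(0..(partition_count - 1), &rem(&1, member_count))
  end
end

AssignmentSketch.round_robin(20, 10)
# member 1 gets [1, 11], member 9 gets [9, 19], and so on
```

With more members than partitions some members would get nothing, so the number of pipelines per node is worth picking with the partition count and the expected number of nodes in mind.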

This is kind of a workaround. I'll sign off until I have a proper fix.

If you have a question or want to chat, you can find me in the #broadway channel in Elixir's Slack.

anoskov commented 2 years ago

@josevalim @slashmili thanks for the help!

After all the comments, I am sure that we should do the processing after consuming from Kafka, maybe using job processing. We use broadway_kafka from the main branch too and set rebalance_timeout_seconds and session_timeout_seconds to 200, but that's not enough. We set max_demand to 1 and a producer for each topic-partition: 20 producers and 160 processors in total.

Regarding the workaround: when I started using broadway_kafka on version 0.3.3 and tried to start two producers with the same group_id, it went into rebalancing again and again. But I'll try it again on the current branch with producer concurrency set to 1, thank you!

josevalim commented 1 year ago

Please give main a try and let us know if it all works. If yes, we will ship a new release!