dashbitco / broadway_kafka

A Broadway connector for Kafka

Producers stuck in :assignments_revoked causing endless group rebalancing #118

Closed Goose97 closed 1 year ago

Goose97 commented 1 year ago

I'm noticing a strange behavior where my Broadway pipeline with 3 producers sometimes goes into a prolonged group rebalancing cycle. After digging into the code, it seems like the problem boils down to these lines of code:

During a group rebalance, brod_group_coordinator triggers the producer's :assignments_revoked callback and then waits for all messages to be acked, so brod_group_coordinator waits for the producers.

https://github.com/kafka4beam/brod/blob/master/src/brod_group_coordinator.erl#L502

Also, with offset_commit_on_ack=true, ack waits for the offset commit, so the producers wait for brod_group_coordinator.

https://github.com/dashbitco/broadway_kafka/blob/main/lib/broadway_kafka/brod_client.ex#L100

Basically, we have a situation where two GenServers are calling each other. commit_offsets has a 5-second timeout, so each ack call from a producer will take at least 5s to complete.

https://github.com/kafka4beam/brod/blob/master/src/brod_group_coordinator.erl#L295

This can easily exceed Kafka's rebalance timeout threshold, which, I suspect, eventually leads to yet another group rebalance.
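To make the circular wait concrete, here is a contrived, self-contained sketch with two plain GenServers (this is not the actual brod/broadway_kafka code, just the shape of the interaction):

defmodule Coordinator do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  def init(state), do: {:ok, state}

  # Like brod_group_coordinator during a rebalance: block until the producer
  # reports that all in-flight messages have been acked.
  def handle_call(:assignments_revoked, _from, state) do
    GenServer.call(Producer, :ack_all, 30_000)
    {:reply, :ok, state}
  end

  # Stands in for commit_offsets/1, which the producer calls while acking.
  def handle_call(:commit_offsets, _from, state), do: {:reply, :ok, state}
end

defmodule Producer do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  def init(state), do: {:ok, state}

  # With offset_commit_on_ack=true the ack calls back into the coordinator,
  # which is already blocked waiting for this very reply, so the inner call
  # can only finish via its 5-second timeout.
  def handle_call(:ack_all, _from, state) do
    result =
      try do
        GenServer.call(Coordinator, :commit_offsets, 5_000)
      catch
        :exit, _ -> {:error, :timeout}
      end

    {:reply, result, state}
  end
end

Start both and run GenServer.call(Coordinator, :assignments_revoked, 10_000): it only returns after the inner 5-second timeout fires. With one such 5s stall per ack, a real pipeline easily blows past the group's rebalance timeout.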

Telemetry print:

{[:broadway_kafka, :assignments_revoked, :stop], %{duration: 42634748260},
 %{
   producer: #PID<0.5098.0>,
   telemetry_span_context: #Reference<0.3441398972.1746403335.182468>
 }}
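
For anyone who wants to capture the same measurement, a print like the one above can be produced with a small handler attached to that event (the handler id and log format below are mine, not part of broadway_kafka):

:telemetry.attach(
  "log-assignments-revoked-duration",
  [:broadway_kafka, :assignments_revoked, :stop],
  fn _event, %{duration: duration}, metadata, _config ->
    ms = System.convert_time_unit(duration, :native, :millisecond)
    IO.puts("assignments_revoked in #{inspect(metadata.producer)} took #{ms}ms")
  end,
  nil
)

The duration is in native time units (typically nanoseconds), so the value above is roughly 42.6 seconds.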

Here are my deps versions:

[
    {:brod, "~> 3.16.0"},
    {:broadway_kafka, "~> 0.4.0"},
    {:msgpax, "~> 2.0"}
]

If you need a reproduction repo, please let me know

yordis commented 1 year ago

Having the same issue

zavndw commented 1 year ago

I have the same problem, is there a solution?

anoskov commented 1 year ago

Having problems with it too. This is already the second lock-up found; I described the previous one here: https://github.com/dashbitco/broadway_kafka/issues/116. Perhaps it makes sense to reconsider the interaction between the brod coordinator and the Broadway processes, but I don't have an idea right now.

slashmili commented 1 year ago

Good catch @Goose97 🏆!

I should mention that the problem also involves another part of the system: BroadwayKafka.Producer overloads GenStagePartitionDispatcher with lots of jobs.

When :assignments_revoked is executed, BroadwayKafka.Producer cannot ask GenStagePartitionDispatcher to ignore the jobs that are still in its buffer.


So, based on your findings and on my own understanding and local experiments: when :assignments_revoked is executed while offset_commit_on_ack is true, BroadwayKafka tries to commit the offsets, but they all fail with {:error, :timeout}. That means all the messages processed during rebalancing will be re-processed anyway.

@josevalim any suggestion/guidance on how to fix this?


I'd say users of this library who hit this rebalancing issue should turn off offset_commit_on_ack for the time being and make sure their consumers are idempotent. At least that's what we are discussing doing internally.
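
For reference, that workaround is a single producer option. A minimal sketch, assuming a pipeline like the one below (MyPipeline and the hosts/group/topic values are placeholders):

defmodule MyPipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayKafka.Producer,
           hosts: [localhost: 9092],
           group_id: "my_group",
           topics: ["my_topic"],
           # Workaround: skip the synchronous commit on every ack and rely on
           # brod's periodic offset commits instead. Processing must then be
           # idempotent, since messages may be redelivered after a crash or
           # rebalance.
           offset_commit_on_ack: false},
        concurrency: 3
      ],
      processors: [default: [concurrency: 10]]
    )
  end

  @impl true
  def handle_message(_processor, message, _context), do: message
end

With the option off, offsets are committed by brod on its periodic timer instead of on every ack.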

josevalim commented 1 year ago

If the assignments are revoked, does it mean we can no longer commit them anyway? Or is that a particularity of brod? Because one option is to release immediately when assignments are revoked and block on assignments assigned.

slashmili commented 1 year ago

If the assignments are revoked, does it mean we can no longer commit them anyway? Or is that a particularity of brod?

Neat approach! I can't tell for sure, as I only have experience with brod. But looking at the commit offset function, it seems like it's possible to commit the offset in another generation.

However, if we block on MemberModule:assignments_received, the same issue happens.

josevalim commented 1 year ago

Maybe we should add some sort of async_ack to brod if it doesn’t have one yet and use it in such occasions?

slashmili commented 1 year ago

Maybe we should add some sort of async_ack to brod if it doesn’t have one yet and use it in such occasions?

I'd say the idea of a sync ack is that the caller makes sure the offset is committed. Otherwise the caller could just start Broadway with offset_commit_on_ack=false.

However, async_ack would still be a great improvement. At least the caller can initiate the commit and not rely on a timer to commit offsets.

I'll describe the issue in brod. Let's see what they suggest.

slashmili commented 1 year ago

Based on https://github.com/kafka4beam/brod/issues/538, this is a known issue.

They proposed using brod_group_coordinator:ack/5.

What if we call brod_group_coordinator:commit_offsets/1 as usual, and only when assignments_revoked has been issued, use brod_group_coordinator:ack/5?


Update:

We are already calling ack/5

https://github.com/dashbitco/broadway_kafka/blob/50c710a0a29d3f5cb994933a3abf71b7598f9ea9/lib/broadway_kafka/brod_client.ex#L97-L105

So when we are in the rebalance state, we should skip the sync offset commit altogether.
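
A hedged sketch of that change against the ack path linked above (ack/5 and commit_offsets/1 are the real brod calls; the module, function shape, and the rebalancing?/1 check are illustrative, not the actual broadway_kafka source):

defmodule BrodClientSketch do
  def ack(group_coordinator, generation_id, topic, partition, offset, config) do
    # Always record the ack against the current generation.
    :ok = :brod_group_coordinator.ack(group_coordinator, generation_id, topic, partition, offset)

    # Only do the synchronous commit when no rebalance is in flight; during a
    # rebalance the coordinator is busy and the call would just time out.
    if config[:offset_commit_on_ack] and not rebalancing?(group_coordinator) do
      :brod_group_coordinator.commit_offsets(group_coordinator)
    end

    :ok
  end

  # Placeholder: in broadway_kafka the revoke state is tracked elsewhere
  # (see the ETS discussion further down this thread).
  defp rebalancing?(_group_coordinator), do: false
end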

josevalim commented 1 year ago

Sounds good to me!

josevalim commented 1 year ago

Please give main a try and let us know if it all works. If yes, we will ship a new release!

Goose97 commented 1 year ago

Hi @slashmili, thanks for your work!

I tried the fix from main but it didn't work. Are we supposed to check the revoke flag from ETS, not from the state? The flag was moved to ETS by this PR: https://github.com/dashbitco/broadway_kafka/pull/93
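
A hedged sketch of the distinction (the table handle and key name below are made up for illustration, not the actual broadway_kafka identifiers):

defmodule RevokeFlagSketch do
  # After #93 the revoke flag lives in an ETS table owned by the producer
  # rather than in the producer state, so a check against the state never
  # sees it flip; the ack path has to read it from ETS instead.
  def rebalancing?(ets_table) do
    case :ets.lookup(ets_table, :draining_after_revoke) do
      [{:draining_after_revoke, value}] -> value
      [] -> false
    end
  end
end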

I'll try a PR

anoskov commented 1 year ago

We are trying it too. It doesn't work and still goes into permanent rebalancing :(

slashmili commented 1 year ago

Hmm, we haven't seen rebalancing for quite some time. Did you update main to include the #93 fix as well?

anoskov commented 1 year ago

Yes, we are using the main branch now. It feels like rebalancing has become less frequent, but sometimes it still happens and a restart fixes it. This happens only in the service where the load is high.

slashmili commented 1 year ago

@anoskov thanks for your feedback. Makes sense; in high-load systems it might still be a problem, depending on your Broadway config and how long it takes to execute each job. I couldn't find a solution, but I think overloading GenStagePartitionDispatcher's buffer is still an issue.

@Goose97 I'd suggest experimenting with turning off offset_commit_on_ack. Read the docs and see if it fits your requirements. I'm not sure it will help since I don't know your use case, but it might.

anoskov commented 1 year ago

@slashmili thank you. We have a session timeout of 45 seconds and a rebalance timeout of 200 seconds. We don't have events that take more than 30 seconds to process. But if Broadway still processes the entire buffer before the join/heartbeat, that could be a problem; in that case I don't understand why we can't do the join right after the current event when one is needed. From my observation, it can also happen when the total number of VM reductions greatly increases and overall performance drops.

josevalim commented 1 year ago

But if broadway still processes the entire buffer before join/heartbeat this could be a problem

Because of parallel processing, you may process message 10 before 7, 8, and 9. So we need to wait for everything.

josevalim commented 1 year ago

You can reduce the likelihood of this happening by reducing the number of stages and max_demand. This will effectively reduce the number of elements flowing through Broadway at a given time.
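
Concretely, those knobs live in the pipeline options. Reusing the MyPipeline module from the earlier sketch and a kafka_opts placeholder for the hosts/group_id/topics, the tuning looks roughly like this (the numbers are placeholders, not a recommendation):

Broadway.start_link(MyPipeline,
  name: MyPipeline,
  producer: [
    module: {BroadwayKafka.Producer, kafka_opts},
    # fewer producer stages
    concurrency: 2
  ],
  processors: [
    default: [
      # fewer processors and a demand of 1 keep fewer messages in flight,
      # so there is less buffered work to drain when assignments are revoked
      concurrency: 4,
      max_demand: 1
    ]
  ]
)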

anoskov commented 1 year ago

@josevalim thanks for the reply. We have already set max_demand to 1 with multiple producers and processors. I'm sure that with our config events within one topic-partition are processed sequentially, so I still don't understand why each producer can't handle just its own partition and switch to the join request after the current event finishes processing. And as I understand it, the buffer accumulates despite max_demand: 1 because new events are fetched without waiting for processing to finish.

josevalim commented 1 year ago

Is that one producer and one processor in a single Broadway pipeline and then many of them?

anoskov commented 1 year ago

1 pipeline, 10 producers for multiple topics and partitions (10 partitions in total). So a producer usually handles 1-2 partitions, but sometimes each producer handles only its own single partition. The processors count is 10, but I don't quite understand how they work concurrently in the Kafka context, where messages from a producer's queue are processed sequentially. I see that in the logs, and we also had problems where one event was processed for a long time and the following events got stuck.

In my understanding, if the producer count equals the total topic-partition count, each producer should handle only its own single partition. When a rebalance starts, every producer finishes processing its current event, joins the new generation, and when the rebalance ends it continues processing. Is this possible? If not, why?

josevalim commented 1 year ago

If I understood what you described correctly, then that's how it should be working. Especially with max_demand: 1.

anoskov commented 1 year ago

Hm... I'll inspect the processes again. Do I understand correctly that with max_demand: 1 there should be only one message in the producer queue (buffer)? And the next fetch from Kafka will happen only after this message is processed?