BEagle1984 / silverback

Silverback is a simple but feature-rich message bus for .NET Core (it currently supports Kafka, RabbitMQ and MQTT).
https://silverback-messaging.net
MIT License

Kafka Partition Revoked #214

Closed by leorg99 12 months ago

leorg99 commented 1 year ago

Hi Sergio,

I recently started using your library and love it! However, I am running into an issue. In the logs I see that the topic I am listening to gets an event saying the partition was revoked, and then the partition is assigned again a short time later. At some point, the partition is revoked and never assigned back.

Any idea what to check? When I check the topic to see what consumers are connected, I see none and the group is behind on messages.

Here is what my config looks like:

        builder.AddKafkaEndpoints(endpoints =>
        {
            endpoints
                .Configure(config =>
                {
                    config.BootstrapServers = connectionSettings.BootstrapServers;
                    config.AllowAutoCreateTopics = false;
                    config.ClientId = "prospect_index_daemon_v2";
                })
                .AddInbound<IndexProspect.Command>(
                    endpoint =>
                        endpoint.ConsumeFrom(KafkaConfig.Topics.ProspectV2IndexRequests.GetTopic(settings.EnvironmentName))
                                .Configure(config =>
                                {
                                    config.GroupId = "prospect_index_daemon_v2";
                                    config.AutoOffsetReset = AutoOffsetReset.Earliest;
                                })
                                .ProcessAllPartitionsTogether()
                                .OnError(policy => policy.Skip())
                                .SkipNullMessages()
                );
        });

Thanks!

BEagle1984 commented 1 year ago

The rebalance is coordinated by the broker and it guarantees that all the partitions are reassigned. Do you see the revoke but not the assign in the logs and no exception? Is this the only instance of your service or are there multiple consumers in the consumer group? Do you have a custom IKafkaPartitionsRevokedCallback or IKafkaPartitionsAssignedCallback in place?

leorg99 commented 1 year ago

> Do you see the revoke but not the assign in the logs and no exception?

I see the revoke and assign most of the time but then at some point there is only a revoke with no assign.

> Is this the only instance of your service or are there multiple consumers in the consumer group?

This should be the only instance in the group, but it would be good to have a way to monitor the group and see whether anything is connecting that shouldn't. I am trying to find out how to do that.
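One way to check which members are currently in a consumer group is the Kafka admin API. The following is a minimal sketch, assuming Confluent.Kafka 2.x (where `IAdminClient.DescribeConsumerGroupsAsync` is available) and a reachable broker. The `ConsumerGroupInspector` class and `PrintMembersAsync` method are hypothetical names used for illustration and are not part of Silverback.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

// Hypothetical helper (not part of Silverback): lists the members of a
// consumer group together with the partitions assigned to each of them.
public static class ConsumerGroupInspector
{
    public static async Task PrintMembersAsync(string bootstrapServers, string groupId)
    {
        var config = new AdminClientConfig { BootstrapServers = bootstrapServers };

        using var adminClient = new AdminClientBuilder(config).Build();

        // Assumes Confluent.Kafka 2.x; older versions do not expose this method.
        var result = await adminClient.DescribeConsumerGroupsAsync(new[] { groupId });

        foreach (var group in result.ConsumerGroupDescriptions)
        {
            Console.WriteLine(
                $"Group '{group.GroupId}' is in state {group.State} with {group.Members.Count} member(s)");

            foreach (var member in group.Members)
            {
                Console.WriteLine($"  client: {member.ClientId}, host: {member.Host}");

                foreach (var topicPartition in member.Assignment.TopicPartitions)
                {
                    Console.WriteLine(
                        $"    {topicPartition.Topic} [{topicPartition.Partition.Value}]");
                }
            }
        }
    }
}
```

Calling `await ConsumerGroupInspector.PrintMembersAsync(connectionSettings.BootstrapServers, "prospect_index_daemon_v2")` while the service is running should show whether an unexpected client has joined the group. The `kafka-consumer-groups` command-line tool shipped with Kafka reports similar information through its `--describe --group` options.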

> Do you have a custom IKafkaPartitionsRevokedCallback or IKafkaPartitionsAssignedCallback in place?

No.

leorg99 commented 1 year ago

I created a sample project with the issue I am experiencing. When you run this the first time, it will create the topic kafka-partion-test with 1 partition and then connect to it. What I then did was run a second instance of this sample project to see what happens.

What I found is that partition 0 is revoked for the first instance and assigned to the second one (as expected). When I terminate the second instance, I expect the first instance to get reassigned partition 0, but this does not occur. It actually seems like the first instance gets stuck with the last log message showing:

trce: Silverback.Messaging.Broker.KafkaConsumer[1999]
      Waiting until ChannelsManager stops... | consumerId: 64de2533-f808-4ebf-b110-6c6bac5f3d8f, endpointName: kafka-partion-test

Even pressing Ctrl+C in this state causes a long delay, because the process seems to be waiting for this step to finish. I am not sure whether I found a bug or am just doing something wrong.

BEagle1984 commented 1 year ago

Thank you for the repro. I just had a quick look and I suspect it happens because there's only 1 partition and the revoke procedure gets stuck when there is nothing to revoke. I will fix this. 👍

BEagle1984 commented 1 year ago

The bug has been fixed. The fix will be released with the upcoming v4.4.0.