confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0
71 stars 863 forks source link

Consumer periodically invokes RevokedHandler and AssignedHandler for one consumer in group #1100

Open Ap4uPNZ opened 5 years ago

Ap4uPNZ commented 5 years ago

Description

I have consumer group that consists of 2 or 3 consumer. The random consumer has received assignment and revoking of same partitions each 20 minutes, but another consumers in group have not received any rebalancing. So it is probably not the rebalancing caused by broker. My application shares data between instances and detect changes in consumer group by the rebalancing process and trigger some business actions, so this issue causes too much redundant processing, when group is not changed.

Probably there is same issue on python Also I've seen question and have checked timeouts, but this rebalancing happens each 20 minutes and there is not default timeouts that are related with this time

Confluent.Kafka 1.1.0 and also 1.0.0 Apache.Kafka 2.1.1 reproduced both on Windows 10 and Linux (CentOs)

How to reproduce

Just create simple consumer

_jsonConsumer = new ConsumerBuilder<string, string>(consumerConfig)
    .SetLogHandler((consumer, error) => logger.LogError("Kafka log handler error"))
    .SetErrorHandler((consumer, error) => logger.LogError("Kafka error handler error"))
    .SetPartitionsAssignedHandler((consumer, list) =>
    {
        logger.LogWarning($"Partitions assigning {string.Join(",", list.GroupBy(x => x.Topic, (t, x) => $"Topic {t} : {x.Count()} partitions"))}");
    })
    .SetPartitionsRevokedHandler((consumer, list) =>
    {
        logger.LogWarning($"Partitions revoking {string.Join(",", list.GroupBy(x => x.Topic, (t, x) => $"Topic {t} : {x.Count()} partitions"))}");
    })
    .Build();

and consume in infinite loop

private void Processing(CancellationToken stoppingToken)
{
    try
    {
        // subscribe to several topics
        _jsonConsumer.Subscribe(_topics);
    }
    catch (Exception exception)
    {
        _logger.LogError(exception, "Error during subscribe");
        throw;
    }

    while (!stoppingToken.IsCancellationRequested)
    {
        ConsumeResult<string, string> message;
        try
        {
            message = _jsonConsumer.Consume(stoppingToken);
            if (message.IsPartitionEOF)
                continue;

            _logger.LogInformation($"Received message {message.Key} from {message.TopicPartitionOffset}");
        }
        catch (OperationCanceledException)
        {
            _logger.LogWarning($"Execution was cancelled");
            break;
        }
        catch (Exception exception)
        {
            _logger.LogError(exception, "Error message consume");
            continue;
        }

        try
        {
             HandleMessage(message)
        }
        catch (Exception exception)
        {
            _logger.LogError(exception, $"Error message handling (Message {message.Key} from {message.TopicPartitionOffset})");
        }

        try
        {
             _jsonConsumer.Commit(message);
        }
        catch (Exception exception)
        {
            _logger.LogError(exception, $"Error message commit (Message {message.Key} from {message.TopicPartitionOffset})");
        }
    }
    _jsonConsumer.Close();
}        

the client configuration, other properties have default values

"ConsumerConfig": {
        "BootstrapServers": "localhost:9092"
        "GroupId": "my-consumer-group",
        "FetchWaitMaxMs": 50,
        "EnableAutoCommit": false,
        "StatisticsIntervalMs": 5000,
        "SessionTimeoutMs": 6000,
        "EnablePartitionEof": false,
        "AutoOffsetReset": "Earliest"
      },

Logs for one of instances which have rebalanicng for 2 hour from first assigning.

2019-10-28T12:56:13.873Z,"Partitions assigning Topic FirstTopic : 18 partitions,Topic SecondTopic : 6 partitions"
2019-10-28T12:56:12.515Z,"Partitions revoking Topic FirstTopic : 18 partitions,Topic SecondTopic : 6 partitions"
2019-10-28T12:36:14.048Z,"Partitions assigning Topic FirstTopic : 18 partitions,Topic SecondTopic : 6 partitions"
2019-10-28T12:36:12.548Z,"Partitions revoking Topic FirstTopic : 18 partitions,Topic SecondTopic : 6 partitions"
2019-10-28T12:21:13.809Z,"Partitions assigning Topic FirstTopic : 18 partitions,Topic SecondTopic : 6 partitions"
2019-10-28T12:21:12.509Z,"Partitions revoking Topic FirstTopic : 18 partitions,Topic SecondTopic : 6 partitions"
2019-10-28T12:01:13.978Z,"Partitions assigning Topic FirstTopic : 18 partitions,Topic SecondTopic : 6 partitions"
2019-10-28T12:01:12.505Z,"Partitions revoking Topic FirstTopic : 18 partitions,Topic SecondTopic : 6 partitions"
2019-10-28T11:41:13.648Z,"Partitions assigning Topic FirstTopic : 18 partitions,Topic SecondTopic : 6 partitions"
2019-10-28T11:41:12.500Z,"Partitions revoking Topic FirstTopic : 18 partitions,Topic SecondTopic : 6 partitions"
2019-10-28T11:21:13.826Z,"Partitions assigning Topic FirstTopic : 18 partitions,Topic SecondTopic : 6 partitions"
2019-10-28T11:21:12.614Z,"Partitions revoking Topic FirstTopic : 18 partitions,Topic SecondTopic : 6 partitions"
2019-10-28T11:01:18.660Z,"Partitions assigning Topic FirstTopic : 18 partitions,Topic SecondTopic : 6 partitions"

Other instances have just true rebalancing logs

2019-10-28T11:01:18.602Z,"Partitions assigning Topic FirstTopic : 18 partitions,Topic ThirdTopic : 1 partitions,Topic SecondTopic : 6 partitions"
2019-10-28T11:01:18.561Z,"Partitions revoking Topic FirstTopic : 36 partitions,Topic ThirdTopic : 1 partitions,Topic SecondTopic : 12 partitions"
2019-10-28T11:01:15.488Z,"Partitions assigning Topic FirstTopic : 36 partitions,Topic ThirdTopic : 1 partitions,Topic SecondTopic : 12 partitions"

one oddity of my cluster: not all topics have a sufficient number of partitions

mhowlett commented 5 years ago

the logs provided don't overlap in time (except for the 01:18 rebalance), and here they are consistent with each other (don't demonstrate a problem) - the second consumer has all partitions revoked, then half assigned, and the first consumer gets the other half assigned. Perhaps you mean you are not getting any logs over this period from the second consumer? That is perplexing, I don't know how that would happen. If you provide complete logs for all consumers in the group that is experiencing the problem over the same time period when setting Debug to cgrp,fetch, we'll be able to comment further.

Also, setting SessionTimeoutMs to 6000 without reducing HeartbeatIntervalMs is living dangerously I think - not much room for error, i'm not surprised you're seeing frequent rebalances. I'd leave SessionTimeoutMs at the default unless you have very good reason to change it. If you reduce it to 6000, maybe reduce HeartbeatIntervalMs to 1500 or something like that.

Ap4uPNZ commented 5 years ago

@mhowlett, yes it's complete logs for all consumers, second does not participate in rebalancing procedure, only one problem consumer. So I think it's not related to heartbeat and session timeout. OK, I'll set Debug to cgrp,fetch and send you logs

Ap4uPNZ commented 5 years ago

There are logs for group of 3 consumers available by link. This "anomaly" has been reproduced for 2 consumers (hosts: b0fcec2 and 21b9c6) twice about in 2019-10-29T10:09:22 and 2019-10-29T10:29:22 (UTC Timezone), consumer 9e9a95ac has only two rebalaning, on starting and on stopping, and has no intermediate rebalancing.

Ap4uPNZ commented 4 years ago

Hi! Are there any news? Can I expect to get a resolution?

kbatalova commented 4 years ago

Hi all! I think I've faced with the similar problem. Do you have any updates here?

prokhorovn commented 4 years ago

Workaround described in linked python issue worked for me. I have several different topics subscriptions on single consumer instance in my app. Setting at least one topic name as regexp in Subscribe() (e.g. ^topic$) method does the trick. In answers to linked issue I saw that each 20m unsubscribe might be caused by librdkafka. For .net library is this the same? Is the bug caused by librdkafka or by .net wrapper itself? Are there any plans to do fixes?