confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python
Other
130 stars 897 forks source link

assignment() returns empty list #615

Open dbus-prsm opened 5 years ago

dbus-prsm commented 5 years ago

Description

After upgrading from 0.11.4 to 1.0.0 I noticed that assignment() has begun returning empty list. Am I missing something?

How to reproduce

Here is the simple script to reproduce:

from confluent_kafka import Consumer, version, libversion

conf = {'bootstrap.servers': 'XXX:9092',
        'group.id': 'test',
        'default.topic.config': {'auto.offset.reset': 'smallest'},
        }

print('confluent-kafka-python: %s, librdkafka: %s' % version(), libversion())
topic = 'test'

c = Consumer(**conf)
c.subscribe([topic])
m = c.poll(1)
print(c.assignment())

c.close()

Output:

confluent-kafka-python: 0.11.4, librdkafka: 721920 ('0.11.4', 722175)
[TopicPartition{topic=test,partition=0,offset=-1001,error=None}]

and

confluent-kafka-python: 1.0.0, librdkafka: 1048576 ('1.0.1', 16777727)
[]

Checklist

Please provide the following information:

ybbiubiubiu commented 5 years ago

Im my opinion,if you want to use api of the assignment,you should use api assign to distribute topic to consumer。 the reason is that assign() need to deliver the param:opject of TopicPartition.LIST。but subscribe() only need to deliver the str

edenhill commented 5 years ago

My guess is that the rebalance, and thus partition assignment, takes longer than the 1s poll you have. You can set 'debug': 'consumer' (or 'cgrp' for more details) config to see what is going on

dbus-prsm commented 5 years ago

@edenhill you're right, partition assignment takes about 3s on the latest release

%7|1561091912.128|MEMBERID|rdkafka#consumer-1| [thrd:app]: Group "test": updating member id "(not-set)" -> ""
...
%7|1561091915.135|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "test" changed join state wait-assign-rebalance_cb -> assigned (v2, state up)

vs 16ms on 0.11.4:

%7|1561092726.141|MEMBERID|rdkafka#consumer-1| [thrd:app]: Group "test": updating member id "(not-set)" -> ""
...
%7|1561092726.157|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "test" changed join state wait-assign-rebalance_cb -> assigned (v2, state up)

The environment is exactly the same. The difference is only in confluent-kafka-python version.

edenhill commented 5 years ago

Can you reproduce this with debug=cgrp,protocol and provide the logs so we can figure out what is going on?

zealot-shin commented 5 years ago

Could I ask a question about subscribe() and assign()?

while using confluent_kafka to consume messages from kafka, if there would be any difference between subscribe() and assign()?

if I specify partitions to each consumer process with assign(), would kafka rebalance still work?

mhowlett commented 5 years ago

subscribe is what enables the rebalance capability. when you subscribe to one or more topics, the client will automatically call assign for you as the set of partitions allocated to it changes. use assign explicitly (without subscribe) if you want to read from a static set of partitions.

zealot-shin commented 5 years ago

subscribe is what enables the rebalance capability. when you subscribe to one or more topics, the client will automatically call assign for you as the set of partitions allocated to it changes. use assign explicitly (without subscribe) if you want to read from a static set of partitions.

So in fact I can use subscribe and assign at the same time, and in this occasion, rebalance will still be enabled. And if I don't use subscribe, use assign only, rebalance will not be enabled, right?

yogendramaarisetty commented 3 weeks ago

I am facing similar issue using Confluent kafka in dotnetcore I need to check if the Assignments are greater than zero while consuming. But I am ending up in a continous loop without any assignments

Os: Windows 64 Kafka: Kafka 2.8.1 dotnet: 6.0 library: ConfluentKafka I have a event-log-master Topic created with 5 partitions on my kafka broker

Here's sample code in dotnetcore that I using for my testing to reproduce.

ConsumerConfig conf = new ConsumerConfig();
conf.BootstrapServers = "localhost:9092";
conf.EnableAutoCommit = false;
conf.SessionTimeoutMs = 30000;
conf.TopicMetadataRefreshIntervalMs = 10000;
conf.AutoOffsetReset = AutoOffsetReset.Latest;
conf.MaxPollIntervalMs = 300 * 60 * 1000;
conf.MaxPartitionFetchBytes = 10000;
conf.GroupId = "test-group-poc-1";
conf.AllowAutoCreateTopics = true;
conf.PartitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin;

var pattern = "event-log-master";
long count = 0;

CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
    e.Cancel = true; // prevent the process from terminating.
    cts.Cancel();
};
using (var consumer = new ConsumerBuilder<string, string>(conf).Build())
{
    consumer.Subscribe(pattern);
    try
    {
        while (!cts.Token.IsCancellationRequested)
        {
            if (consumer.Assignment.Count > 0)
            {
                    var record = consumer.Consume(cts.Token);
                    Console.WriteLine($"Consumed {record?.Message}");
                                       if (record != null)
                    {
                        consumer.Commit(record);
                    }
            }
            else
            {
                ConsumeResult<string, string> msg = null;
                // var msg = consumer.Consume(0);
                consumer.Subscribe(pattern);
                Thread.Sleep(1000);
                Console.WriteLine($"ZeroAssignments: {consumer.Name} | msg: {msg?.Message}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Ctrl-C was pressed.
    }
    finally
    {
        consumer.Close();
    }

I found a workaround which is somehow giving the assignments. I tried to call consumer.Consume(0) with 0 timeout when there are not assignments after the loop runs for 2 to 3 times we are getting the assignments. This behavior seems to be wierd.

What's the right fix. I was trying to re subscribe If I donet find any assignments, I know it doesnt make sense but I was just trying.