confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

assign() does NOT work inside the on_assign callback if an assignor is configured; seek() does NOT work inside the on_assign callback #1669

Open Kogl1n opened 11 months ago

Kogl1n commented 11 months ago

Hello! My use case is consuming a specific time range from a single topic with multiple consumers, in a way that still behaves sensibly when a consumer fails (hence the cooperative-sticky assignor). For now, though, I am using just one consumer with one partition. I use offsets_for_times() to get the offsets for the start timestamps and then try to seek to them.
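
The start timestamps passed to offsets_for_times() are epoch milliseconds (the snippets below compute them as int(START_DT.timestamp()*1000)). One pitfall worth ruling out: if START_DT is a naive datetime, .timestamp() interprets it in the local time zone of the machine running the consumer. A minimal sketch of a hypothetical helper, to_epoch_ms, that rejects naive values and avoids float rounding (it is not part of confluent-kafka):

```python
from datetime import datetime, timedelta, timezone

# Unix epoch as an aware datetime, used as the subtraction origin.
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to integer milliseconds since the epoch.

    Hypothetical helper for building offsets_for_times() queries. Naive
    datetimes are rejected because datetime.timestamp() would interpret
    them in the local time zone of the machine running the consumer.
    """
    if dt.tzinfo is None or dt.utcoffset() is None:
        raise ValueError("expected a timezone-aware datetime, e.g. tzinfo=timezone.utc")
    # timedelta // timedelta yields an int, avoiding float rounding from
    # int(dt.timestamp() * 1000); sub-millisecond parts are floored.
    return (dt - _EPOCH) // timedelta(milliseconds=1)
```

For example, to_epoch_ms(datetime(2024, 1, 1, tzinfo=timezone.utc)) gives 1704067200000.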

@edenhill commented in #373 that one shouldn't call poll() and seek() right after subscribe(), since subscribe() is asynchronous. One should use the following instead:

    def on_assign(consumer, partitions):
        for p in partitions:
            # some starting offset, or use OFFSET_BEGINNING, etc.
            # the default offset is STORED which means use committed offsets, and if
            # no committed offsets are available use auto.offset.reset config (default latest)
            p.offset = 1234
        # call assign() to start fetching the given partitions.
        consumer.assign(partitions)

    consumer.subscribe(mytopics, on_assign=on_assign)

This DOES NOT work, though, if an assignor is configured (conf['partition.assignment.strategy'] = 'cooperative-sticky'). It fails with: cimpl.KafkaException: KafkaError{code=_STATE,val=-172,str="Failed to set assignment: Local : Erroneous state"}
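
A likely fix for this particular error, sketched under the assumption that confluent-kafka 1.6 or newer is installed (it added Consumer.incremental_assign()): with the cooperative protocol, librdkafka expects the rebalance callback to use incremental_assign() instead of assign(). The factory name make_fixed_offset_on_assign is hypothetical; the callback body is edenhill's pattern with only the assignment call swapped.

```python
def make_fixed_offset_on_assign(start_offset):
    """Build an on_assign callback that starts each newly assigned partition
    at start_offset (e.g. 1234, or confluent_kafka.OFFSET_BEGINNING).

    Hypothetical factory. Assumes partition.assignment.strategy is
    'cooperative-sticky' and confluent-kafka >= 1.6, which provides
    Consumer.incremental_assign().
    """

    def on_assign(consumer, partitions):
        # With the cooperative protocol, `partitions` holds only the
        # partitions added by this rebalance, not the full assignment.
        for p in partitions:
            p.offset = start_offset
        # librdkafka rejects assign() under the cooperative protocol
        # (the _STATE error above); incremental_assign() adds these
        # partitions, with their offsets, to the existing assignment.
        consumer.incremental_assign(partitions)

    return on_assign
```

Usage would then be consumer.subscribe(mytopics, on_assign=make_fixed_offset_on_assign(OFFSET_BEGINNING)).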

That seems reasonable, since the assignor assigns the partitions. However, the following version using seek() DOES NOT work either, with or without an assignor:

    def on_assign(c, partitions):
        start_ms = int(START_DT.timestamp() * 1000)
        # offsets_for_times() expects the timestamp (epoch ms) in the offset field
        query = [TopicPartition(TOPICS[0], p.partition, start_ms) for p in partitions]
        topic_partitions_with_new_offsets = c.offsets_for_times(query)
        for topic_partition in topic_partitions_with_new_offsets:
            c.seek(topic_partition)

(I read in another issue that calling assign() is not necessary here. The same code works flawlessly when used in the non-recommended way, i.e. calling seek() after subscribe() and poll().)

This doesn't work: cimpl.KafkaException: KafkaError{code=_UNKNOWN_PARTITION,val=-190,str="Failed to seek to offset 1524: Local: Unknown partition"}

Even a minimal example that passes the provided partitions to seek() fails:

    # inside on_assign(c, partitions); TopicPartition and OFFSET_BEGINNING are imported from confluent_kafka
    for topic_partition in [TopicPartition(TOPICS[0], p.partition, OFFSET_BEGINNING) for p in partitions]:
        c.seek(topic_partition)
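
A likely explanation for the _UNKNOWN_PARTITION error: while on_assign is running, the partitions have not been assigned yet, so seek() has nothing to act on. This would also fit the observation that calling assign() seems optional, if the client falls back to assigning the partitions itself only after the callback returns. Below is a minimal sketch of the #373 pattern applied to timestamps, assuming confluent-kafka 1.6 or newer for incremental_assign(). It sets the query timestamps, resolves them with offsets_for_times(), and hands the resolved list to incremental_assign() (cooperative-sticky) or assign() (eager assignors) instead of calling seek(). The name make_timestamp_on_assign and its cooperative/timeout parameters are hypothetical.

```python
def make_timestamp_on_assign(start_ms, cooperative=True, timeout=10.0):
    """Build an on_assign callback that starts each newly assigned partition
    at the earliest offset whose timestamp is >= start_ms (epoch milliseconds).

    Hypothetical factory. Instead of seek(), the resolved offsets are passed
    to the assignment call, which is when the partitions become assigned.
    """

    def on_assign(consumer, partitions):
        # offsets_for_times() reads the query timestamp from each
        # TopicPartition's offset field.
        for p in partitions:
            p.offset = start_ms
        # Returns new TopicPartition objects carrying the resolved offsets;
        # -1 (OFFSET_END) means no message at or after start_ms.
        resolved = consumer.offsets_for_times(partitions, timeout=timeout)
        if cooperative:
            # cooperative-sticky: add only these partitions
            consumer.incremental_assign(resolved)
        else:
            # eager assignors (the default range/roundrobin): set the assignment
            consumer.assign(resolved)

    return on_assign
```

Usage would be consumer.subscribe(TOPICS, on_assign=make_timestamp_on_assign(int(START_DT.timestamp() * 1000))). Note that offsets_for_times() is a blocking broker request, so the callback may block for up to timeout seconds during the rebalance.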

I am using up-to-date versions of the Python library and the Kafka images.

Any help is much appreciated! Thank you!