confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Consumer.offsets_for_times() returns the same offset for different timestamps #1178

Open miroslavbel opened 2 years ago

miroslavbel commented 2 years ago

Description

How to reproduce

Set up some variables to use

>>> from datetime import datetime
>>> from confluent_kafka import TopicPartition
>>>
>>> topic = 'topic_name' # set to a real topic name
>>>
>>> partition = 0 # set to an existing partition of the topic
>>>
>>> date_in  = datetime(2021, 8, 6, 11, 10, 00)
>>> date_mid = datetime(2021, 8, 6, 11, 15, 00)
>>> date_out = datetime(2021, 8, 6, 11, 20, 00)
>>>
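>>> # offsets_for_times() reads the lookup timestamp (in milliseconds) from each TopicPartition's offset field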
>>> tp_in = TopicPartition(topic=topic, partition=partition, offset=int(date_in.timestamp() * 1_000))
>>> tp_mid = TopicPartition(topic=topic, partition=partition, offset=int(date_mid.timestamp() * 1_000))
>>> tp_out = TopicPartition(topic=topic, partition=partition, offset=int(date_out.timestamp() * 1_000))

Connect to kafka

>>> from confluent_kafka import Consumer
>>> 
>>> cfg = {'group.id': 'qa', 'bootstrap.servers': 'server1.example:port,server2.example:port,server3.example:port,server4.example:port,server5.example:port'}
>>> 
>>> consumer = Consumer(cfg)
>>> 
>>> consumer.list_topics(topic, 5) # check the topic's real partitions
>>> consumer.subscribe([topic])

case 1 (get offsets for three timestamps separately)

  1. call .offsets_for_times() three times, each with a list containing a single timestamp
  2. get:
    • 46504966 offset for date_in
    • 46504987 offset for date_mid
    • 46504988 offset for date_out
>>> consumer.offsets_for_times([tp_in])
[TopicPartition{topic=topic_name,partition=0,offset=46504966,error=None}]
>>> consumer.offsets_for_times([tp_mid])
[TopicPartition{topic=topic_name,partition=0,offset=46504987,error=None}]
>>> consumer.offsets_for_times([tp_out])
[TopicPartition{topic=topic_name,partition=0,offset=46504988,error=None}]

case 2 (get offsets for two timestamps at one call)

  1. call .offsets_for_times() once with a list of two timestamps
  2. get:
    • 46504988 offset for date_in
    • 46504988 offset for date_out
  3. expect:
    • 46504966 offset for date_in
    • 46504988 offset for date_out
>>> consumer.offsets_for_times([tp_in, tp_out])
[TopicPartition{topic=topic_name,partition=0,offset=46504988,error=None}, TopicPartition{topic=topic_name,partition=0,offset=46504988,error=None}]

case 3 (get offsets for three timestamps at one call)

  1. call .offsets_for_times() once with a list of three timestamps
  2. get:
    • 46504988 offset for date_in
    • 46504988 offset for date_mid
    • 46504988 offset for date_out
  3. expect:
    • 46504966 offset for date_in
    • 46504987 offset for date_mid
    • 46504988 offset for date_out
>>> consumer.offsets_for_times([tp_in, tp_mid, tp_out])
[TopicPartition{topic=topic_name,partition=0,offset=46504988,error=None}, TopicPartition{topic=topic_name,partition=0,offset=46504988,error=None}, TopicPartition{topic=topic_name,partition=0,offset=46504988,error=None}]

Checklist

Please provide the following information:

confluent_kafka.version(): ('1.7.0', 17235968)
confluent_kafka.libversion(): ('1.7.0', 17236223)
Python: 3.7.11 [GCC 9.3.0] on linux
OS: Ubuntu 20.04.2 LTS
Client configuration: {'group.id': 'qa', 'bootstrap.servers': 'server1.example:port,server2.example:port,server3.example:port,server4.example:port,server5.example:port'}

edenhill commented 2 years ago

I have a vague memory that you're not allowed to pass the same partition multiple times in the same call.
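If that restriction holds, a simple workaround is to query each timestamp in its own call, so that each request mentions the topic/partition only once. A minimal sketch, reusing the consumer and the variables set up above (the loop and the offsets dict are illustrative, not part of the API):

>>> offsets = {}
>>> for date in (date_in, date_mid, date_out):
...     tp = TopicPartition(topic=topic, partition=partition, offset=int(date.timestamp() * 1_000))
...     # one topic/partition entry per request, so results are not collapsed to a single offset
...     offsets[date] = consumer.offsets_for_times([tp], timeout=10)[0].offset
...
>>> offsets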

miroslavbel commented 2 years ago

Oh, yes. I see this note in librdkafka, but I don't see it in confluent-kafka-python. Maybe we need to fix this?

miroslavbel commented 2 years ago

It seems to me that the docs on the site differ from this repo: the site's Consumer.poll() docs don't have the note section, but this repo's do.

pranavrth commented 4 months ago

The new Admin API list_offsets is also available now; you can use that API instead.

Keeping this issue open, as there is still a documentation mismatch between the API documentation and the comments in the code.
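For reference, a minimal sketch of the list_offsets() approach, assuming a recent confluent-kafka-python release that includes the Admin list_offsets API (the broker address, topic name, and timestamp values below are placeholders; note that the request is keyed by TopicPartition, so one call still carries at most one timestamp per partition):

from confluent_kafka import TopicPartition
from confluent_kafka.admin import AdminClient, OffsetSpec

admin = AdminClient({'bootstrap.servers': 'server1.example:port'})  # placeholder broker

topic, partition = 'topic_name', 0
timestamps_ms = [1628241000000, 1628241300000, 1628241600000]  # example epoch-millisecond values

for ts in timestamps_ms:
    # list_offsets() takes a dict of TopicPartition -> OffsetSpec; because it is keyed by
    # partition, each distinct timestamp for the same partition needs its own call
    futures = admin.list_offsets({TopicPartition(topic, partition): OffsetSpec.for_timestamp(ts)})
    for tp, fut in futures.items():
        info = fut.result()  # ListOffsetsResultInfo with .offset and .timestamp
        print(ts, '->', info.offset)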