FundingCircle / jackdaw

A Clojure library for the Apache Kafka distributed streaming platform.
https://fundingcircle.github.io/jackdaw/
BSD 3-Clause "New" or "Revised" License

Unexpected semantics of seek-to-timestamp #195

Closed by jgracin 5 years ago

jgracin commented 5 years ago

I'm writing a utility that has to process messages within a time window, and my implementation uses seek-to-timestamp. The documentation for seek-to-timestamp says the following:

After seeking, the first message read from each partition will be the EARLIEST message whose timestamp is greater than or equal to the timestamp sought.
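To make the documented contract concrete, here is a small pure-Clojure model of it for a single partition. This is an illustration only: the function name expected-first-offset, the record maps, and the end-offset argument are hypothetical and are not jackdaw or Kafka APIs. It assumes the records are supplied in offset order.

```clojure
(ns example.documented-semantics)

(defn expected-first-offset
  "Hypothetical model of the documented seek-to-timestamp contract for one
  partition (not jackdaw code). `records` is a seq of {:offset o :timestamp t}
  maps in offset order and `end-offset` is the partition's end offset.
  Returns the offset of the earliest record whose timestamp is >= ts, or
  end-offset when no such record exists, so nothing would be read."
  [records end-offset ts]
  (or (some (fn [{:keys [offset timestamp]}]
              ;; `when` yields the offset (possibly 0, which is truthy in Clojure)
              (when (>= timestamp ts) offset))
            records)
      end-offset))
```

With hypothetical records stamped 1, 1000 and 2000 at offsets 0, 1 and 2 (end offset 3), seeking to 1000 would start at offset 1, and seeking to any timestamp newer than 2000 would start at offset 3, so nothing is read.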

This behavior is exactly what I need and what I'd expect. However, it is not what I actually get. Calling seek-to-timestamp with the current time as the argument (e.g. Instant/now) positions the consumer at the beginning of the partitions instead of at the end. Consequently, the consumer reads all messages in the topic instead of none.

The following (existing) test case from client_test.clj demonstrates the problem:

    (testing "seek to ts-next=1000"
      (let [ts-next 1000]
        (as-> consumer $
          (client/assign-all $ (map :topic-name [topic-config]))
          (client/seek-to-timestamp $ ts-next [topic-config])
          (client/position-all $)
          (is (= [0]
                 (vals $))))))

Contrary to the documentation and to expectation, the first message read is not the earliest one whose timestamp is greater than or equal to the timestamp sought (1000), but the message with timestamp 1.

I suppose an implementation more aligned with the documentation would handle the nils returned by KafkaConsumer/offsetsForTimes and, instead of seeking to position 0, seek to the end offset of the partition (the position just after the last message). Unfortunately, maintaining that expectation for timestamps in the future is probably infeasible, because messages produced after the seek could still carry timestamps earlier than the one sought, so that case would not be covered.
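A minimal sketch of that idea, assuming direct Java interop with KafkaConsumer: offsetsForTimes, endOffsets, seek, assignment and OffsetAndTimestamp.offset are real Kafka client methods, but both function names below are hypothetical and not part of jackdaw. The nil-to-end-offset resolution is kept as a pure function so it can be checked without a broker. Like the proposal itself, this does not cover timestamps in the future.

```clojure
(ns example.seek-or-end)

(defn offsets-to-seek
  "Hypothetical helper (not part of jackdaw). `found` maps each partition to the
  offset returned by offsetsForTimes, or nil when no record has a timestamp >= the
  one sought; `end-offsets` maps each partition to its end offset. Returns
  partition -> offset to seek to, falling back to the end offset on nil."
  [found end-offsets]
  (into {}
        (map (fn [[tp end]] [tp (or (get found tp) end)]))
        end-offsets))

(defn seek-to-timestamp-or-end!
  "Hypothetical wrapper over the Java KafkaConsumer API. Uses reflective interop
  (no type hints), so it loads without kafka-clients on the classpath but needs
  a real consumer with assigned partitions to run."
  [consumer ts]
  (let [partitions (.assignment consumer)
        ;; offsetsForTimes expects a Map<TopicPartition, Long>
        query      (into {} (map (fn [tp] [tp (long ts)])) partitions)
        ;; unwrap OffsetAndTimestamp values; nil means "no such record"
        found      (into {}
                         (map (fn [[tp oat]] [tp (when oat (.offset oat))]))
                         (.offsetsForTimes consumer query))
        ends       (.endOffsets consumer partitions)]
    (doseq [[tp offset] (offsets-to-seek found ends)]
      (.seek consumer tp (long offset)))
    consumer))
```

Splitting the pure resolution step from the interop keeps the fallback rule testable on plain maps; with this shape, passing the current time in epoch milliseconds would leave each partition at its end offset when no record is at or after that time.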

I'd like to hear your thoughts on this.