spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0

AbstractKafkaStreamsBinderProcessor ignores TimestampExtractor configuration starting with kafka-streams 3.7.0 #2922

Closed qxv1612 closed 3 months ago

qxv1612 commented 3 months ago

Hello,

This concerns the code snippet starting at AbstractKafkaStreamsBinderProcessor:627 (version 4.1.0):

    if (timestampExtractor != null) {
        consumed.withTimestampExtractor(timestampExtractor);
    }

"consumed" is a Consumed instance, a class that is part of kafka-streams.

In kafka-streams 3.6.1, a call to consumed.withTimestampExtractor() modifies the existing instance. Since kafka-streams 3.7.0, the same call returns a new instance of "Consumed", but the code above discards that return value. As a result, the timestampExtractor is not used at all with kafka-streams 3.7.0 and later.

The same applies to "consumed.withName" a couple of lines below.

IMHO this is a big behavioral change for kafka-streams to make in a minor version.

To fix this regardless, the code above could be changed to assign the return value:

    if (timestampExtractor != null) {
        consumed = consumed.withTimestampExtractor(timestampExtractor);
    }
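To illustrate why the reassignment matters, here is a minimal sketch that does not use kafka-streams at all. MutatingConfig and CopyingConfig are hypothetical stand-ins that only mimic the two behaviors described above (mutate in place versus return a new instance); they are not the real Consumed class.

```java
public class ConsumedReassignmentSketch {

    // Hypothetical stand-in: the with-method mutates the receiver and returns it
    // (the behavior described above for kafka-streams 3.6.1).
    static final class MutatingConfig {
        private String extractor;

        MutatingConfig withExtractor(String extractor) {
            this.extractor = extractor;
            return this;
        }

        String extractor() {
            return extractor;
        }
    }

    // Hypothetical stand-in: the with-method leaves the receiver untouched and
    // returns a new instance (the behavior described above for kafka-streams 3.7.0).
    static final class CopyingConfig {
        private final String extractor;

        CopyingConfig(String extractor) {
            this.extractor = extractor;
        }

        CopyingConfig withExtractor(String extractor) {
            return new CopyingConfig(extractor);
        }

        String extractor() {
            return extractor;
        }
    }

    // Ignoring the return value still works when the call mutates in place.
    static String ignoreReturnValueMutating() {
        MutatingConfig config = new MutatingConfig();
        config.withExtractor("custom");
        return config.extractor();
    }

    // Ignoring the return value silently loses the setting when a copy is returned.
    static String ignoreReturnValueCopying() {
        CopyingConfig config = new CopyingConfig(null);
        config.withExtractor("custom"); // result discarded
        return config.extractor();
    }

    // Reassigning the variable works with the copying variant, and would also
    // work with the mutating one, since that returns the same instance.
    static String reassignCopying() {
        CopyingConfig config = new CopyingConfig(null);
        config = config.withExtractor("custom");
        return config.extractor();
    }

    public static void main(String[] args) {
        System.out.println("mutating, return value ignored: " + ignoreReturnValueMutating());
        System.out.println("copying, return value ignored:  " + ignoreReturnValueCopying());
        System.out.println("copying, return value assigned: " + reassignCopying());
    }
}
```

Because reassignment behaves correctly under both variants, the proposed change should be safe regardless of which kafka-streams version is on the classpath.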

Regards,

Ralf

sobychacko commented 3 months ago

@qxv1612 Your fix makes sense. If you are willing, please feel free to send a PR with the fix. Otherwise, we will look into it later. Thanks!

qxv1612 commented 3 months ago

Hello @sobychacko, I forked the repo and created the PR based on the main branch. I'm not sure if this is the right method.

A local build currently fails in KafkaBinderTests, but I expect it's related to my local environment settings. I have also started the same test on the current main branch.