spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

How to wait for future inside Kafka Stream map()? #1199

Closed alkazap closed 2 years ago

alkazap commented 2 years ago

I am implementing a Spring Boot application in Java, using Spring Cloud Stream with the Kafka Streams binder.

I need to perform a blocking operation inside a KStream map method, like so:

public Consumer<KStream<?, ?>> sink() {
    return input -> input
        .mapValues(value -> methodReturningCompletableFuture(value).get())
        .foreach((key, value) -> otherMethod(key, value));
}

completableFuture.get() throws checked exceptions (InterruptedException, ExecutionException).

How can I handle these exceptions so that the chained method is not executed and the Kafka message is not acknowledged?

Is there a better way of blocking inside map()?

sobychacko commented 2 years ago

@alkazap I think you asked the same question on SO as well. See my answer there. As I indicated there, it is hard to prevent the message from being acknowledged in this case.
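For the exception-handling part of the question, here is a minimal sketch of one common approach, not necessarily what the SO answer recommends: wrap the blocking get() in a small helper that converts the checked exceptions into an unchecked one, so the lambda passed to mapValues compiles. The class BlockingAwait and its await method are hypothetical names made up for this illustration; they are not part of Kafka Streams or Spring Cloud Stream.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class BlockingAwait {

    // Hypothetical helper: blocks on the future and rethrows failures as unchecked exceptions.
    public static <T> T await(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for future", e);
        } catch (ExecutionException e) {
            // Unwrap so the original failure is the direct cause.
            throw new IllegalStateException("Future completed exceptionally", e.getCause());
        }
    }

    public static void main(String[] args) {
        // Successful future: the value is returned as-is.
        System.out.println(await(CompletableFuture.completedFuture("ok")));

        // Failed future: the helper throws an unchecked exception instead of returning.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalArgumentException("boom"));
        try {
            await(failed);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getCause().getMessage());
        }
    }
}
```

In the topology from the question this would be used as mapValues(value -> BlockingAwait.await(methodReturningCompletableFuture(value))). Because the exception propagates out of mapValues, the chained foreach is not invoked for that record. Whether the offset ends up committed depends on the Kafka Streams commit and error-handling configuration, which this sketch does not address. CompletableFuture.join() is a shorter alternative that throws the unchecked CompletionException, although it does not surface interruption as InterruptedException.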

As a side note, if you ask on StackOverflow first, there is no need to duplicate the same question on GitHub (and vice-versa). You may ping us on an additional medium, only if you don't get a response within a reasonable period of time. Thanks!

alkazap commented 2 years ago

Thank you for the fast response. I will continue the discussion there. I apologize for duplicating the question; I was not sure whether SO questions reach you.