spring-cloud / spring-cloud-stream-samples

Samples for Spring Cloud Stream
Apache License 2.0
959 stars 677 forks source link

Example for Kafka Streams branching with Flux #198

Open benkeil opened 3 years ago

benkeil commented 3 years ago

Can you please add an example how to use branching in a reactive way?

Predicate<String, Flux<Review>> testPredicate = (k, v) -> ???

@Bean
@SuppressWarnings("unchecked")
public Function<KStream<String, Value>, KStream<String, Review>[]> map() {
    return reviews -> reviews.branch(testPredicate);
}
sobychacko commented 3 years ago

@benkeil Not sure what you mean here. Could you elaborate on the use case? For one thing, you cannot use reactive types with Kafka Streams functions. On the other hand, I don't see the need to use reactive types in the code above since you are operating on KStream and calling it's branch method. The branch method in KStream takes a predicate, but that is specific to that API. See the details here.

benkeil commented 3 years ago

The predicate needs to return a boolean and I don't know how to write it in a way that the flux returns a boolean.