nodefluent / kafka-streams

equivalent to kafka-streams :octopus: for nodejs :sparkles::turtle::rocket::sparkles:
https://nodefluent.github.io/kafka-streams/
MIT License
830 stars 111 forks

Is possible to branch a stream? #67

Moises-Soares closed 6 years ago

Moises-Soares commented 6 years ago

Is it possible to branch a stream, as in the Java counterpart?

KStream<String, Long> stream = ...;
KStream<String, Long>[] branches = stream.branch(
    (key, value) -> key.startsWith("A"), /* first predicate  */
    (key, value) -> key.startsWith("B"), /* second predicate */
    (key, value) -> true                 /* third predicate  */
);

// KStream branches[0] contains all records whose keys start with "A"
// KStream branches[1] contains all records whose keys start with "B"
// KStream branches[2] contains all other records

// Java 7 example: cf. `filter` for how to create `Predicate` instances

krystianity commented 6 years ago

@Moisessoares I will add this in a small new release by the end of the week.

Moises-Soares commented 6 years ago

Any news? Do you need any kind of help?

krystianity commented 6 years ago

Sorry @Moisessoares, this took a bit longer than expected, as I also had to update sinek and kafka-connect first. I shipped TypeScript declarations as well as your branching in kafka-streams@4.6.0. A KStream instance now supports:

branch(preds: (message: any) => boolean[]): KStream[];

I have added a unit test for the branching, which is implemented by multicasting the stream and applying a filter for each predicate while mapping. However, I have not yet been able to test it on a live broker, especially producing to a Kafka topic via .to() from the branched streams. Let me know if you need help.
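
For readers who want to see what "multicasting and adding a filter while mapping" could look like, here is a minimal, self-contained sketch. It does not use the library: `MiniStream`, `Message`, `collect`, and the sample keys are all hypothetical names made up for illustration, and the sketch assumes that `branch` receives an array of predicates, as in the Java example. The actual kafka-streams implementation may differ.

```typescript
// Hypothetical record shape; real Kafka messages carry more fields.
interface Message {
  key: string;
  value: number;
}

type Predicate = (message: Message) => boolean;
type Listener = (message: Message) => void;

// MiniStream is an illustrative stand-in, not the library's KStream.
class MiniStream {
  private listeners: Listener[] = [];

  // Register a downstream consumer.
  subscribe(listener: Listener): void {
    this.listeners.push(listener);
  }

  // Multicast: deliver one message to every subscriber.
  emit(message: Message): void {
    for (const listener of this.listeners) {
      listener(message);
    }
  }

  // Derive a new stream that forwards only messages accepted by pred.
  filter(pred: Predicate): MiniStream {
    const out = new MiniStream();
    this.subscribe((message) => {
      if (pred(message)) {
        out.emit(message);
      }
    });
    return out;
  }

  // Map each predicate to its own filtered view of this stream.
  branch(preds: Predicate[]): MiniStream[] {
    return preds.map((pred) => this.filter(pred));
  }
}

// Collect everything a stream emits into an array, for inspection.
function collect(stream: MiniStream): Message[] {
  const seen: Message[] = [];
  stream.subscribe((message) => seen.push(message));
  return seen;
}

const source = new MiniStream();
const [startsWithA, startsWithB, rest] = source.branch([
  (m) => m.key.startsWith("A"),
  (m) => m.key.startsWith("B"),
  // Explicit "everything else" predicate; see the note after the block.
  (m) => !m.key.startsWith("A") && !m.key.startsWith("B"),
]);

const a = collect(startsWithA);
const b = collect(startsWithB);
const other = collect(rest);

for (const key of ["Apple", "Banana", "Cherry", "Avocado"]) {
  source.emit({ key, value: key.length });
}
```

One design point worth noting: in the Java API, a record goes only to the first branch whose predicate matches, which is why a catch-all `true` works as the last predicate there. In this sketch every branch is an independent filter over the same multicast source, so a record that satisfies several predicates would appear in every matching branch, and the "other" branch needs an explicit negation.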