nodefluent / kafka-streams

equivalent to kafka-streams :octopus: for nodejs :sparkles::turtle::rocket::sparkles:
https://nodefluent.github.io/kafka-streams/
MIT License
830 stars 111 forks source link

Question: How do I consume from multiple topics? #64

Closed paambaati closed 5 years ago

paambaati commented 6 years ago

The changelog for v4 lists the ability to provide the getKStream() method with multiple topics, but this doesn't seem to work —

import { KafkaStreams } from 'kafka-streams';

const kafkaStreams = new KafkaStreams({...});

const inputTopics = ['t1', 't2'];
const inputStream = kafkaStreams.getKStream(inputTopics);

inputStream.forEach(message => console.log);

inputStream.start();

I've ensured both topics (t1 and t2) have messages to read from, but the output always contains messages from only one of them.

paambaati commented 6 years ago

For anyone that ended up here looking for a similar solution, here's how I ended up doing it —

import { KafkaStreams } from 'kafka-streams';

const kafkaStreams = new KafkaStreams({...});

const inputTopics = ['t1', 't2'];
const inputStreams = inputTopics.map(inputTopic => {
    return kafkaStreams.getKStream(inputTopic);
});
const mergedStream = inputStreams.reduce((allStreams, stream) => {
    return allStreams.merge(stream);
});

mergedStream.forEach(message => console.log);

Promise.all(inputStreams.map(inputStream => {
    return inputStream.start();
}));

@krystianity Is this the recommended method of consuming from multiple topics?

krystianity commented 6 years ago

Hi @paambaati the recommended way is to use myKStream.from(["one", "two"]) :) alternatively myKStream.from("one").from("two") should also work.

paambaati commented 6 years ago

@krystianity Thanks for the tip! I tried it, and this prints messages from only 1 topic —

import { KafkaStreams } from 'kafka-streams';

const kafkaStreams = new KafkaStreams({...});

const inputTopics = ['t1', 't2'];
const inputStream = kafkaStreams.getKStream(null).from(inputTopics);

inputStream.forEach(message => console.log);

inputStream.start();

Is there more info I can provide?

krystianity commented 5 years ago

there are now 3 examples on how this works :)