SOHU-Co / kafka-node

Node.js client for Apache Kafka 0.8 and later.
MIT License
2.66k stars 627 forks source link

Integration with RxJS? #208

Open jcastill0 opened 9 years ago

jcastill0 commented 9 years ago

Has there been any thought on integrating the consumer library with a reactive stream?

Just curious if anybody has done any work on this.

Regards

KyleAMathews commented 9 years ago

It'd be trivial to wrap the onMessage function with RxJS. E.g.

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/fromcallback.md

SomeKittens commented 8 years ago

We use Gustav to manage our Kafka/Rx integrations:

  gustav.source('kafkaSource', config => {
    let client = new kafka.Client(globalConfig.kafkaString);
    let consumer = new kafka.Consumer(client, [{
      topic: config.topic,
      partition: config.partition,
      offset: config.offset
    }], /* TODO */ { autoCommit: false, fromOffset: true });

    return new Rx.Observable(o => {
      consumer.on('message', m => o.next(m.value));
      consumer.on('error', err => o.error(err));

      return () => consumer.close(() => {});
    });
  });

The plan is to introduce a Kafka plugin once Gustav hits 1.0 (no date as of yet, as the feature set is still settling)