ghermeto / kafka-observable

Observables from kafka interaction
MIT License
18 stars 5 forks

consider extending Observable? #1

Open trxcllnt opened 7 years ago

trxcllnt commented 7 years ago

RxJS 5 supports extending Observables and adding custom operators. I think custom Observables are pretty neat, and they're generally faster than the Observable.create style. Best of all, by overriding lift to return your custom Observable subtype, your type will be preserved through operator chaining!

KafkaObservable.fromTopic(...)
  .filter(Boolean)
  .kafkaMessage() // <-- these work
  .flatMap(() => ...)
  .otherKafkaThing() // <-- just fine!
  .subscribe()

For example, the kafka-message operator can be implemented like this:

// ES module import, to match the `export` below
import { Observable, Subscriber } from 'rxjs';

// Extend Observable, and override `lift` to return instances of KafkaObservable.
// The Rx operators all call `this.lift(some_operator)`, just like I do in `kafkaMessage`.
export class KafkaObservable extends Observable {
  lift(operator) {
    const kObs = new KafkaObservable();
    kObs.source = this;
    kObs.operator = operator;
    return kObs;
  }
  kafkaMessage(mapper = x => x) {
    return this.lift(function(source) {
      // the `this` context is the destination Subscriber
      const sink = this;
      return source.subscribe(new KafkaMessageSubscriber(sink, mapper));
    });
  }
}

class KafkaMessageSubscriber extends Subscriber {
  constructor(sink, mapper) {
    super(sink);
    this.mapper = mapper;
  }
  next(x) {
    // decode the raw Kafka payload, then hand the mapped value downstream
    super.next(this.mapper(x.message.toString('utf8')));
  }
}
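
To make the mechanics concrete, here is a dependency-free sketch of why overriding `lift` keeps the subclass through a chain. `MiniObservable` and `MiniKafkaObservable` are hypothetical toy stand-ins written for this illustration, not the RxJS classes, and the toy forwards `error` and `complete` by hand where RxJS's `Subscriber` would do it for you. It assumes Node.js (for `Buffer`) and that each Kafka record carries its payload in a `message` field, as in the snippet above.

```javascript
// Toy stand-in for Observable. NOT the RxJS implementation.
class MiniObservable {
  constructor(subscribe) {
    if (subscribe) this._subscribe = subscribe;
  }
  // Default lift: always builds the base type.
  lift(operator) {
    const out = new MiniObservable();
    out.source = this;
    out.operator = operator;
    return out;
  }
  subscribe(observer) {
    // Operators are invoked with the downstream observer as `this`.
    if (this.operator) return this.operator.call(observer, this.source);
    return this._subscribe(observer);
  }
  // A "built-in" operator, written purely in terms of `this.lift(...)`.
  filter(predicate) {
    return this.lift(function (source) {
      const sink = this;
      return source.subscribe({
        next: x => { if (predicate(x)) sink.next(x); },
        error: e => sink.error(e),
        complete: () => sink.complete(),
      });
    });
  }
}

class MiniKafkaObservable extends MiniObservable {
  // Overriding lift means every operator built on it returns this subtype.
  lift(operator) {
    const out = new MiniKafkaObservable();
    out.source = this;
    out.operator = operator;
    return out;
  }
  kafkaMessage(mapper = x => x) {
    return this.lift(function (source) {
      const sink = this;
      return source.subscribe({
        next: x => sink.next(mapper(x.message.toString('utf8'))),
        error: e => sink.error(e),
        complete: () => sink.complete(),
      });
    });
  }
}

// A fake topic that emits two records and one empty slot.
const topic = new MiniKafkaObservable(observer => {
  observer.next({ message: Buffer.from('hello') });
  observer.next(null);
  observer.next({ message: Buffer.from('kafka') });
  observer.complete();
});

// `filter` is defined on the base class, yet `kafkaMessage` is still available after it.
const chained = topic.filter(Boolean).kafkaMessage(s => s.toUpperCase());

const received = [];
chained.subscribe({
  next: x => received.push(x),
  error: () => {},
  complete: () => received.push('done'),
});
// received is ['HELLO', 'KAFKA', 'done']
```

Because `filter` only ever calls `this.lift(...)`, it never needs to know about the subclass; the overridden `lift` decides which type comes out.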

You don't have to worry about things like forwarding errors, completion, or disposal. Because this is how we do things inside the Rx library, you get all the same stuff for free just by extending Subscriber! Here are a few examples I've done if you're interested in seeing more:

ghermeto commented 7 years ago

Hi @trxcllnt

My original idea was to extend Observable, but I got discouraged by this recommendation.

However, I agree that it is probably the right way to go. I will check your libraries and make the updates in the next few days...

trxcllnt commented 7 years ago

@ghermeto yeah, I should probably PR an update to the docs about extending Observable. I also wrote about lift in this comment recently, which includes some interesting links.

At the end of the day, let, lift, and pipe are three ways to do mostly the same thing. I prefer lift because it's usually less code, and is easier for the JIT to optimize. You can also compose operators externally with lift just like you can with let, except the operator interface is implemented in terms of Observers instead of Observables.
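
As a rough illustration of that difference, the sketch below writes the same "double every value" step twice: once as a `let`-style function from Observable to Observable, and once as a `lift`-style operator that only deals with observers. `MiniObservable`, `doubleOperator`, and `doubleViaLet` are hypothetical names for a toy model, not RxJS itself.

```javascript
// Toy stand-in for Observable. NOT the RxJS implementation.
class MiniObservable {
  constructor(subscribe) {
    if (subscribe) this._subscribe = subscribe;
  }
  lift(operator) {
    const out = new MiniObservable();
    out.source = this;
    out.operator = operator;
    return out;
  }
  subscribe(observer) {
    if (this.operator) return this.operator.call(observer, this.source);
    return this._subscribe(observer);
  }
  // let-style composition: pass the whole Observable to a function.
  let(fn) {
    return fn(this);
  }
}

// lift-style: defined outside the class, in terms of observers.
// `this` is the downstream observer, `source` is the upstream Observable.
function doubleOperator(source) {
  const sink = this;
  return source.subscribe({
    next: x => sink.next(x * 2),
    error: e => sink.error(e),
    complete: () => sink.complete(),
  });
}

// let-style: defined in terms of Observables, so it has to build a new one.
const doubleViaLet = source =>
  new MiniObservable(observer =>
    source.subscribe({
      next: x => observer.next(x * 2),
      error: e => observer.error(e),
      complete: () => observer.complete(),
    })
  );

const numbers = new MiniObservable(observer => {
  [1, 2, 3].forEach(n => observer.next(n));
  observer.complete();
});

// Small helper that collects emitted values into an array.
const collect = into => ({
  next: x => into.push(x),
  error: () => {},
  complete: () => {},
});

const viaLift = [];
const viaLet = [];
numbers.lift(doubleOperator).subscribe(collect(viaLift));
numbers.let(doubleViaLet).subscribe(collect(viaLet));
// both arrays end up as [2, 4, 6]
```

The two forms produce the same values; the `lift` version leaves construction of the result to the source Observable, which is what lets a subclass substitute its own type.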

The one drawback to this approach is that calling the static Observable creation methods on Observable subclasses doesn't return an instance of your subclass by default.

It's easy to work around if your subclass implements a slightly more intelligent constructor, and you don't mind wrapping the static creation methods to return instances of your subclass, but we should probably have a mechanism in Rx to do this for you.
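
One possible shape for that workaround, sketched on a toy model rather than on RxJS: the subclass constructor accepts either a subscribe function or an existing observable to wrap, and each static creation method is re-declared to wrap the base result. `MiniObservable`, `PlainSubclass`, and `MiniKafkaObservable` are hypothetical, and the base `of` hard-codes the base class only to mimic the behavior described above.

```javascript
// Toy stand-in for Observable. NOT the RxJS implementation.
class MiniObservable {
  constructor(subscribe) {
    if (subscribe) this._subscribe = subscribe;
  }
  subscribe(observer) {
    return this._subscribe(observer);
  }
  // Mimics the drawback: the factory always builds the base type,
  // even when it is called on a subclass.
  static of(...values) {
    return new MiniObservable(observer => {
      values.forEach(v => observer.next(v));
      observer.complete();
    });
  }
}

// A subclass that does nothing special inherits the base-typed factory.
class PlainSubclass extends MiniObservable {}

class MiniKafkaObservable extends MiniObservable {
  // "Slightly more intelligent" constructor: accept a subscribe function
  // or an existing observable to wrap.
  constructor(source) {
    if (source instanceof MiniObservable) {
      super(observer => source.subscribe(observer));
    } else {
      super(source);
    }
  }
  // Wrap the static creation method so it returns the subclass.
  static of(...values) {
    return new MiniKafkaObservable(super.of(...values));
  }
}

const plain = PlainSubclass.of(1, 2);      // instance of MiniObservable only
const kafka = MiniKafkaObservable.of(1, 2); // instance of MiniKafkaObservable

const values = [];
kafka.subscribe({
  next: v => values.push(v),
  error: () => {},
  complete: () => values.push('done'),
});
// values is [1, 2, 'done']
```

Each wrapped factory costs one extra layer of subscription, which is part of why a built-in mechanism would be preferable.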

Let me know if I can answer any questions along the way. Cheers!