caolan / highland

High-level streams library for Node.js and the browser
https://caolan.github.io/highland
Apache License 2.0
3.43k stars 147 forks source link

combineLatest #416

Open kharandziuk opened 8 years ago

kharandziuk commented 8 years ago

Does somebody plan to implement combineLatest(like in Rx)?

You can find a visualization http://rxmarbles.com/#combineLatest

kharandziuk commented 8 years ago

A naive implementation may look like this:

var combineLatest = function(...streams) {
    var _streams = streams.map(
        (s, i) => s.map((x) => {return {[i]: x}})
    )
    return H(_streams).merge().scan1(H.extend).filter((obj) => {
      return Object.keys(obj).length === streams.length
    }).map(obj =>
      _(obj).pairs().reduce(
        (res, [key, val]) => {
          res[key] = val
          return res
        },
        []
      )
    )
  }
combineLatest(users, photos).each((x) => console.log(x))

Does it make sense? Should I create a PR?

vqvu commented 8 years ago

This doesn't quite work. I think you want scan1(_.extend) instead of scan1(H.extend). Highland's version of extend has the argument order switched.

As for a PR, we're trying to move away from having a "kitchen sink" of transforms in the core library. @quarterto has been an especially strong proponent of this. I think combineLatest falls into the "useful but not common" category that makes it fall afoul of this rule. We are working on an extension mechanism (in v3.0) and a sort of "contrib" package where these types of transforms can go. No ETA though.

So I guess my answer here is not yet but possibly soontm. That said, I'd be OK with accepting this if any of the other contributors thinks we should.

kharandziuk commented 8 years ago

Hey @vqvu! Actually everything which you said makes sense for me. A lot of libraries split their functionality to the core and some extensions. But I guess they create this kind of extensions as early as possible. On the moment there is strange situation with Highland.js, It looks nice and "nodish", I want to use it, but instead of creating actual projects I'm fighting with absence of primitives and guidelines. So, please decide what Highland actually is:

I guess it's not only my problem. Quick googling returns you something like this: https://github.com/milankinen/ffux/issues/5

pajtai commented 8 years ago

Just to jump in here, we use Reactive Extensions and are trying to migrate over to Highland for Node, but we use combineLatest everywhere. Think it'd be a great addition to the "high order streams" section.

Alternatively, is there some other pattern that can be used in these situations besides combineLatest? For us "these situations" are:

I have X number of streams, I want to re-template the UI with the latest from each X of those streams if any one of them changes.

For example if there is a list of devices, a list of users, and a selected device, then combineLatest(devicesStream, selectedDeviceStream, selectedUserStream) allows a ui update if any one of those three changes - giving the latest values and allowing further stream manipulation with the results.

vqvu commented 8 years ago

You make a good point. Your updating UI example sounds like a reasonably common use case. And I don't think it's possible solve it by composing existing operators, which is a good indication that we should provide support for it. I also hadn't realized that kefir and bacon both also implement this operator (that's my fault for not reading @kharandziuk's comment more carefully).

So consider me convinced. I'm happy to accept PRs. It shouldn't be too hard to implement by using zipAll0 as a template. Or I can put something together this weekend.

kharandziuk commented 8 years ago

So, what about my implementation? @vqvu should I create PR? or Are there other issues except extend?

var combineLatest = function(...streams) {
    var _streams = streams.map(
        (s, i) => s.map((x) => {return {[i]: x}})
    )
    return H(_streams).merge()
      .scan1(
         (acc, next)=> H.extend(next, acc))
      .filter((obj) => {
        return Object.keys(obj).length === streams.length
    }).map(obj =>
      _(obj).pairs().reduce(
        (res, [key, val]) => {
          res[key] = val
          return res
        },
        []
      )
    )
  }
vqvu commented 8 years ago

Your implementation probably works, but it creates a lot of objects and streams in the process, so I am concerned about efficiency. It's also a bit roundabout, since it tries to use existing operators. I would rather it be implemented directly via pull. There's already a template you can use in the implementation of zipAll0.

I'd also like the implementation to have the signature

Stream.prototype.combineWith = function combineWith(f) {
    ...
};

Having the operator take as input a function matches the signature of the other libraries. As for the name, I just like the combineWith (bacon.js) more than combineLast (Rx) or combine (kefir.js).

If you'd like to put in the work to develop a pull-based implementation, I'm be happy to accept a PR.