ReactiveX / RxRuby

Reactive Extensions for Ruby

Implement the ReactiveX Sample operator #98

Status: Open. schmich opened this issue 7 years ago.

schmich commented 7 years ago

RxRuby should implement the ReactiveX Sample operator.

I've found Sample in the Rx.NET library useful in my .NET projects and I find myself needing it in my current Ruby project. You can see the Rx.NET implementation here.

I've hacked up something that seems to have the behavior I'm looking for, but it is very untested and probably has some leaks and incorrect corner cases:

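module Rx::Observable
  # Emit the latest source value whenever `sampler` emits (if a new one arrived).
  def sample(sampler)
    # Tag every event with its origin: [value, from_sampler?]
    sample = sampler.map { |e| [e, true] }
    merged = self.map { |e| [e, false] }.merge(sample)
    # Pair each event with its successor; emit a source value only when it is
    # immediately followed by a sampler tick (two ticks in a row emit nothing).
    merged.zip(merged.skip(1))
      .select { |l, r| !l[1] && r[1] }
      .map { |l, _r| l[0] }
  end
end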
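The trick in the hack above is to tag every event with its origin and pair each event with the one that follows it (`zip` with `skip(1)`), emitting a source value only when the next event is a sampler tick. The same pairing can be illustrated on a plain Ruby array with `each_cons(2)`. This is only an illustration of the idea, not RxRuby code, and the `tagged` array is a hypothetical, already-ordered event log:

```ruby
# Tagged events in arrival order: [value, from_sampler?]
tagged = [[1, false], [2, false], [:tick, true], [:tick, true], [3, false], [:tick, true]]

# Pair each event with its successor, keep pairs where a source value is
# immediately followed by a sampler tick, and emit that source value.
sampled = tagged.each_cons(2)
                .select { |(_l_value, l_tick), (_r_value, r_tick)| !l_tick && r_tick }
                .map { |(l_value, _l_tick), _right| l_value }

p sampled # => [2, 3]
```

Without the `!l_tick` check, the pair of consecutive ticks would emit the tick payload itself.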

I looked into implementing Sample myself natively in the library and contributing a pull request, but after looking at the source for some other operators, I think it would take far too much time for me to contribute a correct implementation.

bittrance commented 7 years ago

Here is a start for you: https://github.com/bittrance/rxruby/commit/b6db90887e349347ead3ff8fc736a65f14ae03b4. It needs better error handling, proper synchronization, and a force-push with a test to make it look like I am doing TDD. 😈 I'll try to continue tomorrow; right now I need to go to bed.

schmich commented 7 years ago

Hi @bittrance, thanks for your help here.

I tried using your WIP implementation, but it doesn't work as I would expect on the test below. I could be mistaken in what the behavior should be. Rx can be difficult to reason about sometimes.

require 'rx'

e = Rx::Subject.new
d = Rx::Subject.new
s = Rx::Subject.new

enable = e.as_observable
disable = d.as_observable
source = s.as_observable

stream = enable.map { true }
        .merge(disable.map { false })
        .combine_latest(source) { |*pair| pair }
        .sample(source)
        .select(&:first)
        .map(&:last)

stream.subscribe do |x|
  puts "Source: #{x}"
end

e.on_next(1)
d.on_next(1)
e.on_next(1)
s.on_next(100)
d.on_next(1)
s.on_next(200)
e.on_next(1)
s.on_next(300)
s.on_next(400)
e.on_next(1)
d.on_next(1)
s.on_next(500)
s.on_next(600)

I would expect the output to be:

Source: 100
Source: 300
Source: 400

In the test above and in my use case, I have 3 streams of events: an "enable" stream, a "disable" stream, and a "source" stream. I want to use the enable/disable streams as a switch/valve/gate to enable and disable the source stream.

In other words, when I receive data on the source stream, if I last saw a disable event, the data should be ignored and dropped. If I last saw an enable event, the data should be emitted. In this way, the enable/disable merged stream acts as a gate for the source stream.
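The gating semantics described above can be checked without any Rx machinery. The following is a minimal plain-Ruby sketch, not RxRuby code: it assumes all three streams have already been merged into a single, strictly ordered event log (the hypothetical `events` array below, mirroring the test case), which sidesteps the ordering caveats that apply to real asynchronous streams. `gate_events` is a hypothetical helper name.

```ruby
# Plain-Ruby model of the enable/disable gate over an ordered event log.
# Each event is [stream, value]: :enable / :disable set the gate state and
# :source carries the data. This only models the expected semantics.
def gate_events(events, initially_enabled: false)
  enabled = initially_enabled
  events.each_with_object([]) do |(stream, value), out|
    case stream
    when :enable  then enabled = true
    when :disable then enabled = false
    when :source  then out << value if enabled
    end
  end
end

# Same sequence of calls as the test case above.
events = [
  [:enable, 1], [:disable, 1], [:enable, 1], [:source, 100],
  [:disable, 1], [:source, 200], [:enable, 1], [:source, 300],
  [:source, 400], [:enable, 1], [:disable, 1], [:source, 500],
  [:source, 600]
]

gate_events(events).each { |x| puts "Source: #{x}" }
```

This prints `Source: 100`, `Source: 300` and `Source: 400`, matching the expected output.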

I realize this could be done with some local variables and mutable state, but I'd like to keep it as functional and Rx-only as possible. The Sample operator seemed like an easy way to do it. I'm open to implementations using other operators, too (maybe Switch or FlatMap).

Update: After thinking about this more, what I really want is a gate operator:

# source is the Observable emitting the data I care about
# switch is an Observable emitting true/false that controls source emission
source.gate(switch).subscribe do |x|
  puts "Source: #{x}"
end

I've come up with the following, but since AnonymousObservable and CompositeSubscription aren't well documented, it's tough to know if it's correct:

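module Rx::Observable
  # Pass items from self through only while the latest value seen on `gate`
  # is truthy; `initial` is the state before `gate` emits anything.
  def gate(gate, initial: false)
    Rx::AnonymousObservable.new do |obs|
      enabled = initial
      # Track the latest gate value; gate errors propagate, gate completion is ignored.
      gate_sub = gate.subscribe(
        -> enable { enabled = enable },
        obs.method(:on_error),
        -> { }
      )
      # Forward source items only while the gate is enabled.
      self_sub = self.select { enabled }.subscribe(
        obs.method(:on_next),
        obs.method(:on_error),
        obs.method(:on_completed)
      )
      # Unsubscribing from the result unsubscribes from both streams.
      Rx::CompositeSubscription.new [gate_sub, self_sub]
    end
  end
end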

bittrance commented 7 years ago

On take 1, I am not sure how you see that. We are mostly trying to follow RxJS, so I copied https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/sample.md to ./examples/sample.rb. In this implementation, .sample() is called on the source, and takes a "strobe" observable whose emissions "reset" the sampling. Note also that in your first take, there is no guarantee that .combine_latest() and .sample() will get events in the desired order.
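To make the "strobe" semantics concrete: as far as I know, in both Rx.NET and RxJS, `sample` emits the most recent source value each time the sampler emits, but only if the source has produced a new value since the previous sample. Below is a plain-Ruby model of that behavior over a hypothetical ordered event log; it is not RxRuby's implementation, and `sample_events` is an illustrative name only.

```ruby
# Plain-Ruby model of Sample: on every :tick, emit the latest :source value,
# but only if a new value arrived since the previous tick.
def sample_events(events)
  latest = nil
  has_value = false
  events.each_with_object([]) do |(stream, value), out|
    case stream
    when :source
      latest = value
      has_value = true
    when :tick
      next unless has_value # nothing new since the last tick
      out << latest
      has_value = false
    end
  end
end

events = [
  [:source, 1], [:source, 2], [:tick, nil], # emits 2 (latest wins)
  [:tick, nil],                             # nothing new: no emission
  [:source, 3], [:tick, nil]                # emits 3
]
p sample_events(events) # => [2, 3]
```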

On your update: the .gate() operator would introduce a concept of emission "truthiness" that, to my knowledge, no ReactiveX implementation has. I would be more comfortable with select taking a stream of predicates such that you could do:

p = Rx::Subject.new
s = Rx::Subject.new
source = s.as_observable
source
  .filter(p, lambda {|_| false}) # lambda is initial predicate
  .subscribe { |e| puts e }
s.on_next('foo') # not emitted
p.on_next(lambda { |_| true })
s.on_next('bar') # => 'bar'
s.on_next('baz') # => 'baz'
p.on_next(lambda {|_| false })
s.on_next('bar') # not emitted

Or perhaps this should in fact be called .gate() rather than .filter()? I cannot find any other implementation that has something like this, but nor can I express it reasonably using other operators.
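The proposed predicate-stream filter can be modelled the same way. This is a plain-Ruby sketch of the semantics only; `filter_with_predicates` and the event-log shape are hypothetical and not an RxRuby API. It replays the example above: the initial predicate rejects everything, and each `:predicate` event replaces the current predicate.

```ruby
# Each :predicate event replaces the current predicate; each :value event is
# emitted only if the current predicate accepts it.
def filter_with_predicates(events, initial)
  predicate = initial
  events.each_with_object([]) do |(stream, payload), out|
    case stream
    when :predicate then predicate = payload
    when :value     then out << payload if predicate.call(payload)
    end
  end
end

events = [
  [:value, 'foo'],              # rejected by the initial predicate
  [:predicate, ->(_) { true }],
  [:value, 'bar'],
  [:value, 'baz'],
  [:predicate, ->(_) { false }],
  [:value, 'bar']               # rejected again
]
p filter_with_predicates(events, ->(_) { false }) # => ["bar", "baz"]
```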

schmich commented 7 years ago

On take 1, I am not sure how you see that.

I ported my test case above to RxJS and I do see the behavior I expected above:

RxJS test case

```js
let e = new Rx.Subject();
let d = new Rx.Subject();
let s = new Rx.Subject();

let enable = e.asObservable();
let disable = d.asObservable();
let source = s.asObservable();

let stream = enable.map(_ => true)
  .merge(disable.map(_ => false))
  .combineLatest(source)
  .sample(source)
  .where(pair => pair[0])
  .map(pair => pair[1]);

stream.subscribe(x => console.log(`Source: ${x}`));

e.onNext(1)
d.onNext(1)
e.onNext(1)
s.onNext(100)
d.onNext(1)
s.onNext(200)
e.onNext(1)
s.onNext(300)
s.onNext(400)
e.onNext(1)
d.onNext(1)
s.onNext(500)
s.onNext(600)

// Output:
// Source: 100
// Source: 300
// Source: 400
```

Note also that in your first take, there is no guarantee that .combine_latest() and .sample() will get events in the desired order.

You're right, thanks! I'm dealing with slower, discrete, ordered events as opposed to frequent, async events, so I completely missed that.

I would be more comfortable with select taking a stream of predicates such that you could do:

I'm interested in why you'd prefer predicates over plain true/false values. Would the gate/filter observable then be a stream of lazily evaluated predicates, each evaluated every time the source emits a value?

I didn't intend for this issue to become a general discussion about my problem. With your help, and after thinking a bit more deeply, I think I have a workable solution in the form of gate/filter. If you'd like to keep this issue open and implement sample, I believe it's unique and would benefit the RxRuby library, but I no longer need it myself.

bittrance commented 7 years ago

I meditated and changed my mind. I no longer think a stream of predicates is a good idea: there is no way to "inspect" or transform a Ruby lambda without executing it, possibly incurring unknown side effects, so predicates don't really adhere to the Rx spirit.

Instead, I think your use case and many others could be handled elegantly if this were possible:

strobe = Rx::Observable.interval(1)
events
  .sample(strobe) { |latest, pulse| latest if pulse }
  .subscribe { |event| ... }

One use could be to have strobe emit predicates, but you could equally use booleans, as in this example. It would become sort of a "combine_latest_left" which I have actually been missing. What say you?

bittrance commented 7 years ago

PR up for basic functionality: #102.

schmich commented 7 years ago

It would become sort of a "combine_latest_left" which I have actually been missing. What say you?

I looked at CombineLatestLeft in Rx.NET and its analog withLatestFrom in RxJS, and I think it's exactly what I've been missing as well!

Porting the Rx.NET implementation to Ruby, I end up with something (admittedly ugly) like:

module Rx::Observable
  def with_latest_from(other, &combine)
    combine ||= ->(*pair) { pair }

    lefts = self.map { |e| { is_left: true, left: e, right: nil } }
    rights = other.map { |e| { is_left: false, left: nil, right: e } }

    lefts.merge(rights)
      .scan({ is_left: false, left: nil, right: nil }) { |o, n|
        n[:is_left] ?
        { is_left: true, left: n[:left], right: o[:right] } :
        { is_left: false, left: nil, right: n[:right] }
      }.select { |e|
        e[:is_left]
      }.map { |e|
        combine.call(e[:left], e[:right])
      }
  end
end

With this, implementing gate above is as simple as:

module Rx::Observable
  def gate(gate)
    self.with_latest_from(gate).select(&:last).map(&:first)
  end
end

And I've already run into another scenario where I was able to apply it (outside of the gate pattern).

Calling it with_latest_from makes more intuitive sense to me than combine_latest_left, but this is definitely a basic operator that I've been missing. In my opinion, it deserves a place in RxRuby, much like withLatestFrom in RxJS.

bittrance commented 7 years ago

We appear to be in agreement. In fact, I think .with_latest_from() is more common/useful than .sample(). The typical use case for .with_latest_from() would be where you have a dynamic rule-set or configuration: you want to process each item in a stream according to the latest available rules/configs.
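The rule-set/configuration use case can be sketched in plain Ruby as well. This models only the semantics of a `with_latest_from`-style operator: each item is combined with the most recent config, and items arriving before any config has been seen are dropped, as in RxJS `withLatestFrom`. The event-log shape and `with_latest_from_events` are hypothetical, not RxRuby code.

```ruby
# Model of with_latest_from: every :item is combined (via the block) with the
# latest :config. Items that arrive before any config are dropped.
def with_latest_from_events(events)
  config = nil
  seen_config = false
  events.each_with_object([]) do |(stream, value), out|
    case stream
    when :config
      config = value
      seen_config = true
    when :item
      out << yield(value, config) if seen_config
    end
  end
end

events = [
  [:item, 5],               # dropped: no config yet
  [:config, { factor: 2 }],
  [:item, 10],
  [:item, 20],
  [:config, { factor: 3 }], # later items use the new rules
  [:item, 10]
]
result = with_latest_from_events(events) { |item, cfg| item * cfg[:factor] }
p result # => [20, 40, 30]
```

Note that the scan-based port above behaves differently for early items: because its seed has `right: nil`, it emits them paired with `nil` instead of dropping them.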

I would like some feedback from the core developers on my .sample() implementation before I go further, but if it is well received, I will certainly implement .with_latest_from() in a similar fashion.

Thank you for your input.