fsprojects / FSharp.Control.Reactive

Extensions and wrappers for using Reactive Extensions (Rx) with F#.
http://fsprojects.github.io/FSharp.Control.Reactive
Other
284 stars 60 forks source link

Replace custom IObservable implementations with Rx native #152

Closed deviousasti closed 4 years ago

deviousasti commented 4 years ago

This PR replaces custom implementations using object expressions with Rx native methods.

Rationale

Rx provides certain guarantees of behavior such as serialization of notifications through scheduling, safeguarding observers, and automatic pipeline tear-down. These are implemented in Rx.Net as a set of Producers and Sinks, and they have non-trivial implementations, even for methods which produce a single notification.

It is difficult to correctly implement the IObservable interface, and building up a solution with reactive combinators is always recommended over rolling your own.

Issues

This might be a breaking change for code which relied on this incorrect behavior.

Two tests fail, both of these rely on Observable.ofSeq which has simply been changed to:

    let ofSeq<'Item>(source:'Item seq) : IObservable<'Item> =
        Observable.ToObservable source

equivalent to enumerable.ToObservable().

Should this be considered a version incompatibility?

deviousasti commented 4 years ago

One test is this:

``throwing an exception in choose leads to the OnError event firing and does not lead to the exception flowing out even with a regular subscribe`` () =
    let o = Observable.ofSeq [1;2;3] |> Observable.choose (fun _ -> failwith "qwe")
    let error_flows_out = ref false
    try o |> Observable.subscribe (printfn "%i") |> ignore //<--- this should always throw an error
    with _ -> error_flows_out := true
    Assert.That(!error_flows_out, Is.EqualTo false)
    o |> ``should be`` 0 true false

If there's an OnError notification, the simple subscribe overload will always throw, contrary to what this test states.

let s = Observable.throw (exn "Oh no")

s |> Observable.subscribe (printfn "%A") // will throw
s |> Observable.subscribeWithError (printfn "%A") (printfn "%A") // won't throw    

This is expected behavior.

deviousasti commented 4 years ago

I debugged the other test:

    member __.``exhaustMap maps all incoming source emits`` () =
        Check.QuickThrowOnFailure <|
        fun (xs : int list) (f : int -> int) ->
        TestSchedule.usage <| fun sch ->
            let inner  = xs |> List.map f
            let got = 
                Observable.ofSeq xs
                |> Observable.exhaustMap (fun _ -> Observable.ofSeq inner)
                |> TestSchedule.subscribeTestObserverStart sch
                |> TestObserver.nexts 
            let expected = List.collect (fun _ -> inner) xs

            expected = got

According to the exhaustMap summary,

However, exhaustMap ignores every new projected Observable if the previous projected Observable has not yet completed. Once that one completes, it will accept and flatten the next projected Observable and repeat this process.

This is actually not being reflected in the test. It should ignore the following values of the inner observable while the first one is running. This didn't happen in the previous implementation because it was completely synchronous.

Addendum:

exhaustMap's defintion doesn't look thread-safe. It's using a mutable variable, and manually disposing when ideally it should be using SerialDisposable. Maybe in another PR.

deviousasti commented 4 years ago

Shall I push the fixed tests?