fsprojects / FSharp.Control.Reactive

Extensions and wrappers for using Reactive Extensions (Rx) with F#.
http://fsprojects.github.io/FSharp.Control.Reactive
Other
284 stars 58 forks source link

Functions for converting between IObservable<T> and Async<T> #55

Closed bordoley closed 8 years ago

bordoley commented 9 years ago

What do you think of adding some simple functions that are analogous to those in System.Reactive.Threading.Tasks.TaskObservableExtensions. Essentially:

Observable.toAsync -> Returns an Async workflow that computes the last value of the observable sequence. Async.toObservable -> Returns a cold observable sequence that signals when the Async workflow completes.

panesofglass commented 9 years ago

Those currently exist in F#x

bordoley commented 9 years ago

There is some interesting overlap between this library and F#x's Observable functions (and also unfortunately also not a PCL :/) Is your general thought that you'd rather encourage adoption of F#x? My perspective at least is that taking on a dependency on this library is a little bit less of a commitment than taking one on F#x, and these are fairly trivial functions to implement when letting RX do the heavy lifting.

panesofglass commented 9 years ago

Could be worth removing the functions in F#x to this project.

bordoley commented 9 years ago

I have the following code locally in my forked repo. I can provide a PR if it would be useful. Any comments?

let createWithTask subscribe =
    Observable.Create(Func<IObserver<'Result>, CancellationToken, Task> subscribe)

let toObservable (comp:Async<'T>) =
    let subscribe (observer:IObserver<'T>) (ct:CancellationToken) =
        let computation = async {
            let! result = comp |> Async.Catch
            match result with
            | Choice1Of2 result -> 
                observer.OnNext(result)
            | Choice2Of2 exn -> 
                observer.OnError exn
        } 

        Async.StartAsTask(computation, cancellationToken = ct) :> Task

    createWithTask subscribe
letrec commented 9 years ago

How about propagating cancellation token to the comp, so it's given a chance to cancel upon unsubscription?

bordoley commented 9 years ago

I was under the impression that by passing the cancellationToken to StartAsTask, that F# would set the cancellation token returned by calls to Async.CancellationToken to the inner workflows. See: http://stackoverflow.com/a/27022315

Am I missing something? I haven't tested.

letrec commented 9 years ago

Could you elaborate on how comp is going to get ccess to the token if it does support cancellation?

bordoley commented 9 years ago

am I missing something?

    let comp = async {
        let! ct = Async.CancellationToken
        ct.ThrowIfCancellationRequested()
        ()
    }
letrec commented 9 years ago

Makes sense. Thanks for the clarification!

panesofglass commented 9 years ago

@bordoley that's close but not quite right. I think you want:

    let createWithTask subscribe =
        Observable.Create(Func<IObserver<'Result>, CancellationToken, Task> subscribe)

    let toObservable (comp:Async<'T>) =
        let subscribe (observer:IObserver<'T>) (ct:CancellationToken) =
            let computation = async {
                let! result = comp |> Async.Catch
                match result with
                | Choice1Of2 result ->
                    observer.OnNext(result)
                    observer.OnCompleted()
                | Choice2Of2 exn -> 
                    observer.OnError exn
            } 

            Async.StartAsTask(computation, cancellationToken = ct) :> Task

        createWithTask subscribe

I had something like this using only Async<'T> -> IObservable<'T> some time ago and removed it. I can't recall exactly why, though.

panesofglass commented 9 years ago

I think I like your version better. :) https://github.com/fsprojects/FSharp.Control.Reactive/commit/3492370011e5df03e0af500663ec4f018922d857

panesofglass commented 9 years ago

Please definitely submit a PR for createWithTask. I didn't realize that even existed.

You may be able to get away with avoiding the Task conversion in toObservable by using one of the Create overloads with Async.StartImmediate. I don't know the performance trade-offs on those options, though.

panesofglass commented 9 years ago

Also, you may want to consider adding both:

type Async with
    static member StartAsObservable(comp: Async<'T>) // Follows Async methods

module Observable =
    let ofAsync (comp: Async<'T>) = Async.StartAsObservable(comp) // matches Observable module extensions in this library
panesofglass commented 9 years ago

@TheAngryByrd, should we consider updating the new async functions you added with the above definitions for ofAsync rather than the one using Observable.FromAsync? The above look a little more straight-forward. See this article for why FromAsync may be lacking. Thoughts?

TheAngryByrd commented 9 years ago

As long as they pass the tests right? :smiley_cat:

TheAngryByrd commented 9 years ago

So I did some benchmarking of a few F# Async to Observable algorithms I've come across.

What I did is benchmark each 'Algorithm' doing a short operation (a for loop) multiple times. I do proceed to parallelize the operations to get a better idea of how it might handle when doing the flatmap operations.

Project here: https://github.com/TheAngryByrd/FsharpAsyncToObservable/

The results on my computer are

Running with 1 concurrent async

Algorithm1 ms Ellapsed : 96
Algorithm1 ms Ellapsed : 57
Algorithm1 ms Ellapsed : 57
Algorithm1 ms Ellapsed : 57
Algorithm1 ms Ellapsed : 58
Algorithm1 ms Ellapsed : 58
Algorithm1 ms Ellapsed : 58
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 57
Algorithm1 ms Ellapsed : 57
Algorithm1 sum ms Ellapsed : 61.400000

Algorithm2 ms Ellapsed : 233
Algorithm2 ms Ellapsed : 204
Algorithm2 ms Ellapsed : 268
Algorithm2 ms Ellapsed : 267
Algorithm2 ms Ellapsed : 261
Algorithm2 ms Ellapsed : 264
Algorithm2 ms Ellapsed : 274
Algorithm2 ms Ellapsed : 254
Algorithm2 ms Ellapsed : 259
Algorithm2 ms Ellapsed : 265
Algorithm2 sum ms Ellapsed : 254.900000

Algorithm3 ms Ellapsed : 142
Algorithm3 ms Ellapsed : 126
Algorithm3 ms Ellapsed : 127
Algorithm3 ms Ellapsed : 130
Algorithm3 ms Ellapsed : 129
Algorithm3 ms Ellapsed : 125
Algorithm3 ms Ellapsed : 128
Algorithm3 ms Ellapsed : 129
Algorithm3 ms Ellapsed : 132
Algorithm3 ms Ellapsed : 128
Algorithm3 sum ms Ellapsed : 129.600000

Algorithm4 ms Ellapsed : 246
Algorithm4 ms Ellapsed : 244
Algorithm4 ms Ellapsed : 241
Algorithm4 ms Ellapsed : 247
Algorithm4 ms Ellapsed : 239
Algorithm4 ms Ellapsed : 247
Algorithm4 ms Ellapsed : 246
Algorithm4 ms Ellapsed : 245
Algorithm4 ms Ellapsed : 247
Algorithm4 ms Ellapsed : 255
Algorithm4 sum ms Ellapsed : 245.700000

Running with 2 concurrent async

Algorithm1 ms Ellapsed : 61
Algorithm1 ms Ellapsed : 60
Algorithm1 ms Ellapsed : 60
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 62
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 58
Algorithm1 sum ms Ellapsed : 59.600000

Algorithm2 ms Ellapsed : 241
Algorithm2 ms Ellapsed : 240
Algorithm2 ms Ellapsed : 240
Algorithm2 ms Ellapsed : 241
Algorithm2 ms Ellapsed : 242
Algorithm2 ms Ellapsed : 243
Algorithm2 ms Ellapsed : 233
Algorithm2 ms Ellapsed : 240
Algorithm2 ms Ellapsed : 237
Algorithm2 ms Ellapsed : 234
Algorithm2 sum ms Ellapsed : 239.100000

Algorithm3 ms Ellapsed : 94
Algorithm3 ms Ellapsed : 94
Algorithm3 ms Ellapsed : 91
Algorithm3 ms Ellapsed : 96
Algorithm3 ms Ellapsed : 90
Algorithm3 ms Ellapsed : 90
Algorithm3 ms Ellapsed : 92
Algorithm3 ms Ellapsed : 89
Algorithm3 ms Ellapsed : 95
Algorithm3 ms Ellapsed : 90
Algorithm3 sum ms Ellapsed : 92.100000

Algorithm4 ms Ellapsed : 222
Algorithm4 ms Ellapsed : 219
Algorithm4 ms Ellapsed : 224
Algorithm4 ms Ellapsed : 226
Algorithm4 ms Ellapsed : 221
Algorithm4 ms Ellapsed : 225
Algorithm4 ms Ellapsed : 228
Algorithm4 ms Ellapsed : 223
Algorithm4 ms Ellapsed : 229
Algorithm4 ms Ellapsed : 222
Algorithm4 sum ms Ellapsed : 223.900000

Running with 3 concurrent async

Algorithm1 ms Ellapsed : 62
Algorithm1 ms Ellapsed : 60
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 58
Algorithm1 ms Ellapsed : 60
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 58
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 60
Algorithm1 sum ms Ellapsed : 59.400000

Algorithm2 ms Ellapsed : 208
Algorithm2 ms Ellapsed : 208
Algorithm2 ms Ellapsed : 211
Algorithm2 ms Ellapsed : 212
Algorithm2 ms Ellapsed : 207
Algorithm2 ms Ellapsed : 210
Algorithm2 ms Ellapsed : 211
Algorithm2 ms Ellapsed : 202
Algorithm2 ms Ellapsed : 204
Algorithm2 ms Ellapsed : 205
Algorithm2 sum ms Ellapsed : 207.800000

Algorithm3 ms Ellapsed : 59
Algorithm3 ms Ellapsed : 58
Algorithm3 ms Ellapsed : 57
Algorithm3 ms Ellapsed : 53
Algorithm3 ms Ellapsed : 59
Algorithm3 ms Ellapsed : 59
Algorithm3 ms Ellapsed : 61
Algorithm3 ms Ellapsed : 65
Algorithm3 ms Ellapsed : 58
Algorithm3 ms Ellapsed : 60
Algorithm3 sum ms Ellapsed : 58.900000

Algorithm4 ms Ellapsed : 201
Algorithm4 ms Ellapsed : 200
Algorithm4 ms Ellapsed : 200
Algorithm4 ms Ellapsed : 198
Algorithm4 ms Ellapsed : 202
Algorithm4 ms Ellapsed : 200
Algorithm4 ms Ellapsed : 199
Algorithm4 ms Ellapsed : 198
Algorithm4 ms Ellapsed : 200
Algorithm4 ms Ellapsed : 197
Algorithm4 sum ms Ellapsed : 199.500000

Running with 4 concurrent async

Algorithm1 ms Ellapsed : 61
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 60
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 59
Algorithm1 sum ms Ellapsed : 59.300000

Algorithm2 ms Ellapsed : 181
Algorithm2 ms Ellapsed : 179
Algorithm2 ms Ellapsed : 182
Algorithm2 ms Ellapsed : 180
Algorithm2 ms Ellapsed : 182
Algorithm2 ms Ellapsed : 183
Algorithm2 ms Ellapsed : 178
Algorithm2 ms Ellapsed : 182
Algorithm2 ms Ellapsed : 181
Algorithm2 ms Ellapsed : 177
Algorithm2 sum ms Ellapsed : 180.500000

Algorithm3 ms Ellapsed : 50
Algorithm3 ms Ellapsed : 47
Algorithm3 ms Ellapsed : 50
Algorithm3 ms Ellapsed : 49
Algorithm3 ms Ellapsed : 50
Algorithm3 ms Ellapsed : 49
Algorithm3 ms Ellapsed : 48
Algorithm3 ms Ellapsed : 50
Algorithm3 ms Ellapsed : 48
Algorithm3 ms Ellapsed : 47
Algorithm3 sum ms Ellapsed : 48.800000

Algorithm4 ms Ellapsed : 179
Algorithm4 ms Ellapsed : 179
Algorithm4 ms Ellapsed : 180
Algorithm4 ms Ellapsed : 187
Algorithm4 ms Ellapsed : 180
Algorithm4 ms Ellapsed : 180
Algorithm4 ms Ellapsed : 183
Algorithm4 ms Ellapsed : 178
Algorithm4 ms Ellapsed : 179
Algorithm4 ms Ellapsed : 178
Algorithm4 sum ms Ellapsed : 180.300000

Running with 5 concurrent async

Algorithm1 ms Ellapsed : 57
Algorithm1 ms Ellapsed : 57
Algorithm1 ms Ellapsed : 56
Algorithm1 ms Ellapsed : 56
Algorithm1 ms Ellapsed : 56
Algorithm1 ms Ellapsed : 58
Algorithm1 ms Ellapsed : 57
Algorithm1 ms Ellapsed : 57
Algorithm1 ms Ellapsed : 57
Algorithm1 ms Ellapsed : 57
Algorithm1 sum ms Ellapsed : 56.800000

Algorithm2 ms Ellapsed : 168
Algorithm2 ms Ellapsed : 164
Algorithm2 ms Ellapsed : 160
Algorithm2 ms Ellapsed : 162
Algorithm2 ms Ellapsed : 161
Algorithm2 ms Ellapsed : 160
Algorithm2 ms Ellapsed : 166
Algorithm2 ms Ellapsed : 167
Algorithm2 ms Ellapsed : 165
Algorithm2 ms Ellapsed : 166
Algorithm2 sum ms Ellapsed : 163.900000

Algorithm3 ms Ellapsed : 50
Algorithm3 ms Ellapsed : 48
Algorithm3 ms Ellapsed : 48
Algorithm3 ms Ellapsed : 50
Algorithm3 ms Ellapsed : 49
Algorithm3 ms Ellapsed : 47
Algorithm3 ms Ellapsed : 47
Algorithm3 ms Ellapsed : 51
Algorithm3 ms Ellapsed : 48
Algorithm3 ms Ellapsed : 49
Algorithm3 sum ms Ellapsed : 48.700000

Algorithm4 ms Ellapsed : 162
Algorithm4 ms Ellapsed : 160
Algorithm4 ms Ellapsed : 162
Algorithm4 ms Ellapsed : 155
Algorithm4 ms Ellapsed : 159
Algorithm4 ms Ellapsed : 158
Algorithm4 ms Ellapsed : 163
Algorithm4 ms Ellapsed : 161
Algorithm4 ms Ellapsed : 166
Algorithm4 ms Ellapsed : 161
Algorithm4 sum ms Ellapsed : 160.700000

Running with 6 concurrent async

Algorithm1 ms Ellapsed : 57
Algorithm1 ms Ellapsed : 56
Algorithm1 ms Ellapsed : 58
Algorithm1 ms Ellapsed : 56
Algorithm1 ms Ellapsed : 56
Algorithm1 ms Ellapsed : 56
Algorithm1 ms Ellapsed : 56
Algorithm1 ms Ellapsed : 56
Algorithm1 ms Ellapsed : 56
Algorithm1 ms Ellapsed : 56
Algorithm1 sum ms Ellapsed : 56.300000

Algorithm2 ms Ellapsed : 146
Algorithm2 ms Ellapsed : 148
Algorithm2 ms Ellapsed : 142
Algorithm2 ms Ellapsed : 154
Algorithm2 ms Ellapsed : 159
Algorithm2 ms Ellapsed : 154
Algorithm2 ms Ellapsed : 147
Algorithm2 ms Ellapsed : 147
Algorithm2 ms Ellapsed : 144
Algorithm2 ms Ellapsed : 152
Algorithm2 sum ms Ellapsed : 149.300000

Algorithm3 ms Ellapsed : 51
Algorithm3 ms Ellapsed : 50
Algorithm3 ms Ellapsed : 47
Algorithm3 ms Ellapsed : 56
Algorithm3 ms Ellapsed : 52
Algorithm3 ms Ellapsed : 48
Algorithm3 ms Ellapsed : 52
Algorithm3 ms Ellapsed : 47
Algorithm3 ms Ellapsed : 52
Algorithm3 ms Ellapsed : 50
Algorithm3 sum ms Ellapsed : 50.500000

Algorithm4 ms Ellapsed : 153
Algorithm4 ms Ellapsed : 157
Algorithm4 ms Ellapsed : 152
Algorithm4 ms Ellapsed : 168
Algorithm4 ms Ellapsed : 155
Algorithm4 ms Ellapsed : 155
Algorithm4 ms Ellapsed : 153
Algorithm4 ms Ellapsed : 152
Algorithm4 ms Ellapsed : 154
Algorithm4 ms Ellapsed : 154
Algorithm4 sum ms Ellapsed : 155.300000

Running with 7 concurrent async

Algorithm1 ms Ellapsed : 58
Algorithm1 ms Ellapsed : 57
Algorithm1 ms Ellapsed : 58
Algorithm1 ms Ellapsed : 56
Algorithm1 ms Ellapsed : 56
Algorithm1 ms Ellapsed : 56
Algorithm1 ms Ellapsed : 57
Algorithm1 ms Ellapsed : 55
Algorithm1 ms Ellapsed : 56
Algorithm1 ms Ellapsed : 58
Algorithm1 sum ms Ellapsed : 56.700000

Algorithm2 ms Ellapsed : 132
Algorithm2 ms Ellapsed : 124
Algorithm2 ms Ellapsed : 126
Algorithm2 ms Ellapsed : 125
Algorithm2 ms Ellapsed : 127
Algorithm2 ms Ellapsed : 124
Algorithm2 ms Ellapsed : 126
Algorithm2 ms Ellapsed : 130
Algorithm2 ms Ellapsed : 129
Algorithm2 ms Ellapsed : 127
Algorithm2 sum ms Ellapsed : 127.000000

Algorithm3 ms Ellapsed : 50
Algorithm3 ms Ellapsed : 52
Algorithm3 ms Ellapsed : 49
Algorithm3 ms Ellapsed : 48
Algorithm3 ms Ellapsed : 48
Algorithm3 ms Ellapsed : 47
Algorithm3 ms Ellapsed : 48
Algorithm3 ms Ellapsed : 46
Algorithm3 ms Ellapsed : 47
Algorithm3 ms Ellapsed : 47
Algorithm3 sum ms Ellapsed : 48.200000

Algorithm4 ms Ellapsed : 149
Algorithm4 ms Ellapsed : 147
Algorithm4 ms Ellapsed : 146
Algorithm4 ms Ellapsed : 151
Algorithm4 ms Ellapsed : 153
Algorithm4 ms Ellapsed : 147
Algorithm4 ms Ellapsed : 151
Algorithm4 ms Ellapsed : 146
Algorithm4 ms Ellapsed : 156
Algorithm4 ms Ellapsed : 146
Algorithm4 sum ms Ellapsed : 149.200000

Running with 8 concurrent async

Algorithm1 ms Ellapsed : 61
Algorithm1 ms Ellapsed : 62
Algorithm1 ms Ellapsed : 58
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 61
Algorithm1 ms Ellapsed : 58
Algorithm1 ms Ellapsed : 59
Algorithm1 ms Ellapsed : 57
Algorithm1 ms Ellapsed : 58
Algorithm1 ms Ellapsed : 57
Algorithm1 sum ms Ellapsed : 59.000000

Algorithm2 ms Ellapsed : 119
Algorithm2 ms Ellapsed : 119
Algorithm2 ms Ellapsed : 119
Algorithm2 ms Ellapsed : 119
Algorithm2 ms Ellapsed : 122
Algorithm2 ms Ellapsed : 116
Algorithm2 ms Ellapsed : 127
Algorithm2 ms Ellapsed : 117
Algorithm2 ms Ellapsed : 122
Algorithm2 ms Ellapsed : 119
Algorithm2 sum ms Ellapsed : 119.900000

Algorithm3 ms Ellapsed : 47
Algorithm3 ms Ellapsed : 47
Algorithm3 ms Ellapsed : 46
Algorithm3 ms Ellapsed : 48
Algorithm3 ms Ellapsed : 47
Algorithm3 ms Ellapsed : 49
Algorithm3 ms Ellapsed : 47
Algorithm3 ms Ellapsed : 48
Algorithm3 ms Ellapsed : 48
Algorithm3 ms Ellapsed : 48
Algorithm3 sum ms Ellapsed : 47.500000

Algorithm4 ms Ellapsed : 157
Algorithm4 ms Ellapsed : 150
Algorithm4 ms Ellapsed : 145
Algorithm4 ms Ellapsed : 151
Algorithm4 ms Ellapsed : 147
Algorithm4 ms Ellapsed : 152
Algorithm4 ms Ellapsed : 148
Algorithm4 ms Ellapsed : 150
Algorithm4 ms Ellapsed : 144
Algorithm4 ms Ellapsed : 148
Algorithm4 sum ms Ellapsed : 149.200000

What seems to be happening with algorithm number 1, is it is ignoring the merge limit and using many cores all the time.

Algorithm number 3 (your implementation @panesofglass) is faster than Algorithm 2 or 4 and much so when parallelized. It definitely seems that async subject slows things down a lot.

I'm sure I've done something wrong or misjudged something, so code review please? :smirk_cat:

bordoley commented 9 years ago

I haven't dived deep into your code or the tests, but I'm conjecturing that the differences being observed are more likely than not being caused by thread hops and context switches.

When choosing an implementation, we should probably determine what the semantics of the method will be when the returned IObservable is subscribed to. In general, I would argue that RX is about asynchronous programming and not concurrency at its core, and the correct behavior is to synchronously execute the async workflow. The async workflow may or may not include asynchronous code, and asynchronous code may be either IO or compute bound, but this logic should be isolated into the implementation of the provided workflow. This is actually a good reason to avoid Async.startAsTask, so my implementation above is faulty.

We may also want to provide a variant of this function that accepts a scheduler to schedule the execution of the workflow on either the task pool, an eventloop, etc.

bordoley commented 9 years ago

Also looking at your code and given my comments above, I tend to like algorithm 1 most stylistically, though I would use Observable.Create instead of implementing the IObservable interface and return an action to cancel and dispose the token.

Also might be nice to add Observable.create a:(obs -> () -> ()) = ...

panesofglass commented 9 years ago

@TheAngryByrd, thanks for doing that! I agree with @bordoley on both style and use of Observable.Create. 1 and 3 look fairly similar; I now wonder why I used Async.Start rather than Async.StartWithContinuations. I think it was to not kick it off on the current thread, though I think @bordoley is correct that the Async probably should be run immediately. Also, my algorithm probably pays a penalty for the built-up computation expression with Delay, TryFinally, TryWith, etc. all used.

panesofglass commented 8 years ago

Closing due to age. Please re-open if desired, especially with a PR, if possible.

hesxenon commented 1 year ago

as far as I can tell this covers Async -> Observable, but what about the other direction? How can I convert an Observable to an Async? Something similar to firstValueFrom (from rxjs)?

drhumlen commented 6 months ago

@hesxenon Did you find a solution to it?

Something to go from IObservable -> Async would be very useful. Perhaps two function? One .firstAsync: IObservable<'a> -> Async<'a> and .allAsync: IObservable<'a> -> Async<'a list>