fsprojects / FSharp.Control.Reactive

Extensions and wrappers for using Reactive Extensions (Rx) with F#.
http://fsprojects.github.io/FSharp.Control.Reactive

Add Observable.flatMapAsync #59

Closed bordoley closed 9 years ago

bordoley commented 9 years ago

Similar to flatmapTask, but for F# async. Probably using Async.StartImmediate. I can provide PRs if you're in agreement.

panesofglass commented 9 years ago

PRs would be very helpful! Thank you! Also, if you have the time, please add tests. We are woefully short of good test coverage.

Thank you for your interest in the project!

TheAngryByrd commented 9 years ago

We can discuss what I have done in the past.

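// liftAsync has to be defined before flatmapAsync uses it.
// Both rely on one of the fromAsync definitions below being in scope.
let liftAsync asyncOperation =
    asyncOperation >> fromAsync

let flatmapAsync asyncOperation (source : IObservable<'Source>) =
    source.SelectMany(fun item -> liftAsync asyncOperation item)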

We have two options for fromAsync

let fromAsync computation = 
    Observable.Create<'a>(Func<IObserver<'a>,Action>(fun o -> 
                              if o = null then nullArg "observer"
                              let cts = new System.Threading.CancellationTokenSource()
                              let invoked = ref 0

                              let cancelOrDispose cancel = 
                                  if System.Threading.Interlocked.CompareExchange(invoked,1,0) = 0 then 
                                      if cancel then cts.Cancel()
                                      else cts.Dispose()

                              let wrapper = 
                                  async { 
                                      try 
                                          try 
                                              let! result = computation
                                              o.OnNext(result)
                                              // Complete only on success, so OnCompleted never follows OnError.
                                              o.OnCompleted()
                                          with e -> o.OnError(e)
                                      finally
                                          cancelOrDispose false
                                  }

                              Async.StartImmediate(wrapper,cts.Token)
                              Action(fun () -> cancelOrDispose true)))

Look familiar @panesofglass? (http://www.fssnip.net/1H) :smiley_cat:

or possibly

let fromAsync asyncOperation = 
    Observable.FromAsync
            (fun (token : Threading.CancellationToken) -> Async.StartAsTask(asyncOperation,cancellationToken = token))

I'm not sure which version of fromAsync makes more sense but both do get you the same result.
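
For comparison, here is a minimal, self-contained sketch of how the second option could be wired into flatmapAsync, written as an F# script. It assumes the System.Reactive NuGet package is available. The explicit type arguments and delegate constructors are my addition to pin the overloads, and squareAsync is a hypothetical operation used only for illustration.

```fsharp
// Sketch only: assumes the System.Reactive package can be restored by F# Interactive.
#r "nuget: System.Reactive"

open System
open System.Threading
open System.Threading.Tasks
open System.Reactive.Linq

// Second option: let Rx run the async as a Task and handle cancellation.
let fromAsync (asyncOperation : Async<'T>) : IObservable<'T> =
    Observable.FromAsync<'T>(
        Func<CancellationToken, Task<'T>>(fun token ->
            Async.StartAsTask(asyncOperation, cancellationToken = token)))

// Run an async function for every item and merge the results.
let flatmapAsync (asyncOperation : 'Source -> Async<'Result>)
                 (source : IObservable<'Source>) : IObservable<'Result> =
    source.SelectMany(
        Func<'Source, IObservable<'Result>>(fun item -> fromAsync (asyncOperation item)))

// Hypothetical async operation, for illustration only.
let squareAsync (x : int) =
    async {
        do! Async.Sleep 10
        return x * x
    }

// Block until the sequence completes and collect everything it produced.
// SelectMany merges results as they arrive, so the order is not guaranteed.
let results =
    (Observable.Range(1, 3) |> flatmapAsync squareAsync).ToList().Wait()

printfn "%A" (results |> Seq.sort |> Seq.toList)
```

Because SelectMany merges inner sequences as they finish, callers that need results in source order would probably want a different operator, such as Concat over the lifted observables.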

panesofglass commented 9 years ago

@TheAngryByrd it does indeed look familiar! In this case, however, I think it's better to stick to the optimizations available within Rx. I like your second option. Do you mind submitting a PR?

TheAngryByrd commented 9 years ago

Should be able to get to it this weekend.

panesofglass commented 9 years ago

I'll push this to NuGet soon. I will also update release notes.