fsprojects / FSharp.Control.Reactive

Extensions and wrappers for using Reactive Extensions (Rx) with F#.
http://fsprojects.github.io/FSharp.Control.Reactive

Source/Sink Proposal #104

Closed stijnmoreels closed 6 years ago

stijnmoreels commented 6 years ago

Hi!

Could it be useful to add some "Sources/Sinks" to this repository? I was thinking of something like: Console, File, Http, ...

Maybe this would be a good extension, since these are basic observers we otherwise need to write ourselves? Just an idea.

This was something I worked on:

namespace FSharp.Control.Reactive

open System.IO
open System.Reactive
open System.Reactive.Linq
open System

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]  
module FileIO =

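    // Creates a watcher on `path`; `ext` is passed as the watcher's filter pattern (for example "*.txt").
    let private fileWatch path ext () =
        let w = new FileSystemWatcher (path, ext)
        w.IncludeSubdirectories <- true
        // Start raising events only once the watcher is fully configured.
        w.EnableRaisingEvents <- true
        w

    // Exposes one watcher event as an observable; the watcher is disposed together with the subscription.
    let private fileEvent f path ext =
        Observable.using
            (fileWatch path ext)
            (fun w -> f w :> IObservable<'a>)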

    ///**Description**
    /// `IObservable<_>` that emits values when a file is created.
    ///**Parameters**
    ///  * `path` - Path to listen for files.
    ///  * `ext` - Extension that the files must have.
    let created = fileEvent (fun w -> w.Created)

    ///**Description**
    /// `IObservable<_>` that emits values when a file is modified.
    ///**Parameters**
    ///  * `path` - Path to listen for files.
    ///  * `ext` - Extension that the files must have.
    let modified = fileEvent (fun w -> w.Changed)

    ///**Description**
    /// `IObservable<_>` that emits values when a file is deleted.
    ///**Parameters**
    ///  * `path` - Path to listen for files.
    ///  * `ext` - Extension that the files must have.
    let deleted = fileEvent (fun w -> w.Deleted)

    ///**Description**
    /// `IObservable<_>` that emits values when a file is renamed.
    ///**Parameters**
    ///  * `path` - Path to listen for files.
    ///  * `ext` - Extension that the files must have.
    let renamed = fileEvent (fun w -> w.Renamed)

    // Generates a fresh file name on every call (a plain value would be evaluated only once).
    let private guid () = Guid.NewGuid().ToString()

    ///**Description**
    /// `IObserver<_>` that sends the 'OnNext' and 'OnError' calls to a file for which a name is looked up, using a custom `FileStream`.
    ///**Parameters**
    ///  * `dir` - Directory in which the observer places the files.
    ///  * `nameOf` - Function that determines the name of the file for a given value.
    ///  * `ffs` - Function to create a `FileStream` instance based on a given path.
    let sinkNamedStream dir nameOf ffs =
        // Left unannotated so it stays generic: it is used for both values and exceptions.
        let writeNamed name v =
            let path = Path.Combine (dir, name)
            use fs = ffs path
            use w = new StreamWriter (fs : FileStream)
            w.Write (string v)

        Observer.Create (
            (fun x -> writeNamed (nameOf x) x), 
            // Errors are written to a file with a newly generated name.
            (fun (ex : exn) -> writeNamed (guid ()) ex))

    ///**Description**
    /// `IObserver<_>` that sends the 'OnNext' and 'OnError' calls to a file for which a name is looked up.
    ///**Parameters**
    ///  * `dir` - Directory in which the observer places the files.
    ///  * `nameOf` - Function that determines the name of the file for a given value.
    let sinkNamed dir nameOf =
        sinkNamedStream dir nameOf (fun path -> 
            new FileStream(path, FileMode.OpenOrCreate, FileAccess.Write, FileShare.Read))

    ///**Description**
    /// `IObserver<_>` that sends the 'OnNext' and 'OnError' calls to a generated file.
    ///**Parameters**
    ///  * `dir` - Directory in which the observer places the files.
    let sink dir = sinkNamed dir (fun _ -> guid ())
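
For comparison, the Console sink mentioned at the top of this comment could follow the same shape. The following is only a minimal sketch, not part of the proposal or of FSharp.Control.Reactive: the names `sinkWriter` and `sinkConsole` are hypothetical, and it relies on nothing but `System.IObserver<_>` from the base class library.

```fsharp
open System
open System.IO

/// Hypothetical sink: an `IObserver<'a>` that writes every notification to a `TextWriter`.
let sinkWriter (out : TextWriter) : IObserver<'a> =
    { new IObserver<'a> with
        member __.OnNext v = out.WriteLine (sprintf "OnNext: %s" (string v))
        member __.OnError ex = out.WriteLine (sprintf "OnError: %s" ex.Message)
        member __.OnCompleted () = out.WriteLine "OnCompleted" }

/// Hypothetical Console sink built on top of `sinkWriter`.
let sinkConsole () : IObserver<'a> = sinkWriter Console.Out

// Usage: the notifications are sent by hand here; normally an observable would be subscribed to the sink.
let observer : IObserver<int> = sinkConsole ()
observer.OnNext 42
observer.OnCompleted ()
```

Taking a `TextWriter` rather than writing to `Console` directly keeps the sink testable with a `StringWriter`; whether such helpers belong in the core library or in a separate package is the open question in this thread.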

panesofglass commented 6 years ago

I would be happy to see this added as an additional library. Feel free to submit a PR. Are you thinking library-per-source/sink types/pairs?

stijnmoreels commented 6 years ago

IDK yet...