louthy / language-ext

C# functional language extensions - a base class library for functional programming
MIT License

Option<T> with async/await #25

Closed ricardopieper closed 9 years ago

ricardopieper commented 9 years ago

Hello! First of all, great work in this library.

I had some problems when I used async/await with options. I managed to solve them, but I don't feel comfortable enough to make a pull request, so here is the idea.

I'm using ASP.NET MVC, and I have an action that can return either a Redirect result or a View result. This result would be returned from inside a lambda passed to the Match function. Inside this lambda, I also did some async work. I needed async/await only in the Some handler.

In short, I had some type errors: the compiler thought that the lambda should return void (probably because it was picking the Action overloads of the Match function). Also, the code got very ugly, because sometimes I had to write return await Task.FromResult(actionResult) to make it work.

So I added these extension methods:

    // Async Some, synchronous None: lift None's result into an already-completed Task
    public static Task<R> Match<T, R>(this Option<T> self, Func<T, Task<R>> Some, Func<R> None) =>
        self.Match(Some: Some, None: () => Task.FromResult(None()));

    // Both handlers async: the existing Match already fits and returns the Task<R> directly
    public static Task<R> Match<T, R>(this Option<T> self, Func<T, Task<R>> Some, Func<Task<R>> None) =>
        self.Match(Some: Some, None: None);

Now my code looks a little bit better, and it also compiles. I haven't tested it yet, but it seems like these extra extension methods could at least make things simpler: if your Some handler needs to await something but your None handler doesn't, you don't need to make the None handler async.
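A minimal, self-contained sketch of the idea follows. It uses a hypothetical Maybe<T> struct in place of LanguageExt's Option<T>, so it compiles without the library, and names the overloads MatchAsync (an assumption here, not the library's code). Only the Some handler is async; a synchronous None result is wrapped in an already-completed task with Task.FromResult.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for Option<T>, just enough to demonstrate the overloads.
public readonly struct Maybe<T>
{
    private readonly T value;
    public bool IsSome { get; }

    private Maybe(T value) { this.value = value; IsSome = true; }

    public static Maybe<T> Some(T value) => new Maybe<T>(value);
    public static Maybe<T> None => default;

    // The ordinary synchronous match.
    public R Match<R>(Func<T, R> Some, Func<R> None) =>
        IsSome ? Some(value) : None();
}

public static class MaybeAsyncExtensions
{
    // Async Some, synchronous None: wrap None's result in a completed task.
    public static Task<R> MatchAsync<T, R>(this Maybe<T> self, Func<T, Task<R>> Some, Func<R> None) =>
        self.Match(Some, () => Task.FromResult(None()));

    // Both handlers async: the plain Match already returns a Task<R> here.
    public static Task<R> MatchAsync<T, R>(this Maybe<T> self, Func<T, Task<R>> Some, Func<Task<R>> None) =>
        self.Match(Some, None);
}
```

Overload resolution picks the first MatchAsync when None returns a plain value and the second when it returns a Task, so callers never have to write Task.FromResult themselves.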

louthy commented 9 years ago

Hi @ricardopieper

> First of all, great work in this library.

Thanks... It's always nice to hear somebody's getting some use from it! :)

I have added some extra match functions for Option<T> and OptionUnsafe<T> (I'll put others in when I get some time). As well as the Task versions, I have added match functions for IObservable<T>. Unfortunately, because their type signatures conflict with the existing match functions, I had to call them MatchAsync. I really don't like the Async suffix, but I don't have many options.

So, there's:

    // This is your first request
    Task<R> MatchAsync<R>(Func<T, Task<R>> Some, Func<R> None);

    // This is your second
    Task<R> MatchAsync<R>(Func<T, Task<R>> Some, Func<Task<R>> None);

    // Same as your first request, but with observable
    IObservable<R> MatchAsync<R>(Func<T, IObservable<R>> Some, Func<R> None);

    // Same as your second request, but with observable
    IObservable<R> MatchAsync<R>(Func<T, IObservable<R>> Some, Func<IObservable<R>> None);

Then there are some extension methods for Option<T> and OptionUnsafe<T>:

    // So instead of an Option<T>, this deals with an Optional Task<T>
    // It runs the task asynchronously (if Some) and returns the result in a new Task<R>
    Task<R> MatchAsync<T, R>(this Option<Task<T>> self, Func<T, R> Some, Func<R> None);

    // This is the same as above, but for observable
    IObservable<R> MatchAsync<T, R>(this Option<IObservable<T>> self, Func<T, R> Some, Func<R> None);
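As a rough sketch of what matching an optional task means, the block below awaits the task only in the Some case and otherwise returns the None result as a completed task. Maybe<T> and OptionalTaskExtensions are hypothetical names, not LanguageExt types.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical minimal option type: a "has value" flag plus the value.
public readonly struct Maybe<T>
{
    public readonly bool IsSome;
    public readonly T Value;
    public Maybe(T value) { IsSome = true; Value = value; }
}

public static class OptionalTaskExtensions
{
    // Some(task): await the task, then apply Some to its result.
    // None: there is no task to run, so None() becomes an already-completed task.
    public static async Task<R> MatchAsync<T, R>(this Maybe<Task<T>> self, Func<T, R> Some, Func<R> None) =>
        self.IsSome
            ? Some(await self.Value)
            : None();
}
```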

The next one is a fun one. It takes a stream of options and matches them into a new stream:

    IObservable<R> MatchAsync<T, R>(this IObservable<Option<T>> self, Func<T, R> Some, Func<R> None);

Also, if you're doing any async work in general, you may want to take a look at the LanguageExt.Process project. It's an 'Erlang-like' process system (actors). Each process you spawn is very lightweight; you can spawn tens of thousands of them with no problem. Each one has an inbox where it processes one message at a time, so it's perfect for designing concurrent systems. I have plugged it into Redis, so you can do fast inter-system communication, and it has atomic message queues, state persistence, etc. It's still alpha, so be careful, but you may find something interesting in there. There are 4 samples:

ProcessSample spawns 3 processes: one logger, one that sends a 'ping' message, and one that sends a 'pong' message. They schedule the delivery of messages every 100 ms. The logger is simply Console.WriteLine:

            // Log process
            var logger = spawn<string>("logger", Console.WriteLine);

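            // Ping process (ping and pong are assumed to be ProcessId variables declared
            // earlier, so each handler can refer to the other)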
            ping = spawn<string>("ping", msg =>
            {
                tell(logger, msg);
                tell(pong, "ping", TimeSpan.FromMilliseconds(100));
            });

            // Pong process
            pong = spawn<string>("pong", msg =>
            {
                tell(logger, msg);
                tell(ping, "pong", TimeSpan.FromMilliseconds(100));
            });

            // Trigger
            tell(pong, "start");
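The samples all rely on each process having an inbox that handles one message at a time. As an illustration only (this is not how LanguageExt.Process is implemented), a single in-memory inbox can be sketched with a BlockingCollection<T> drained by one consumer task; MiniProcess<T> is a hypothetical name.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical, in-memory only: one inbox queue and one consumer loop,
// so the handler never runs concurrently with itself.
public sealed class MiniProcess<T> : IDisposable
{
    private readonly BlockingCollection<T> inbox = new BlockingCollection<T>();
    private readonly Task loop;

    public MiniProcess(Action<T> handler)
    {
        loop = Task.Run(() =>
        {
            // GetConsumingEnumerable blocks until a message arrives and ends
            // once CompleteAdding has been called and the queue is drained.
            foreach (var msg in inbox.GetConsumingEnumerable())
                handler(msg);
        });
    }

    // Enqueue a message; processing happens later on the consumer task.
    public void Tell(T msg) => inbox.Add(msg);

    // Stop accepting messages, let the loop finish the backlog, then clean up.
    public void Dispose()
    {
        inbox.CompleteAdding();
        loop.Wait();
        inbox.Dispose();
    }
}
```

Because only the consumer task ever calls the handler, the handler itself needs no locking.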

Then there's RedisInboxSample. It sends 100 messages to a process whose inbox is persisted to Redis. The inbox handler intentionally processes them slowly, so you can quit the app, reopen it, and it carries on reading:

        static void Main(string[] args)
        {
            // Connect to the Redis cluster
            RedisCluster.register();
            Cluster.connect("redis", "redis-test", "localhost", "0");

            // Spawn the process
            var pid = spawn<string>("redis-inbox-sample", Inbox, ProcessFlags.PersistInbox);

            var rnd = new Random();
            for (var i = 0; i < 100; i++)
            {
                tell(pid, "Message sent: " + DateTime.Now + " " + DateTime.Now.Ticks + " " + rnd.Next());
            }
        }

        // Inbox message handler
        static void Inbox(string msg)
        {
            Console.WriteLine(msg);
            Thread.Sleep(200);
        }

A super fast pub/sub system in RedisPublishSample:

        static void Main(string[] args)
        {
            // Connect to the Redis cluster
            RedisCluster.register();
            Cluster.connect("redis", "redis-test", "localhost", "0");

            // Launch a process that publishes a random number as fast as possible
            var pid = spawn<Random, int>("redis-pubsub-random-test", Setup, Inbox, ProcessFlags.RemotePublish);

            // Listen to the published results coming back from the Redis channel
            subscribe<int>(pid, Console.WriteLine);

            // Start it off by sending the first message
            tell(pid, 0); 
        }

        //  Get the initial state of the process
        static Random Setup() => new Random();

        // Inbox message handler
        static Random Inbox(Random rnd, int value)
        {
            publish(value);
            tellSelf(rnd.Next());
            return rnd;
        }

Finally, there's RedisStateSample, which persists the process's state after each message:

        static void Main(string[] args)
        {
            // Connect to the Redis cluster
            RedisCluster.register();
            Cluster.connect("redis", "redis-test", "localhost", "0");

            // Spawn the process
            var pid = spawn<int, int>("redis-state-sample", Setup, Inbox, ProcessFlags.PersistState);

            // Subscribe locally to the state changes
            observeState<int>(pid).Subscribe(Console.WriteLine);

            // Start it off by sending the first message
            tell(pid, 1);
        }

        //  Get the initial state of the process
        static int Setup() => 0;

        // Inbox message handler
        static int Inbox(int state, int value)
        {
            state += value;
            tellSelf(value, TimeSpan.FromSeconds(1));
            return state;
        }

This is all checked into the master branch. It's not on NuGet yet, although I'll probably be doing an alpha release soon. There are lots of big updates from the past 6 months or so.

ricardopieper commented 9 years ago

So the process part of LanguageExt is something like Akka.net?

In fact, I will need an actor system very soon. I won't be doing any complex work, just some simple things, but I'll give it a try.

About the method signature MatchAsync, I like it because it looks Microsoft-y. Their APIs are all SomethingAsync, so the name fits well.

louthy commented 9 years ago

Yes, it's very much like Akka. Akka is much more mature than this, and very active, so you should definitely check that out too. I tried Akka about a year ago; initially I liked it, but became frustrated with it. It was pre-release, so I'm sure it's improved a lot since then. I was mainly using the F# wrapper for their API which was quite nice, but limited. When I looked at the C# API it left me a little cold to be honest. This is just a personal preference, so don't let that put you off checking it out.

My personal view is that the Actor model + functional message loops is the perfect programming model.

Purely functional programming without the actor model at some point needs to deal with the world, and therefore needs statefulness. So you end up with imperative semantics in your functional expressions (unless you use Haskell).

Now you could go the Haskell route, but I think there's something quite perfect about having a bag of state that you run expressions on as messages come in. Essentially it's a fold over a stream.
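The 'fold over a stream' view can be made concrete in a few lines: a stateful process is a setup function plus an inbox function of type Func<S, M, S>, and running it over a finite list of messages is Enumerable.Aggregate. FoldProcess.Run is a hypothetical helper for illustration, not part of the library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class FoldProcess
{
    // A stateful "process" reduced to its essence: setup gives the initial state,
    // inbox turns (state, msg) into the next state. Over a finite sequence of
    // messages, running the process is just a left fold.
    public static S Run<S, M>(Func<S> setup, Func<S, M, S> inbox, IEnumerable<M> messages) =>
        messages.Aggregate(setup(), inbox);
}
```

This mirrors the shape of the Setup and Inbox functions in RedisStateSample, where Setup returns 0 and Inbox adds each value to the state.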

The thing I felt was missing when I tried Akka was any acknowledgement of the world outside its own system. I know the Actor model is supposed to be self-contained, and that's where its power lies, but in the real world you often need to get information out of it, and declaring another class just to receive a message was getting a little tedious. So what I've done is:

Remove the need to declare new classes for processes (actors). If your process is stateless, you just provide an Action<TMsg> to handle the messages; if your process is stateful, you provide a Func<TState> setup function and a Func<TState, TMsg, TState> to handle the messages. This makes it much easier to create new processes and reduces the cognitive overhead of having loads of classes for what should be small packets of computation.

You still need to create classes for messages and the like; that's unavoidable (use F# to create a 'messages' project, it's much quicker and easier). But it's also desirable, because it's the messages that define the interface and the interaction, not the processing class.

So as with my first example, creating something to log string messages to the console is as easy as:

    var log = spawn<string>("logger", Console.WriteLine);

    tell(log, "Hello, World");

Or if you want a stateful, thread-safe cache:

    public enum CacheMsgType
    {
        Add,
        Remove,
        Get,
        Flush
    }

    class CacheMsg
    {
        public CacheMsgType Type;
        public string Key;
        public Thing Value;
    }

    public ProcessId SpawnThingCache()
    {
        return spawn<Map<string, Thing>, CacheMsg>(
            "cache",
            () => Map<string, Thing>(),
            (state, msg) =>
                  msg.Type == CacheMsgType.Add    ? state.AddOrUpdate(msg.Key, msg.Value)
                : msg.Type == CacheMsgType.Remove ? state.Remove(msg.Key)
                : msg.Type == CacheMsgType.Get    ? state.Find(msg.Key).IfSome(reply).Return(state)
                // Flush keeps only the entries that haven't expired yet
                : msg.Type == CacheMsgType.Flush  ? state.Filter(s => s.Expiry > DateTime.Now)
                : state
        );
    }
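Because the message handler is just a function from (state, message) to new state, it can be tested without any process system at all. The sketch below is a hypothetical standalone version of that handler using System.Collections.Immutable.ImmutableDictionary instead of LanguageExt's Map. It gives Thing an assumed Expiry field, passes the current time in as a parameter so the function stays pure, and leaves out Get because that needs a reply channel.

```csharp
using System;
using System.Collections.Immutable;
using System.Linq;

public enum CacheMsgType { Add, Remove, Flush }

// Hypothetical Thing carrying the Expiry that the Flush branch relies on.
public sealed class Thing
{
    public DateTime Expiry;
}

public sealed class CacheMsg
{
    public CacheMsgType Type;
    public string Key;
    public Thing Value;
}

public static class ThingCache
{
    // The inbox function on its own: old state + message -> new state.
    public static ImmutableDictionary<string, Thing> Step(
        ImmutableDictionary<string, Thing> state, CacheMsg msg, DateTime now) =>
          msg.Type == CacheMsgType.Add    ? state.SetItem(msg.Key, msg.Value)
        : msg.Type == CacheMsgType.Remove ? state.Remove(msg.Key)
        // Flush keeps only the entries that have not yet expired.
        : msg.Type == CacheMsgType.Flush  ? state.Where(kv => kv.Value.Expiry > now).ToImmutableDictionary()
        : state;
}
```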

The ProcessId is just a wrapped string path, so you can serialise it and pass it around, then anything can find and communicate with your cache:

    // Add a new item to the cache
    tell(cache, new CacheMsg { Type = CacheMsgType.Add, Key = "test", Value = new Thing() });

    // Get an item from the cache
    var thing = ask<Thing>(cache, new CacheMsg { Type = CacheMsgType.Get, Key = "test" });

    // Remove an item from the cache
    tell(cache, new CacheMsg { Type = CacheMsgType.Remove, Key = "test" });

You'll also want to flush the cache contents periodically. Just fire up another process; they're basically free (and because they're functions rather than classes, they're very easy to put into little worker modules):

    public void SpawnCacheFlush(ProcessId cache)
    {
        // Spawns a process that tells the cache process to flush, and then sends
        // itself a message in 10 minutes which causes it to run again.
        var flush = spawn<Unit>(
            "cache-flush", _ =>
            {
                tell(cache, new CacheMsg { Type = CacheMsgType.Flush });
                tellSelf(unit, TimeSpan.FromMinutes(10));
            });

        // Start the process running
        tell(flush, unit); 
    }

So as you can see that's a pretty powerful technique. Remember the process could be running on another machine, and as long as the messages serialise you can talk to them by process ID.

What about a bit of load balancing? This creates 100 processes, and as the messages come in to the parent indexer process, it automatically allocates the messages to its 100 child processes in a round-robin fashion:

    var load = spawnRoundRobin<Thing>("indexer", 100, DoIndexing);
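Round-robin allocation itself is simple: keep a counter and send each message to worker (counter mod N). RoundRobin<T> below is a hypothetical, in-process sketch in which the workers are plain delegates rather than child processes.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical dispatcher: forwards each incoming message to the next worker in turn.
public sealed class RoundRobin<T>
{
    private readonly IReadOnlyList<Action<T>> workers;
    private int next = -1;

    public RoundRobin(IReadOnlyList<Action<T>> workers) => this.workers = workers;

    public void Tell(T msg)
    {
        // Interlocked keeps the counter correct if Tell is called from several threads;
        // the uint cast keeps the index non-negative if the counter ever wraps.
        var i = Interlocked.Increment(ref next);
        workers[(int)((uint)i % (uint)workers.Count)](msg);
    }
}
```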

Added a publish system to the processes.

So if a process needs to announce something, it just calls:

    publish(msg);

Another process can subscribe to that by calling:

    subscribe(processId);

(it can do this in its setup phase, and the process system will auto-unsub when the process dies, and auto-resub when it restarts)

Then the messages that are published by one process can be consumed by any number of others (via their inbox in the normal way). I found I was jumping through hoops to do this with Akka. There are 'official' actor ways of doing this, but sometimes, as I say, you want to jump outside of that system.

For example, if your code is outside of the process system, it can get an IObservable stream instead:

    var sub = observe<Thing>(processId).Subscribe(msg => ...);

A good example of this is the 'Dead Letters' process, it gets all the messages that failed for one reason or another (serialisation problems, the process doesn't exist, the process crashed, etc.). All it does is call publish(msg). This is how it's defined:

    var deadLetters = spawn<object>("dead-letters",publish);

That's it, for a key piece of infrastructure! It's then easy to listen for and log issues, or to hook it up to a process that persists the dead-letter messages.
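A minimal sketch of the publish side, assuming nothing beyond the BCL's IObservable<T> and IObserver<T> interfaces (no Rx package), could look like this. Publisher<T> and LambdaObserver<T> are hypothetical names, and the observer list is not thread-safe, so this is for illustration only.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory publisher: Publish fans a message out to every current
// subscriber, and implementing IObservable<T> lets code outside a process system listen.
public sealed class Publisher<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public void Publish(T msg)
    {
        // Copy first so a subscriber may unsubscribe while being notified.
        foreach (var o in observers.ToArray())
            o.OnNext(msg);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(() => observers.Remove(observer));
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action dispose;
        public Unsubscriber(Action dispose) => this.dispose = dispose;
        public void Dispose() => dispose();
    }
}

// Adapts a lambda to IObserver<T>, so no Rx package is needed.
public sealed class LambdaObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public LambdaObserver(Action<T> onNext) => this.onNext = onNext;
    public void OnNext(T value) => onNext(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

A dead-letters process built this way would simply pass every failed message to Publish, and anything outside the process system could subscribe to it.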

'Discoverability': I was struggling to reliably get messages from one machine to another, or to know the process ID of a remote actor so I could message it. I think this was before they did their cluster work, so I assume these problems are solved now. But what I want to do with this is keep it super light and lean, with simple setup options and easy 'discoverability'.

So there's a supervision hierarchy like Akka, where you have a root node, then a child user node, and then you create your processes under the user node. There's a system node that handles stuff like dead-letters and various other housekeeping tasks.

    /root/user/...
    /root/system/dead-letters
    etc.

But when you create a Redis cluster connection the second argument is the name of the app/service/website, whatever it is that's running.

    RedisCluster.register();
    Cluster.connect("redis", "my-stuff", "localhost", "0");

Then your user hierarchy looks like this:

    /root/my-stuff/...

So you know where things are, and what they're called, and they're easily addressable. You can just 'tell' the address:

    tell("/root/my-stuff/hello", "Hello!");

Even that isn't great if you don't know the name of the 'app' that is running a process. So processes can register under a single name, which goes into a 'shared hierarchy':

    /root/registered/...

To register:

    register(myProcessId, "hello-world");

Then anyone else can tell by name:

    tell(find("hello-world"), "Hi!");
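The register/find idea boils down to a shared map from a short name to a full process path. Registry below is a hypothetical in-memory sketch; in the real system the registered names live in the /root/registered/... hierarchy, which this does not model.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical registry: a process path such as "/root/my-stuff/hello" is registered
// under a short name, so callers can find it without knowing which app hosts it.
public static class Registry
{
    private static readonly ConcurrentDictionary<string, string> byName =
        new ConcurrentDictionary<string, string>();

    // Builds the path a process gets under an app's user hierarchy.
    public static string ProcessPath(string app, string name) => $"/root/{app}/{name}";

    public static void Register(string processPath, string name) => byName[name] = processPath;

    // Returns null when nothing has been registered under that name.
    public static string Find(string name) =>
        byName.TryGetValue(name, out var path) ? path : null;
}
```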

Style: The final thing was really just style. I wanted something that complemented the Language-Ext style and was 'functional first' rather than functional as an afterthought. I think it's looking pretty good (the files to look at are Prelude.cs, Prelude_Ask.cs, Prelude_Tell.cs, Prelude_PubSub.cs, Prelude_Spawn.cs). I like the static access to contextual things: for example, Process.Sender inside a process gets the ID of the process that sent you the message, and calling Process.reply automatically knows where to send the reply, even though you're inside a lambda.

One wish-list item is to create an IO monad that captures all of the IO functions like tell, ask, reply, and publish as a series of continuations, so that I can create a single transaction from one message loop and use that transaction to do hyper-robust message sequencing. Currently delivery is asynchronous, so sometimes you're at the mercy of the thread pool. It would also allow for high-quality unit testing of the message loops, because you could mock the IO functions.
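The testing benefit can be illustrated with a simpler stand-in for the IO monad: effects returned as a list of data values rather than captured as continuations. The message loop below never performs IO itself; it returns Tell and Publish descriptions, which a test can inspect and a runtime could execute together. All names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical effect descriptions: what a message loop wants to do, as data.
public abstract class Effect { }

public sealed class Tell : Effect
{
    public readonly string To;
    public readonly object Msg;
    public Tell(string to, object msg) { To = to; Msg = msg; }
}

public sealed class Publish : Effect
{
    public readonly object Msg;
    public Publish(object msg) { Msg = msg; }
}

public static class PingLoop
{
    // Pure message loop: returns the new state plus the effects it requests.
    public static (int State, IReadOnlyList<Effect> Effects) Inbox(int count, string msg) =>
        (count + 1, new Effect[] { new Publish(msg), new Tell("pong", "ping") });

    // One possible interpreter: hands each effect to IO functions supplied by the caller,
    // which could be the real ones or mocks in a unit test.
    public static void Run(IEnumerable<Effect> effects, Action<string, object> tell, Action<object> publish)
    {
        foreach (var e in effects)
        {
            if (e is Tell t) tell(t.To, t.Msg);
            else if (e is Publish p) publish(p.Msg);
        }
    }
}
```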

Ok, got a bit carried away typing there! Think I'll get on with it...

ricardopieper commented 9 years ago

I wish I could honor all this text and make some deep/insightful comment, but unfortunately I can't. But it does look like good documentation :smile:

ricardopieper commented 9 years ago

I'm doing some tests using the Process library.

I'm using LanguageExt because I'm a little more familiar with functional programming and I want the functional goods in my code.

But the rest of my team (just one other guy) probably isn't familiar with it, so I'm introducing Options, Eithers, and functional programming in general in an easy-to-digest way.

While the Process library seems to work fine (and the Fold approach is very cool), I think I'll stick to the class-based Akka approach because it's more familiar to the rest of the team. Also, it's more mature, and that certainly counts.

louthy commented 9 years ago

Yeah, that's fair reasoning. I am using it in production right now, but obviously I know it inside out, so I can deal with any issues. It's definitely too early for me to recommend that anyone else use it in production, unless you were going to release in, say, 3 to 6 months, by which point I know it will be nailed down.

Funnily enough, as I was typing the info last night, I realised I could offer both the functional route and, for those who prefer it, the class-based route, by making the object itself the state. It's on master now. You simply implement IProcess<T>:

    class Logger : IProcess<string>
    {
        public void OnMessage(string message)
        {
            Console.WriteLine(message);
        }
    }

Create it like so:

    var log = spawn<Logger,string>("logger");

    tell(log,"Hello, World");

It needs a public parameterless constructor for setup, and it will be disposed correctly if it also implements IDisposable.
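The life-cycle described here (constructor as setup, one OnMessage call per message, Dispose on shutdown) can be sketched independently of the library. The IProcess<T> interface and ClassProcess.Run below are hypothetical stand-ins that treat the object itself as the fold state.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical class-based process contract.
public interface IProcess<T>
{
    void OnMessage(T message);
}

public static class ClassProcess
{
    // new P() is setup, OnMessage is the inbox, Dispose (if implemented) is shutdown.
    public static void Run<P, T>(IEnumerable<T> messages) where P : IProcess<T>, new()
    {
        var process = new P();
        try
        {
            foreach (var msg in messages)
                process.OnMessage(msg);
        }
        finally
        {
            // Honour the .NET object life-cycle: dispose on shutdown when supported.
            (process as IDisposable)?.Dispose();
        }
    }
}

// Example process that records each life-cycle step in a static log.
public sealed class LoggingProcess : IProcess<string>, IDisposable
{
    public static readonly List<string> Log = new List<string>();
    public LoggingProcess() => Log.Add("setup");
    public void OnMessage(string message) => Log.Add(message);
    public void Dispose() => Log.Add("disposed");
}
```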

ricardopieper commented 9 years ago

So IProcess looks like TypedActor in Akka. Interesting.

I updated my fork, but changed the new spawn method to this:

    public static ProcessId spawn<P, T>(ProcessName name)
        where P : IProcess<T>, new() // added new() constraint
    {
        return spawn<IProcess<T>, T>(name, () =>
            {
                var p = (IProcess<T>)new P(); // no reflection (I think)
                p.Setup();                    // added Setup to the interface as well
                return p;
            },
            (process, msg) =>
            {
                process.OnMessage(msg);
                return process;
            });
    }

ricardopieper commented 9 years ago

I also created this:

    public static ProcessId spawn<P, T>(ProcessName name, P processInstance)
        where P : IProcess<T>
    {
        return spawn<IProcess<T>, T>(name, () => processInstance,
            (process, msg) =>
            {
                process.OnMessage(msg);
                return process;
            });
    }

For reasons that I don't know yet.

Edit: Probably for setting the starting state. I could do this in the Setup function, but I thought it could be useful.

louthy commented 9 years ago

I removed the need for an OnSetup/Setup method because, well, that's what the constructor is for :-)

You're right about the new() constraint, I was clearly having a brain freeze there!

On your last example: that's not good, because the lifetime of the process must be controlled by the actor system. When your process throws an exception, the parent actor gets to decide what should happen. At the moment the default behaviour is to reset the process and send the message that caused the error to the dead-letters process (there will be more 'strategies' available soon).

The reset must shutdown (and dispose) the process, and then set it up again with fresh state before giving it the next message. So passing in processInstance will break that and give unpredictable results. Essentially it leaks the state outside of the system.

The nice thing about this approach is when your process starts, the ctor is called, then it receives messages until it's shutdown, then Dispose is called. So it honours the .NET object life-cycle correctly.
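A sketch of why the system must own construction: on an exception, the supervisor disposes the failed instance, builds a fresh one with new P(), and sends the failing message to a dead-letters callback. Supervisor.Run, IProcess<T>, and Divider are hypothetical names, and only the reset strategy described above is modelled.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical class-based process contract.
public interface IProcess<T>
{
    void OnMessage(T message);
}

public static class Supervisor
{
    // Runs one process over the messages with a "reset" strategy:
    // a failing instance is thrown away and replaced with fresh state.
    public static void Run<P, T>(IEnumerable<T> messages, Action<T> deadLetters)
        where P : IProcess<T>, new()
    {
        var process = new P();
        foreach (var msg in messages)
        {
            try
            {
                process.OnMessage(msg);
            }
            catch (Exception)
            {
                // Dispose the failed instance, start again with fresh state,
                // and route the offending message to dead letters.
                (process as IDisposable)?.Dispose();
                process = new P();
                deadLetters(msg);
            }
        }
        // Normal shutdown.
        (process as IDisposable)?.Dispose();
    }
}

// Example process: keeps a running total and fails on a zero divisor.
public sealed class Divider : IProcess<int>
{
    public static int Created;   // how many instances the supervisor has built
    public static int LastTotal; // total held by the most recent instance
    private int total;           // per-instance state, discarded on reset

    public Divider() => Created++;

    public void OnMessage(int divisor)
    {
        total += 100 / divisor; // throws DivideByZeroException when divisor is 0
        LastTotal = total;
    }
}
```

Had the instance been passed in from outside, the supervisor could not replace it, and the half-updated state from the failed message would leak into the next one.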

ricardopieper commented 9 years ago

Hmm, I understand. I didn't consider that.

louthy commented 9 years ago

The actor model is definitely a different 'head space' to be in, but when it all sinks in, and you see the possibilities, it's really, really sweet.