leostera / reactor

🚀 Native Actors for Reason and OCaml
70 stars 5 forks source link

Async tasks #8

Closed baransu closed 5 years ago

baransu commented 5 years ago

Right now blocking task will block the whole worker. We have to think about handling async tasks in a non-blocking way.

Initial idea after a quick chat with @ostera was to have something like Become_async(promise) and run async tasks using e.g lwt.

leostera commented 5 years ago

Elaborating a little more for anyone who gets here.

As it is right now, actors being cooperative scheduled in a worker, you'd block a full worker by making a blocking call. For example:

let rec loop = (ctx, state) => loop(ctx, state);
spawn(loop, ());

Would effectively block a worker entirely.

A more common use-case would be to make an HTTP request, and use the return value for something. Here's an actor that on every reduction will send an HTTP request and block until it's received a response:

spawn(
  (ctx, state) => {
    let req =
      Httpkit.Client.Request.create(
        ~headers=[("User-Agent", "reactor")],
        `GET,
        Uri.of_string("http://api.github.com/repos/ostera/reactor"),
      );

    let response =
      Httpkit_lwt.Client.(req |> Http.send >>= Response.body |> Lwt_main.run);

    `Become(response); /* or do something else with it */
  },
  "",
);

Any alternative that relies on immediately returning control to the scheduler will require integrating a promise engine into the worker loop, or spawning dedicated workers for that specific actor.

I'd like not to go there, because the cost of spawning new unix processes is much much higher and would make spawning tons of those actors something to be worried about.

I think modelling a deferred reduction step would allow for actors to seamlessly be asynchronous or synchronous as they see fit. The reduction could look like Defer(Lwt.t), and that would then make the promise engine poll for promises during the next reduction.

Rewriting the HTTP request example, it could look like this:

spawn(
  (ctx, state) => {
    /* state will be an empty string first */
    /* and on the next reduction it will be body of the http response */

    let req =
      Httpkit.Client.Request.create(
        ~headers=[("User-Agent", "reactor")],
        `GET,
        Uri.of_string("http://api.github.com/repos/ostera/reactor"),
      );

    let response = Httpkit_lwt.Client.(req |> Http.send >>= Response.body);

    `Defer(response);
  },
  "",
);

That being said, this would open the question of how do you handle a promise that fails or throws. The answer is that you don't handle it. If the promise fails, this actor will be terminated. In other words, we should follow the Let It Crash philosophy.

In the future we can implement links and monitors to communicate to supervising actors that this actor has been terminated and have them retry.