dart-lang / sdk

The Dart SDK, including the VM, JS and Wasm compilers, analysis, core libraries, and more.
https://dart.dev
BSD 3-Clause "New" or "Revised" License

[Feature] Helper function for long-running isolates #50882

Open albertodev01 opened 1 year ago

albertodev01 commented 1 year ago

Background

Dart 2.19 introduces the Isolate.run function to concurrently execute computations. This is extremely helpful because it reduces the complexity of spawning a new isolate to execute a single function. I also agree that

await Isolate.run<int>(() => doSomething());

is shorter and easier to understand than

final port = ReceivePort();
await Isolate.spawn<SendPort>(
  (sendPort) {
    final result = doSomething();
    Isolate.exit(sendPort, result);
  },
  port.sendPort,
);
final result = await port.first as int;

Proposal

I would like to introduce a similar API for long-running isolates. At the moment (Dart 2.18.x), we can of course spawn an isolate and exchange messages with it, but it takes a lot of work: one has to deal with ports, streams and messages. For example, this is what you'd generally do to have 2 isolates that exchange messages:

final receivePort = ReceivePort();

await Isolate.spawn<SendPort>((mainSendPort) {
  final workerReceivePort = ReceivePort();
  // Send the worker's SendPort back so the main isolate can reach it
  mainSendPort.send(workerReceivePort.sendPort);

  // Listens for messages from the main isolate
  workerReceivePort.listen(...);
}, receivePort.sendPort);

// Make it broadcast so that we can listen on the port multiple times
final broadcast = receivePort.asBroadcastStream();

// 'SendPort' of the worker isolate, for two-way communication
final sendPort = await broadcast.first as SendPort;

// Listen for messages from the worker isolate
broadcast.listen( ... );

It works, but that is a lot of code. The problem I see here is that we need to manually set up and exchange ports between isolates to allow 2-way communication. What if there was a function similar to Isolate.run that already "prepares" the ports for us? The above code would reduce to this:

 final receivePort  = ReceivePort();

 final worker = await Isolate.runWithPort((ReceivePort port) {
   // Listen for messages from the main isolate
   port.listen( ... );

   // Send messages to the main isolate
   port.sendPort.send('Hello from worker');
 }, receivePort);

 // Listen for messages from the worker
 receivePort.listen(...);

 // Send messages to the worker isolate
 worker.sendPort.send('Hello from main');

The ports are automatically configured and we just need to use them. There is no need to manually exchange ports and set up the communication channel. I would like to introduce a function in the Isolate class with the following signature (or something similar):

static Future<({Isolate isolate, SendPort sendPort})> runWithPort(void Function(ReceivePort) computation, ReceivePort port) { ... }

This function returns a reference to the worker isolate (so that we can call isolate.kill(), for example) and a reference to the sendPort of the worker isolate (so we can send messages from the main isolate to the worker isolate).
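For comparison, here is a rough sketch, assuming Dart 3 records and patterns, of how the proposed behavior can be approximated in user code today with Isolate.spawn. The names runWithPort, WorkerBody, _workerEntry and squareWorker are hypothetical and are not dart:isolate APIs. Unlike the signature above, the worker body also receives the main isolate's SendPort. Inside the worker, port.sendPort would deliver messages back to the worker's own port.

```dart
import 'dart:isolate';

/// Hypothetical worker signature: [fromMain] receives messages sent by the
/// main isolate, [toMain] delivers messages to the main isolate.
typedef WorkerBody = void Function(ReceivePort fromMain, SendPort toMain);

/// Hypothetical user-space version of the proposed `Isolate.runWithPort`
/// (not part of dart:isolate). Spawns a worker, performs the port handshake,
/// and returns the worker isolate plus the SendPort used to talk to it.
Future<({Isolate isolate, SendPort sendPort})> runWithPort(
    WorkerBody computation, ReceivePort mainPort) async {
  final handshake = ReceivePort(); // One-shot port used only for the handshake.
  final isolate = await Isolate.spawn(
      _workerEntry, (handshake.sendPort, mainPort.sendPort, computation));
  // `first` closes `handshake` once the worker's SendPort has arrived.
  final workerSendPort = await handshake.first as SendPort;
  return (isolate: isolate, sendPort: workerSendPort);
}

/// Runs in the new isolate: creates the worker's port, reports it, then
/// hands control to the user's computation.
void _workerEntry((SendPort, SendPort, WorkerBody) args) {
  final (handshake, toMain, computation) = args;
  final fromMain = ReceivePort();
  handshake.send(fromMain.sendPort);
  computation(fromMain, toMain);
}

int square(int x) => x * x;

/// Example worker: replies to every int it receives with its square.
void squareWorker(ReceivePort fromMain, SendPort toMain) {
  fromMain.listen((message) => toMain.send(square(message as int)));
}

Future<void> main() async {
  final fromWorker = ReceivePort();
  final worker = await runWithPort(squareWorker, fromWorker);
  worker.sendPort.send(7);
  print(await fromWorker.first); // 49
  worker.isolate.kill();
}
```

The handshake uses its own short-lived port. This means the caller's ReceivePort only ever carries the worker's messages, which is part of the boilerplate the proposal wants to hide.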


Considerations

Running concurrent functions is something that I often do in my daily Flutter job. For example, we have an analytics report system that runs concurrently on a worker isolate, and we exchange messages between the main isolate and the worker isolate. Another example is a numerical analysis tool that solves large systems on a separate isolate and exchanges messages with the main one to show the results in the Flutter UI.

With the introduction of Isolate.run in Dart 2.19 you've made a great addition, because it allows me to quickly run computations on a separate isolate with no setup. For 2-way communication, however, we've developed an internal package to simplify message exchange between isolates.

In my ideal future, I'd like to be able to do this:

  1. execute "one shot" concurrent operations using Isolate.run
  2. exchange messages between two isolates using Isolate.runWithPort
  3. for any other use case, manually set up isolates and do whatever we want

I think that (1) and (2) are very useful. This proposal is not about a bug; it's about a usability issue I see. Since you've already added the Isolate.run function, I think you may be interested in also adding Isolate.runWithPort (or whatever name you wish).

lrhn commented 1 year ago

The generalization of Isolate.run would be IsolateRunner, or another similar modernized API. You start an isolate first, then you can run any function in it, just like Isolate.run (but without using Isolate.exit for return values), and finally you shut it down.
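A minimal sketch of such a runner follows. It assumes Dart 3 and that every function passed to run captures only sendable state. Worker, start, run and close are hypothetical names chosen for illustration, and this is not the API of the discontinued IsolateRunner. Each request carries an id, results come back over one shared response port, and errors are returned as RemoteError values.

```dart
import 'dart:async';
import 'dart:isolate';

/// Hypothetical minimal "isolate runner": one long-lived isolate that can
/// run many functions before it is shut down.
class Worker {
  Worker._(this._commands, this._responses) {
    // Route each (id, result, failed) response to the matching request.
    _responses.listen((message) {
      final (id, result, failed) = message as (int, Object?, bool);
      final completer = _pending.remove(id)!;
      if (failed) {
        completer.completeError(result!);
      } else {
        completer.complete(result);
      }
    });
  }

  final SendPort _commands;
  final ReceivePort _responses;
  final _pending = <int, Completer<Object?>>{};
  var _nextId = 0;

  /// Spawns the isolate and waits until it can accept commands.
  static Future<Worker> start() async {
    final handshake = ReceivePort();
    final responses = ReceivePort();
    await Isolate.spawn(_serve, (handshake.sendPort, responses.sendPort));
    final commands = await handshake.first as SendPort;
    return Worker._(commands, responses);
  }

  /// Runs [computation] in the worker isolate and returns its result.
  /// [computation] must be sendable (e.g. it must not capture a ReceivePort).
  Future<R> run<R>(FutureOr<R> Function() computation) async {
    final id = _nextId++;
    final completer = Completer<Object?>();
    _pending[id] = completer;
    _commands.send((id, computation));
    return await completer.future as R;
  }

  /// Asks the worker to close its port, which lets its isolate exit.
  void close() {
    _commands.send(null);
    _responses.close();
  }
}

void _serve((SendPort, SendPort) ports) {
  final (handshake, responses) = ports;
  final commands = ReceivePort();
  handshake.send(commands.sendPort);
  commands.listen((message) async {
    if (message == null) {
      commands.close(); // No open ports left, so this isolate exits.
      return;
    }
    final (id, computation) = message as (int, FutureOr<Object?> Function());
    try {
      responses.send((id, await computation(), false));
    } catch (e, s) {
      // Arbitrary error objects may not be sendable, so send their text.
      responses.send((id, RemoteError('$e', '$s'), true));
    }
  });
}

int fib(int n) => n < 2 ? n : fib(n - 1) + fib(n - 2);

Future<void> main() async {
  final worker = await Worker.start();
  final a = await worker.run(() => fib(20));
  final b = await worker.run(() => fib(25));
  print('$a $b'); // 6765 75025
  worker.close();
}
```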

Another generalization is to start an isolate containing a state (a single variable initialized when the isolate is started), and then allow functions to be run against that state, in the isolate, either reading or updating it, and returning results. That's something I've also seen people working on.
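The stateful variant could look roughly like this, under the same assumptions. StateIsolate, update, _erase and _hold are hypothetical names. The state is created inside the worker isolate and never leaves it. Each update call ships a function that returns a (newState, result) record. Error handling is omitted, so a function that throws would leave its future pending.

```dart
import 'dart:async';
import 'dart:isolate';

/// Hypothetical sketch: an isolate that owns a single state value of type S.
class StateIsolate<S> {
  StateIsolate._(this._commands, this._responses) {
    _responses.listen((message) {
      final (id, result) = message as (int, Object?);
      _pending.remove(id)!.complete(result);
    });
  }

  final SendPort _commands;
  final ReceivePort _responses;
  final _pending = <int, Completer<Object?>>{};
  var _nextId = 0;

  /// Spawns the isolate; [initialState] runs there to create the state.
  static Future<StateIsolate<T>> start<T>(T Function() initialState) async {
    final handshake = ReceivePort();
    final responses = ReceivePort();
    await Isolate.spawn(
        _hold, (handshake.sendPort, responses.sendPort, initialState));
    final commands = await handshake.first as SendPort;
    return StateIsolate<T>._(commands, responses);
  }

  /// Runs [f] on the current state in the isolate; [f] returns
  /// (new state, result), and only the result is sent back.
  Future<R> update<R>((S, R) Function(S state) f) async {
    final id = _nextId++;
    final completer = Completer<Object?>();
    _pending[id] = completer;
    _commands.send((id, _erase(f)));
    return await completer.future as R;
  }

  void close() {
    _commands.send(null);
    _responses.close();
  }
}

/// Wraps [f] so the state isolate can call it without knowing S or R.
/// It is top-level so the closure does not capture `this`, which holds an
/// unsendable ReceivePort.
(Object?, Object?) Function(Object?) _erase<S, R>((S, R) Function(S) f) =>
    (Object? state) => f(state as S);

void _hold((SendPort, SendPort, Object? Function()) args) {
  final (handshake, responses, initialState) = args;
  var state = initialState(); // The state only ever lives in this isolate.
  final commands = ReceivePort();
  handshake.send(commands.sendPort);
  commands.listen((message) {
    if (message == null) {
      commands.close();
      return;
    }
    final (id, step) = message as (int, (Object?, Object?) Function(Object?));
    final (newState, result) = step(state);
    state = newState;
    responses.send((id, result));
  });
}

(int, int) increment(int n) => (n + 1, n + 1);

Future<void> main() async {
  final counter = await StateIsolate.start(() => 0);
  final a = await counter.update(increment);
  final b = await counter.update((int n) => (n + 10, n + 10));
  final text = await counter.update((int n) => (n, 'value is $n'));
  print('$a $b $text'); // 1 11 value is 11
  counter.close();
}
```

Reads and writes use the same call: a read-only function just returns the state unchanged. Because calls are processed one at a time in the owning isolate, updates never race with each other.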

Generalizing two-way communication is harder, because it is about two isolates, not just one. I'd usually use the same port for multiple purposes (because it's cheaper and more predictable than having multiple ports), and then build a higher level protocol on top. Exposing the ports directly is a low-level untyped primitive, so I'd probably rather have an API like static Future<({Sink<I> input, Stream<O> output, Isolate isolate})> spawnConnection<I, O>(Future<void> Function(Sink<O> output, Stream<I> input) computation), which sets up typed connections and allows you to shut down by making the computation function return.
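Here is a rough sketch of the suggested spawnConnection, using one ReceivePort per direction. The helpers _workerSide and _PortSink are assumptions of this sketch. So is the wire format: each value is wrapped in a one-element record, and null means the sender closed its side. Errors, pausing and backpressure are not handled.

```dart
import 'dart:async';
import 'dart:isolate';

/// Hypothetical sketch of the suggested `spawnConnection` (not a dart:isolate
/// API). Values travel as 1-records `(value,)`; `null` means "closed".
Future<({Sink<I> input, Stream<O> output, Isolate isolate})>
    spawnConnection<I, O>(
        Future<void> Function(Sink<O> output, Stream<I> input)
            computation) async {
  final fromWorker = ReceivePort();
  final output = StreamController<O>();
  final toWorker = Completer<SendPort>();
  fromWorker.listen((message) {
    switch (message) {
      case SendPort port: // Handshake: where to send input.
        toWorker.complete(port);
      case (O value,):
        output.add(value);
      case null: // The worker closed its output.
        fromWorker.close();
        output.close();
    }
  });
  final isolate =
      await Isolate.spawn(_workerSide<I, O>(computation), fromWorker.sendPort);
  return (
    input: _PortSink<I>(await toWorker.future),
    output: output.stream,
    isolate: isolate,
  );
}

/// Builds the worker's entry point. Being top-level, the returned closure
/// captures only [computation], so it can be sent to the new isolate.
Future<void> Function(SendPort) _workerSide<I, O>(
        Future<void> Function(Sink<O> output, Stream<I> input) computation) =>
    (SendPort toMain) async {
      final fromMain = ReceivePort();
      toMain.send(fromMain.sendPort);
      final input = StreamController<I>();
      fromMain.listen((message) {
        switch (message) {
          case (I value,):
            input.add(value);
          case null: // The main isolate closed its input sink.
            input.close();
        }
      });
      final output = _PortSink<O>(toMain);
      await computation(output, input.stream);
      output.close(); // Does nothing if the computation already closed it.
      fromMain.close(); // No open ports left, so the isolate can exit.
    };

/// A [Sink] that sends each value as `(value,)` and sends `null` on close.
class _PortSink<T> implements Sink<T> {
  _PortSink(this._port);
  final SendPort _port;
  var _closed = false;

  @override
  void add(T data) {
    if (_closed) throw StateError('Sink is closed');
    _port.send((data,));
  }

  @override
  void close() {
    if (_closed) return;
    _closed = true;
    _port.send(null);
  }
}

String transform(String line) => line.toUpperCase();

Future<void> shout(Sink<String> output, Stream<String> input) async {
  await for (final line in input) {
    output.add(transform(line));
  }
}

Future<void> main() async {
  final connection = await spawnConnection<String, String>(shout);
  connection.input
    ..add('hello')
    ..add('isolates')
    ..close();
  print(await connection.output.toList()); // [HELLO, ISOLATES]
}
```

Shutdown follows the description above: when the computation's future completes, the worker closes its output and its port, and the isolate exits.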

albertodev01 commented 1 year ago

I was looking at IsolateRunner and it does exactly what I was thinking about, but that package is now discontinued. I think that

static Future<({Sink<I> input, Stream<O> output, Isolate isolate})> spawnConnection<I, O>(Future<void> Function(Sink<O> output, Stream<I> input) computation)

is also a great idea for this use case. Does your example assume that the streams will be a "high-level wrapper" around ports? I mean that the user sees a stream, while you hide and manage the ports internally.

lrhn commented 1 year ago

Yes, the streams/sinks would be backed by ports, but would be more type safe and provide more flexibility in how to use them.

Another possible API would be:

Stream<O> remoteConnection<I, O>(Stream<I> input, Stream<O> Function(Stream<I>) remote);

where you provide a local stream as input and get an output stream back. It spawns a new isolate and runs remote there, with a stream connected to the input through ports. The stream that remote returns is then connected, through the same ports, to the output stream returned to the caller. It probably needs a way to shut down the remote as well.
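Here is a simplified sketch of remoteConnection. It spawns the isolate immediately rather than on listen, and it does not forward errors, pausing or cancellation. _remoteSide is a hypothetical helper. In this sketch the remote isolate shuts itself down once its output stream is done, which is only one possible answer to the shutdown question.

```dart
import 'dart:async';
import 'dart:isolate';

/// Hypothetical sketch of `remoteConnection`: runs [remote] in a new isolate,
/// feeding it [input] and returning its output.
Stream<O> remoteConnection<I, O>(
    Stream<I> input, Stream<O> Function(Stream<I>) remote) {
  final output = StreamController<O>();
  final fromRemote = ReceivePort();
  StreamSubscription<I>? inputSubscription;
  fromRemote.listen((message) {
    switch (message) {
      case SendPort toRemote: // Handshake: start forwarding local input.
        inputSubscription = input.listen((value) => toRemote.send((value,)),
            onDone: () => toRemote.send(null));
      case (O value,):
        output.add(value);
      case null: // The remote output stream is done.
        inputSubscription?.cancel();
        fromRemote.close();
        output.close();
    }
  });
  unawaited(Isolate.spawn(_remoteSide<I, O>(remote), fromRemote.sendPort));
  return output.stream;
}

/// Entry point for the remote isolate; captures only [remote].
void Function(SendPort) _remoteSide<I, O>(
        Stream<O> Function(Stream<I>) remote) =>
    (SendPort toMain) {
      final fromMain = ReceivePort();
      toMain.send(fromMain.sendPort);
      final input = StreamController<I>();
      fromMain.listen((message) {
        switch (message) {
          case (I value,):
            input.add(value);
          case null: // The local input stream is done.
            input.close();
        }
      });
      remote(input.stream).listen((value) => toMain.send((value,)),
          onDone: () {
        toMain.send(null);
        fromMain.close(); // Lets the remote isolate exit.
      });
    };

int square(int n) => n * n;

Stream<int> squares(Stream<int> numbers) => numbers.map(square);

Future<void> main() async {
  final results = remoteConnection(Stream.fromIterable([1, 2, 3]), squares);
  print(await results.toList()); // [1, 4, 9]
}
```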

But you can probably get the same effect by having a way to wrap any Stream in a wrapper that can be sent to other isolates, and which then connects back to the original when you try to listen. Something like:

import "dart:async";
import "dart:isolate";

/// Creates a stream which forwards events of [source].
///
/// The returned stream can be sent to other isolates.
///
/// Listening on the returned stream will listen on the
/// [source] stream, in the isolate where [remotableStream] was called,
/// and will forward all events and pause/resume/cancel requests between the
/// [source] subscription and the subscription returned
/// by the remotable stream.
///
/// Calling [RemotableStream.close] on the remotable stream will
/// prevent further listens from succeeding.
RemotableStream<T> remotableStream<T>(Stream<T> source) {
  var p = RawReceivePort();
  var connections = <int, StreamSubscription<T>>{};
  var counter = 0; // Give id numbers to each listen.
  var closed = false;
  void closeIfIdle() {
    if (closed && connections.isEmpty) p.close();
  }

  p.handler = (Object? m) {
    switch (m) {
      case "close":
        closed = true;
        closeIfIdle();
      case ("l", SendPort port, bool cancelOnError):
        if (closed) { // No new connections.
          port.send((StateError("Closed"), StackTrace.empty));
          port.send(null); // End the listener's stream.
          return;
        }
        var id = counter++;
        try {
          connections[id] = source.listen(
              (v) => port.send((v,)),
              onError: (Object e, StackTrace s) => port.send((e, s)),
              onDone: () {
                connections.remove(id);
                port.send(null); // Done.
                closeIfIdle();
              },
              cancelOnError: cancelOnError);
          port.send(id); // Tell the listener its connection id.
        } catch (e, s) {
          // The `listen` call can fail, e.g., if it's a single-subscription stream.
          port.send((e, s));
          port.send(null);
        }
      case ("p", int id):
        connections[id]?.pause();
      case ("r", int id):
        connections[id]?.resume();
      case ("c", int id, SendPort port):
        var sub = connections.remove(id);
        if (sub == null) return; // Already done.
        closeIfIdle();
        sub.cancel().then((_) {
          port.send(null); // Successfully cancelled.
        }, onError: (Object e, StackTrace s) {
          // Special error message which always goes into the cancel future.
          port.send((e, s, null));
        });
    }
  };
  return RemotableStream<T>._(p.sendPort);
}

/// A stream which can be sent to other isolates.
///
/// Usually streams cannot be sent to other isolates.
/// This remotable stream is a facade for a stream which exists
/// in its original isolate, and listening to the remotable
/// stream will communicate with that isolate and listen
/// on the original stream.
/// Then stream events and flow control requests are forwarded
/// between the isolates as needed.
///
/// Calling [close] will allow the original isolate to stop
/// listening for further `listen` requests.
class RemotableStream<T> extends Stream<T> {
  final SendPort _sourcePort;
  RemotableStream._(this._sourcePort);

  @override
  StreamSubscription<T> listen(void Function(T event)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    var local = StreamController<T>(sync: true);
    var subscription = local.stream.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
    var messagePort = RawReceivePort();
    var sendPort = messagePort.sendPort;
    int? id; // Connection id, sent by the source isolate.
    Completer<void>? cancelCompleter;
    var finished = false; // Set when the source has sent its final `null`.
    local
      ..onPause = () {
        if (id case var i?) _sourcePort.send(("p", i));
      }
      ..onResume = () {
        if (id case var i?) _sourcePort.send(("r", i));
      }
      ..onCancel = () {
        // Also called when the done event is delivered; nothing to do then.
        if (finished) return null;
        var completer = cancelCompleter ??= Completer<void>.sync();
        if (id case var i?) _sourcePort.send(("c", i, sendPort));
        return completer.future;
      };
    messagePort.handler = (Object? message) {
      switch (message) {
        case null: // Stream closed or cancel successful.
          finished = true;
          messagePort.close();
          local.close();
          cancelCompleter?.first?.complete();
        case int i:
          id = i;
          if (cancelCompleter != null) {
            _sourcePort.send(("c", i, sendPort));
          } else if (local.isPaused) {
            _sourcePort.send(("p", i));
          }
        case (T value,):
          local.add(value);
        case (Object e, StackTrace s):
          // With `cancelOnError`, the local subscription cancels itself here,
          // and `onCancel` forwards the cancel request to the source.
          local.addError(e, s);
        case (Object e, StackTrace s, null):
          messagePort.close();
          cancelCompleter?.first?.completeError(e, s);
      }
    };
    _sourcePort.send(("l", sendPort, cancelOnError ?? false));
    return subscription;
  }

  /// Requests the source of this remote stream to stop responding to new connections.
  void close() {
    _sourcePort.send("close");
  }
}

extension<T> on Completer<T> {
  /// This completer, if it has not been completed yet.
  ///
  /// Evaluates to `null` if the completer has already been completed.
  /// Allows using `completer.first?.complete(...)` to only complete it once.
  Completer<T>? get first => isCompleted ? null : this;
}

albertodev01 commented 1 year ago

This seems like a very interesting API.

I would just rename remotableStream to something like isolatedStream, to convey the idea that it runs operations concurrently.

Are you (the team) interested in experimenting with this kind of API?