dart-lang / sdk

The Dart SDK, including the VM, JS and Wasm compilers, analysis, core libraries, and more.
https://dart.dev
BSD 3-Clause "New" or "Revised" License

Feature request: SendPort send done event to ReceivePort #28731

Open alsemenov opened 7 years ago

alsemenov commented 7 years ago

According to the description of class SendPort, there is no way to close a SendPort instance and signal the bound ReceivePort to generate a done event.

Let's consider the following example:

import "dart:async";
import "dart:isolate";

void entryPoint(SendPort sendPort) {
  for (int i in [0,1,2,3,4,5,6,7,8,9]){
    sendPort.send(i);
  }
}

main() async {
  ReceivePort receivePort = new ReceivePort();
  Isolate isolate = await Isolate.spawn(entryPoint, receivePort.sendPort);
  receivePort.listen(
      (data) {
        print("received: $data");
      },
      onDone:() {
        print("done");
      }
  );
}

The program is simple and easy to understand. Unfortunately, although it prints "received" for every digit, it never prints "done". receivePort.close() has to be called explicitly to trigger the done event and let the program finish. But to prevent data loss, close() must be called only after all data has been received, and detecting the moment when all data has been sent and received can be tricky and error-prone. Here are some options:

  1. Send some special value at the end of the data.
  2. Use an isolate exit listener (Isolate.addOnExitListener).
  3. Use another SendPort-ReceivePort pair to send a special value indicating the end of the data.

I think that all these options have problems:

  1. The essential functionality of the program (aka business logic) has to be modified. In the example we send integers, so we either have to make some integer value special (i.e. not use it for business data) or change the type from integer to Object and use some special object like a Capability. In all cases we have to modify both the sender and the receiver and call receivePort.close() explicitly. The program becomes less simple and harder to understand.

  2. There is a race condition between the isolate-terminated event and the business data. Since they are sent via different channels (i.e. different SendPort-ReceivePort pairs), the isolate-terminated event may arrive before, for example, the last digit, and if we call receivePort.close() at that point, the last digit will be lost.

  3. There is a race condition between the end-of-data value and the business data. Some data may be lost in the same way as in the previous point.

An elegant solution would be to introduce a close() method on SendPort that signals the bound ReceivePort to generate a done event. Then none of these tricky workarounds would be necessary.

lrhn commented 7 years ago

The reason we can't close a ReceivePort automatically is that we don't know when all its SendPort instances are gone. You can have multiple SendPort instances spread over multiple isolates that can all send to the same ReceivePort. If any one of them can close the ReceivePort, then it can break the other code that expects the port to keep working.

If you want a ReceivePort that can be cancelled remotely, that is an abstraction on top of the plain receive port. Have one special message that you can send, which is recognized by the receive port and makes it close: perhaps the SendPort itself (if you have the SendPort, you can cancel the ReceivePort, otherwise you can't), or a separate Capability that you can send on the port.

The basic ReceivePort/SendPort connection doesn't have this functionality, and that's deliberate. Closing is a protocol issue, not a connection issue - not all connections can be closed by the sender.

(Personally, I never use ReceivePort, I always use RawReceivePort because it better matches the way I think about ports).
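
A minimal sketch of the protocol described above, assuming the receive port's own SendPort is the agreed close message and using a RawReceivePort with a handler. The names worker and collectUntilSelf are illustrative only. The sketch relies on messages sent from one isolate to one port arriving in send order, so all ten integers are delivered before the close request.

```dart
import 'dart:async';
import 'dart:isolate';

/// Worker isolate: sends ten integers, then sends the port itself,
/// which the receiver treats as the close request.
void worker(SendPort port) {
  for (var i = 0; i < 10; i++) {
    port.send(i);
  }
  port.send(port); // Close request: the port's own SendPort.
}

/// Collects messages on a RawReceivePort until its own SendPort arrives,
/// then closes the port and completes with everything received before it.
Future<List<Object?>> collectUntilSelf() async {
  final port = RawReceivePort();
  final done = Completer<List<Object?>>();
  final received = <Object?>[];
  port.handler = (Object? message) {
    if (message == port.sendPort) {
      // Only code holding this SendPort can trigger the close.
      port.close();
      done.complete(received);
    } else {
      received.add(message);
    }
  };
  await Isolate.spawn(worker, port.sendPort);
  return done.future;
}

Future<void> main() async {
  print(await collectUntilSelf());
}
```

Because the port is closed inside the handler, the main isolate has no open ports left and the program exits instead of hanging.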

alsemenov commented 7 years ago

Let's see how the initial code has to be changed to use a special value for end of data:

import "dart:async";
import "dart:isolate";

void entryPoint(List args) {
  SendPort sendPort = args[0];
  Capability endOfData = args[1];
  for (int i in [0,1,2,3,4,5,6,7,8,9]){
    sendPort.send(i);
  }
  sendPort.send(endOfData);
}

main() async {
  Capability endOfData = new Capability();
  ReceivePort receivePort = new ReceivePort();
  Isolate isolate = await Isolate.spawn(
      entryPoint, [receivePort.sendPort, endOfData]);
  receivePort.listen(
      (data) {
        if (data==endOfData){
          receivePort.close();
        } else {
          print("received: $data");
        }
      },
      onDone:() {
        print("done");
      }
  );
}

As I already mentioned, this solution has several drawbacks:

  1. The end-of-data value has a different type from the data (i.e. integer). It is not always possible to designate some value of the data type as special.
  2. Additional code is required to handle end of data. The data handler now handles two kinds of messages: business data (i.e. integers) and end of data.
  3. The onDone handler is actually redundant, because end of data is already handled in the data handler.

I think the need to send an end-of-data signal is very common. One purpose of the standard library is to provide good solutions for common problems, so a standard solution for the "send the end of data signal" problem would be very helpful.
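
The Capability-sentinel protocol from the example above can be written a little more compactly with Stream.takeWhile. This is a sketch, not a standard-library solution: takeWhile ends its stream at the first message that fails the test and cancels its subscription, and cancelling a ReceivePort's subscription closes the port. It removes the explicit close() and the separate onDone handler, but the sender still has to send the sentinel, so the first drawback remains. The name receiveAll is illustrative only.

```dart
import 'dart:isolate';

void entryPoint(List<dynamic> args) {
  final sendPort = args[0] as SendPort;
  final endOfData = args[1] as Capability;
  for (var i = 0; i < 10; i++) {
    sendPort.send(i);
  }
  sendPort.send(endOfData); // Sentinel: no more data follows.
}

/// Receives integers until the sentinel arrives. takeWhile stops at the
/// sentinel and cancels its subscription, which closes the ReceivePort.
Future<List<int>> receiveAll() async {
  final endOfData = Capability();
  final receivePort = ReceivePort();
  await Isolate.spawn(entryPoint, [receivePort.sendPort, endOfData]);
  return receivePort
      .takeWhile((message) => message != endOfData)
      .cast<int>()
      .toList();
}

Future<void> main() async {
  print(await receiveAll());
}
```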

lrhn commented 7 years ago

Reopening as a request for a receive-port class that has a closable send-port.

It's not something we will do with the plain ReceivePort or RawReceivePort because giving every SendPort the ability to close the ReceivePort is too much power.

The type shouldn't be a problem. ReceivePorts are untyped (Stream<dynamic>), so there is no type constraint on what objects can be sent through them.

It could be something added in package:isolate. The only disadvantage is that it won't be possible to send the ClosableSendPort as-is to a foreign isolate. It has to be converted to a [SendPort, Capability] pair and rebuilt at the other end. Not hard, but still extra complexity. If we could send a ClosableSendPort as easily as any other SendPort, that would be easier.

As a simple approach, you can use a wrapper like:

import "dart:isolate";
import "dart:async";

/// A [ReceivePort] that can be closed from the other end.
///
/// The [sendPort] getter provides a [ClosableSendPort] which can be used to close the
/// receive port by sending a specific message.
/// That message is the underlying [SendPort] itself (also available as [rawSendPort]).
/// All other messages are sent as normal.
class ClosableReceivePort extends Stream implements ReceivePort {
  RawReceivePort _port;
  StreamController _controller = new StreamController(sync: true);

  ClosableReceivePort() : this.fromRawReceivePort(new RawReceivePort());
  ClosableReceivePort.fromRawReceivePort(RawReceivePort port) : _port = port {
    _port.handler = _handle;
  }

  StreamSubscription listen(void onData(var message)?,
      {Function? onError, void onDone()?, bool? cancelOnError}) {
    return _controller.stream.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }

  void _handle(Object? message) {
    if (_port.sendPort != message) {
      _controller.add(message);
    } else {
      _port.close();
      _controller.close();
    }
  }

  void close() {
    _port.close();
    if (!_controller.isClosed) scheduleMicrotask(_controller.close);
  }

  ClosableSendPort get sendPort => new _ClosableSendPort(_port.sendPort);

  /// The underlying [SendPort] for the closable receive-port.
  ///
  /// Sending this port through itself will close this receive port.
  /// All other messages are delivered through this [ReceivePort].
  SendPort get rawSendPort => _port.sendPort;
}

/// The send-port of a [ClosableReceivePort].
///
/// This class implements [SendPort], but can't be sent to an isolate that wasn't created using
/// [Isolate.spawn] from the same original isolate as this one. 
abstract class ClosableSendPort implements SendPort {
  /// Create a [ClosableSendPort] that sends messages through [rawPort].
  ///
  /// The resulting [ClosableSendPort] sends [rawPort] as the close message on itself when
  /// [close] is called.
  factory ClosableSendPort(SendPort rawPort) = _ClosableSendPort;

  /// Whether [close] has been called.
  bool get isClosed;

  /// Request that the receive port closes.
  ///
  /// The receive port won't necessarily close immediately, but further send operations on this
  /// [SendPort] are ignored.
  void close();
}

class _ClosableSendPort implements ClosableSendPort {
  // Set to `null` when closed.
  SendPort? _port;
  _ClosableSendPort(this._port);
  void send(message) { _port?.send(message); }  // Or throw when closed, either can work.
  bool get isClosed => _port == null;
  void close() { _port?.send(_port); _port = null; }
  int get hashCode => _port.hashCode ^ 55555555;
  bool operator==(Object other) => other is _ClosableSendPort && _port == other._port;
}
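
As noted earlier in this comment, a closable send port cannot be sent to a foreign isolate as-is; it has to be converted to a [SendPort, Capability] pair and rebuilt at the other end. The sketch below assumes that pair format, with the Capability as the close message. RemoteClosablePort, its fromMessage constructor, worker and run are hypothetical names, not part of dart:isolate or package:isolate.

```dart
import 'dart:async';
import 'dart:isolate';

/// Hypothetical worker-side wrapper rebuilt from a [SendPort, Capability]
/// pair. Sending the capability is the agreed-upon close request.
class RemoteClosablePort {
  final SendPort _port;
  final Capability _closeToken;
  bool _closed = false;

  RemoteClosablePort.fromMessage(List<dynamic> pair)
      : _port = pair[0] as SendPort,
        _closeToken = pair[1] as Capability;

  bool get isClosed => _closed;

  void send(Object? message) {
    if (!_closed) _port.send(message); // Sends after close() are dropped.
  }

  void close() {
    if (_closed) return;
    _closed = true;
    _port.send(_closeToken);
  }
}

void worker(List<dynamic> pair) {
  final port = RemoteClosablePort.fromMessage(pair);
  for (var i = 0; i < 3; i++) {
    port.send(i);
  }
  port.close();
  port.send('dropped'); // Ignored: the port is already closed.
}

/// Receiving side: closes the ReceivePort when the close token arrives,
/// which then fires onDone.
Future<List<Object?>> run() async {
  final closeToken = Capability();
  final receivePort = ReceivePort();
  final received = <Object?>[];
  final done = Completer<List<Object?>>();
  receivePort.listen((message) {
    if (message == closeToken) {
      receivePort.close();
    } else {
      received.add(message);
    }
  }, onDone: () => done.complete(received));
  // Transfer the closable port as a plain [SendPort, Capability] pair.
  await Isolate.spawn(worker, [receivePort.sendPort, closeToken]);
  return done.future;
}

Future<void> main() async {
  print(await run()); // [0, 1, 2]
}
```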

alsemenov commented 7 years ago

After further investigation, I realized that almost every ReceivePort method relies on the done event. Here is a simple example:

import "dart:async";
import "dart:isolate";

void entryPoint(SendPort sendPort) {
  for (int i in [0,1,2,3,4,5,6,7,8,9]){
    sendPort.send(i);
  }
}

main() async {
  ReceivePort receivePort = new ReceivePort();
  Isolate isolate = await Isolate.spawn(entryPoint, receivePort.sendPort);
  List data = await receivePort.toList();
  print(data);
}

This program just hangs and never stops. The Future returned by toList() never completes, because the done event is never generated.

In this case there is no way to modify the receive handler to handle a special end-of-data value; the receive port has to be closed explicitly somehow. This problem applies to almost all ReceivePort methods:

I think this makes ReceivePort useless. Should I report this as a separate issue?

lrhn commented 7 years ago

I don't think reporting it will change anything, sadly.

We chose to make ReceivePort look like a Stream. That might have been a mistake, but it's too late to change now. We also decided that a SendPort can't unilaterally close a ReceivePort. That's just too unsafe. It's the same reason why a Future can't be cancelled - it's a shared resource and one user of the resource isn't allowed to control it.

Together that gives us the problems you describe, but not really any way to solve them. A ReceivePort isn't a finite stream: until it is manually closed, you can always create a new SendPort and send new events to it. All the operations that assume an end will never complete. If anything, we should document that on ReceivePort.

The forEach operation isn't useless. I'd prefer that over listen for acting on all messages. The any, contains and every methods might be useful because they short-circuit when they have a result (and close the stream). The rest (and last/single as well) are definitely unusable because they won't give any result until the stream closes.

@floitschG WDYT?
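
A sketch of the short-circuiting behaviour described above, using the same ten-integer sender as the earlier examples. any completes as soon as a message matches and cancels its subscription, which also closes the ReceivePort. Stream.take, which the thread does not discuss, stops after a fixed number of events in the same way, so toList() becomes usable when the message count is known in advance. The caveat stands: if no message ever matches, any never completes with false. sawNine and firstTen are illustrative names.

```dart
import 'dart:isolate';

void entryPoint(SendPort sendPort) {
  for (var i = 0; i < 10; i++) {
    sendPort.send(i);
  }
}

/// Completes with true as soon as 9 arrives; any() then cancels its
/// subscription, closing the ReceivePort. If 9 never arrived, this
/// future would never complete.
Future<bool> sawNine() async {
  final receivePort = ReceivePort();
  await Isolate.spawn(entryPoint, receivePort.sendPort);
  return receivePort.any((message) => message == 9);
}

/// take(10) ends after ten events and cancels the subscription,
/// so toList() completes.
Future<List<dynamic>> firstTen() async {
  final receivePort = ReceivePort();
  await Isolate.spawn(entryPoint, receivePort.sendPort);
  return receivePort.take(10).toList();
}

Future<void> main() async {
  print(await sawNine()); // true
  print(await firstTen()); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
}
```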

alsemenov commented 7 years ago

I just selected all the methods that might hang forever. forEach is not useless, but it returns a Future that completes when all events are processed, which means never. any, contains and every have the same issue: they must be used with great care to avoid waiting forever. So these peculiarities should at least be documented. And because most ReceivePort methods either cannot be used or must be used with care, I wrote that ReceivePort is useless.

alsemenov commented 7 years ago

BTW, even safe methods that return a Stream, like skip(), may cause problems, because the stream they return still has no end. For example:

Stream s = receivePort.skip(10); // OK: returns immediately, but
int sum = await receivePort.skip(10).reduce((x, y) => x + y); // not OK: hangs forever