dart-lang / sdk

Allow sending synchronous generators via ports (support copying of SuspendState) #51757

Open aam opened 1 year ago

aam commented 1 year ago

Currently it's impossible to send a generator-created stream via ports because the VM cannot serialize SuspendState. It seems reasonable to expect this to work: copy the sender's state over, effectively forking the stream, so that the receiver picks up the stream where it was forked.

import 'dart:isolate';

Stream<int> evenNumbersGenerator() async* {
  int i = 2;
  while (true) {
    yield i;
    i += 2;
  }
}

main() async {
  final rp = ReceivePort();
  rp.sendPort.send(evenNumbersGenerator());
  rp.close();
}

fails with

Unhandled exception:
Invalid argument(s): Illegal argument in isolate message: (object is a SuspendState)
 <- Context num_variables: 1
 <- Closure: (Object?) => void (from dart:core)
 <- Instance of '_ControllerStream<int>' (from dart:async)

#0      _SendPort._sendInternal (dart:isolate-patch/isolate_patch.dart:249:43)
#1      _SendPort.send (dart:isolate-patch/isolate_patch.dart:230:5)
#2      main (file:///Users/aam/p/d1/sdk/runtime/tests/vm/dart/send_async_generator.dart:14:15)
#3      _delayEntrypointInvocation.<anonymous closure> (dart:isolate-patch/isolate_patch.dart:297:19)
#4      _RawReceivePort._handleMessage (dart:isolate-patch/isolate_patch.dart:189:12)

cc @alexmarkov @rmacnak-google

alexmarkov commented 1 year ago

Copying Stream objects when sending them between isolates would not work: a listener in another isolate would not be notified when events are generated in the original isolate. It is basically the same reason why sending Future objects to another isolate by copying them doesn't make much sense (the reasoning in https://github.com/dart-lang/sdk/issues/51594#issuecomment-1465787021 applies to streams as well).
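For illustration, here is a minimal sketch of how stream events can cross an isolate boundary today without copying the Stream itself: the generator runs in the producing isolate and each event is forwarded as a plain value over a SendPort. The names `producer` and `firstEvens`, and the use of `null` as an end-of-stream marker, are assumptions made for this example, not anything proposed in this issue.

```dart
import 'dart:isolate';

Stream<int> evenNumbersGenerator() async* {
  int i = 2;
  while (true) {
    yield i;
    i += 2;
  }
}

// Runs in the spawned isolate. The stream is driven here, and only the
// int events (which are sendable) cross the isolate boundary.
// args[0] is the SendPort to reply on, args[1] is how many events to send.
void producer(List<Object> args) async {
  final out = args[0] as SendPort;
  final count = args[1] as int;
  await for (final value in evenNumbersGenerator().take(count)) {
    out.send(value);
  }
  out.send(null); // Hypothetical end-of-stream marker.
}

// Hypothetical helper: collects the first [count] events on the receiving side.
Future<List<int>> firstEvens(int count) async {
  final rp = ReceivePort();
  await Isolate.spawn(producer, <Object>[rp.sendPort, count]);
  final result = <int>[];
  await for (final message in rp) {
    if (message == null) break; // Leaving the loop also closes the port.
    result.add(message as int);
  }
  return result;
}

Future<void> main() async {
  print(await firstEvens(3)); // [2, 4, 6]
}
```

This keeps the listener notified because events are produced and sent explicitly, rather than relying on a copied Stream object that has no connection to the original isolate.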

It is not hard to support sending/copying SuspendState objects within an isolate group, as we share code between isolates in the same group. We can just clone the SuspendState object (see SuspendState::Clone). Copying SuspendState objects could then be used to implement sending Iterable objects returned by sync* functions, although it may not be very useful for users, as sync* functions are extremely rare.

aam commented 1 year ago

Sure, changed asynchronous to synchronous.

Repro of problem sending synchronous generators:

import 'dart:isolate';

Iterable<int> evenNumbersGenerator() sync* {
  int i = 2;
  while (true) {
    yield i;
    i += 2;
  }
}

main() async {
  final rp = ReceivePort();
  rp.sendPort.send(evenNumbersGenerator());
  rp.close();
}

Unhandled exception:
Invalid argument(s): Illegal argument in isolate message: (object is a SuspendState)
 <- Instance of '_SyncStarIterable<int>' (from dart:async)

#0      _SendPort._sendInternal (dart:isolate-patch/isolate_patch.dart:249:43)
#1      _SendPort.send (dart:isolate-patch/isolate_patch.dart:230:5)
#2      main (file:///Users/aam/p/d1/sdk/runtime/tests/vm/dart/send_async_generator.dart:13:15)
#3      _delayEntrypointInvocation.<anonymous closure> (dart:isolate-patch/isolate_patch.dart:297:19)
#4      _RawReceivePort._handleMessage (dart:isolate-patch/isolate_patch.dart:189:12)
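
Until SuspendState can be copied, one possible workaround (a sketch, not the behavior requested in this issue) is to materialize a finite prefix of the sync* Iterable into a List before sending it. A List<int> holds no SuspendState, so it passes the message check. The helper name `sendSnapshot` is hypothetical. Note that this sends a snapshot of values and does not fork the generator: the receiver cannot continue the sequence from where the sender left off.

```dart
import 'dart:isolate';

Iterable<int> evenNumbersGenerator() sync* {
  int i = 2;
  while (true) {
    yield i;
    i += 2;
  }
}

// Hypothetical helper: sends the first [count] values as a List through a
// port and returns whatever arrives on the receiving side.
Future<Object?> sendSnapshot(int count) async {
  final rp = ReceivePort();
  // toList() runs the generator eagerly, so only plain ints are sent.
  rp.sendPort.send(evenNumbersGenerator().take(count).toList());
  // first completes with the first message and closes the port.
  return rp.first;
}

Future<void> main() async {
  print(await sendSnapshot(3)); // [2, 4, 6]
}
```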