d-markey / squadron

Multithreading and worker thread pool for Dart / Flutter, to offload CPU-bound and heavy I/O tasks to Isolate or Web Worker threads.
https://pub.dev/packages/squadron
MIT License
79 stars 0 forks source link

Unable to pass serialized Channel between web workers #13

Closed jcoopeky closed 1 year ago

jcoopeky commented 1 year ago

I have been messing around with Squadron, and have to say I am very impressed, but I have run into an issue when attempting to pass Channels from one web worker to another.

Example:

///main.dart
import 'package:squadron/squadron.dart';

import 'js_test_worker.dart';

void main() async {
  Squadron.setId('MAIN');
  Squadron.debugMode = true;
  Squadron.setLogger(ConsoleSquadronLogger());

  final testWorker1 = createJsSampleWorker();
  await testWorker1.start();

  final testWorker2 = createJsSampleWorker();
  await testWorker2.setChannel(testWorker1.channel?.share().serialize());
}
///test_worker.dart
import 'package:squadron/squadron.dart';

import 'test_service.dart';

class TestWorker extends Worker implements TestService {
  TestWorker(dynamic entryPoint, {List args = const []})
      : super(entryPoint, args: args);

  @override
  Future<String> test() => send(TestService.testCommand);

  @override
  Future<void> setChannel(Object channelInfo) =>
      send(TestService.setChannelCommand, args: [channelInfo]);
}
///test_service.dart
import 'dart:async';

import 'package:squadron/squadron.dart';

abstract class TestService {
  FutureOr<String> test();
  FutureOr setChannel(Object channelInfo);

  // command IDs
  static const testCommand = 1;
  static const setChannelCommand = 2;
}

class TestServiceImpl implements TestService, WorkerService {
  int value = 0;
  Channel? testChannel;

  @override
  Future<String> test() async {
    return 'this is a test ${value++}';
  }

  @override
  void setChannel(Object channelInfo) async {
    testChannel = Channel.deserialize(channelInfo);
    var result = await testChannel!.sendRequest(TestService.testCommand, []);
    print(result);
  }

  // command IDs --> command handlers
  @override
  late final Map<int, CommandHandler> operations = {
    TestService.testCommand: (WorkerRequest r) => test(),
    TestService.setChannelCommand: (WorkerRequest r) => setChannel(r.args[0]),
  };
}
///js_test_worker.dart
import 'package:squadron/squadron_service.dart';

import 'test_worker.dart';
import 'test_service.dart';

// // dart compile js lib/js_test_worker.dart -o web/js_test_worker.dart.js -m
TestWorker createJsSampleWorker() => TestWorker('js_test_worker.dart.js');

// Web Worker entry-point.
// It must be a parameter-less "main()" function.
void main() => run((startRequest) => TestServiceImpl());
[2022-07-15T16:32:19.438Z] [SEVERE] [MAIN] failed to post request {a: [object MessagePort], b: 2, c: [[object MessagePort]], h: false}: error DataCloneError: Failed to execute 'postMessage' on 'MessagePort': A MessagePort could not be cloned because it was not transferred.
Error: ["$W","DataCloneError: Failed to execute 'postMessage' on 'MessagePort': A MessagePort could not be cloned because it was not transferred.","C:/b/s/w/ir/cache/builder/src/out/host_debug/dart-sdk/lib/html/dart2js/html_dart2js.dart 21737:3                  [_postMessage_1]\nC:/b/s/w/ir/cache/builder/src/out/host_debug/dart-sdk/lib/html/dart2js/html_dart2js.dart 21731:7                  postMessage]\npackages/squadron/src/browser/_channel.dart 41:7                                                                  [_postRequest]\npackages/squadron/src/xplat/_value_wrapper.dart 61:16                                                             compute\npackages/squadron/src/browser/_channel.dart 96:19                                                                 sendRequest\npackages/squadron/src/worker.dart 119:30                                                                          send\nC:/b/s/w/ir/cache/builder/src/out/host_debug/dart-sdk/lib/_internal/js_dev_runtime/patch/async_patch.dart 45:50   <fn>\nC:/b/s/w/ir/cache/builder/src/out/host_debug/dart-sdk/lib/async/zone.dart 1685:54                                 runUnary\nC:/b/s/w/ir/cache/builder/src/out/host_debug/dart-sdk/lib/async/future_impl.dart 147:18                           handleValue\nC:/b/s/w/ir/cache/builder/src/out/host_debug/dart-sdk/lib/async/future_impl.dart 766:44                           handleValueCallback\nC:/b/s/w/ir/cache/builder/src/out/host_debug/dart-sdk/lib/async/future_impl.dart 795:13                           _propagateToListeners\nC:/b/s/w/ir/cache/builder/src/out/host_debug/dart-sdk/lib/async/future_impl.dart 566:5                            [_completeWithValue]\nC:/b/s/w/ir/cache/builder/src/out/host_debug/dart-sdk/lib/async/future_impl.dart 639:7                            callback\nC:/b/s/w/ir/cache/builder/src/out/host_debug/dart-sdk/lib/async/schedule_microtask.dart 40:11                     _microtaskLoop\nC:/b/s/w/ir/cache/builder/src/out/host_debug/dart-sdk/lib/async/schedule_microtask.dart 49:5                      _startMicrotaskLoop\nC:/b/s/w/ir/cache/builder/src/out/host_debug/dart-sdk/lib/_internal/js_dev_runtime/patch/async_patch.dart 166:15  <fn>\n","MAIN.2",2]
    at Object.throw_ [as throw] (http://localhost:17986/dart_sdk.js:5080:11)
at test_worker.TestWorker.new.send (http://localhost:17986/packages/squadron/src/squadron_logger.dart.lib.js:1387:19)
    at send.next (<anonymous>)
    at http://localhost:17986/dart_sdk.js:40641:33
    at _RootZone.runUnary (http://localhost:17986/dart_sdk.js:40511:59)
    at _FutureListener.thenAwait.handleValue (http://localhost:17986/dart_sdk.js:35438:29)
    at handleValueCallback (http://localhost:17986/dart_sdk.js:35999:49)
    at _Future._propagateToListeners (http://localhost:17986/dart_sdk.js:36037:17)
    at [_completeWithValue] (http://localhost:17986/dart_sdk.js:35872:23)
    at async._AsyncCallbackEntry.new.callback (http://localhost:17986/dart_sdk.js:35906:35)
    at Object._microtaskLoop (http://localhost:17986/dart_sdk.js:40778:13)
    at _startMicrotaskLoop (http://localhost:17986/dart_sdk.js:40784:13)
    at http://localhost:17986/dart_sdk.js:36261:9

Not sure if this is an actual issue or just a user error on my part, but in either case, it would be nice to have some additional examples of how to send Channels between web workers, and how to use those Channels to pass data between workers.

d-markey commented 1 year ago

Hello,

On Web platforms, Channels serialize as MessagePorts, which require cloning to cross Worker boundaries. Prior to 4.x, Squadron would inspect messages looking for non "base" types (strings, numbers, arrays...) and try to clone them, but this procedure adds some overhead especially when messages rely on large arrays for instance. So I decided to disable this mechanism by default.

It is possible to restore it with inspectRequest / inspectResponse but here you should only need inspectRequest.

Can you try this in your Worker:

Future<void> setChannel(Object channelInfo) =>
      send(TestService.setChannelCommand, args: [channelInfo], inspectRequest: true);
jcoopeky commented 1 year ago

That did it, everything works now. Thank you!

d-markey commented 1 year ago

No problem 😀 closing this issue then!