d-markey / squadron

Multithreading and worker thread pool for Dart / Flutter, to offload CPU-bound and heavy I/O tasks to Isolate or Web Worker threads.
https://pub.dev/packages/squadron
MIT License

WorkerException: type '_InternalLinkedHashMap<String, dynamic>' is not a subtype of type 'SendPort?' #3

Closed SaadArdati closed 2 years ago

SaadArdati commented 2 years ago

Trying my first run converting a complex system to use isolates/workers, I get the exception in the title.

flutter: type '_InternalLinkedHashMap<String, dynamic>' is not a subtype of type 'SendPort?'
flutter: #0      openChannel (package:squadron/src/native/channel.dart:103:11)
<asynchronous suspension>
#1      Worker.send (package:squadron/src/worker.dart:107:9)
<asynchronous suspension>
#3      WorkerTask._runFuture (package:squadron/src/worker_task.dart:148:21)
<asynchronous suspension>

I replicated the thumbnail example as far as I can tell.

Here's my code:

solver_service.dart

import 'dart:async';

import 'package:squadron/squadron.dart';

import '../position_manager_thread.dart';

abstract class SolverService {
  FutureOr<Map<String, dynamic>> initRoot(Map<String, dynamic> rootNodeJson);

  FutureOr<Map<String, dynamic>> performLayout(
    Set<Map<String, dynamic>> nodesReference,
    Set<String> createdNodes,
    Set<String> removedNodes,
    Set<String> adoptedNodes,
    Set<String> droppedNodes,
    Set<String> changedNodes,
  );

  static const cmdPerformLayout = 1;
  static const cmdInitRoot = 2;
}

class SolverServiceImpl implements SolverService, WorkerService {
  final PositionManagerThread positionManager =
      PositionManagerThread(notifier: (pos) => print(pos));

  @override
  FutureOr<Map<String, dynamic>> initRoot(Map<String, dynamic> rootNodeJson) =>
      positionManager.initRoot(rootNodeJson).toJson();

  @override
  FutureOr<Map<String, dynamic>> performLayout(
    Set<Map<String, dynamic>> nodesReference,
    Set<String> createdNodes,
    Set<String> removedNodes,
    Set<String> adoptedNodes,
    Set<String> droppedNodes,
    Set<String> changedNodes,
  ) async {
    return positionManager
        .performLayout(
          nodesReference,
          createdNodes,
          removedNodes,
          adoptedNodes,
          droppedNodes,
          changedNodes,
        )
        .toJson();
  }

  @override
  late final Map<int, CommandHandler> operations = {
    SolverService.cmdPerformLayout: (WorkerRequest r) => performLayout(
          r.args[0],
          r.args[1],
          r.args[2],
          r.args[3],
          r.args[4],
          r.args[5],
        ),
    SolverService.cmdInitRoot: (WorkerRequest r) => initRoot(
          r.args[0],
        ),
  };
}

solver_worker_activator.dart:

// stub file, the file used at runtime will depend on the target platform
//   * cf "browser\solver_worker_activator.dart" for Web workers (browser platform)
//   * cf "vm\solver_worker_activator.dart" for Isolate workers (vm platform)

// of course, if your application does not target both Web and VM platforms,
// you need not define a stub file and can go directly for your target platform

import 'solver_worker_pool.dart' show SolverWorker;

SolverWorker createWorker() =>
    throw UnsupportedError('Not supported on this platform');

solver_worker_pool.dart:

// this is a helper file to expose Squadron workers and worker pools as a SolverService

import 'dart:async';

import 'package:codelessly_flutter/managers/position_manager/multithreading/solver_service.dart';
import 'package:squadron/squadron.dart';

// this is where the stub file comes into action
//
// of course, if your application does not target both Web and VM platforms,
// you need not define a stub file and can go directly for your target platform
import 'solver_worker_activator.dart'
    if (dart.library.js) 'package:codelessly_flutter/managers/position_manager/multithreading/browser/solver_worker_activator.dart'
    if (dart.library.html) 'package:codelessly_flutter/managers/position_manager/multithreading/browser/solver_worker_activator.dart'
    if (dart.library.io) 'package:codelessly_flutter/managers/position_manager/multithreading/vm/solver_worker_activator.dart';

// Implementation of SolverService as a Squadron worker pool
class SolverWorkerPool extends WorkerPool<SolverWorker>
    implements SolverService {
  SolverWorkerPool(ConcurrencySettings concurrencySettings)
      : super(createWorker, concurrencySettings: concurrencySettings);

  @override
  FutureOr<Map<String, dynamic>> initRoot(Map<String, dynamic> rootNodeJson) =>
      execute((w) async => w.initRoot(rootNodeJson));

  @override
  FutureOr<Map<String, dynamic>> performLayout(
    Set<Map<String, dynamic>> nodes,
    Set<String> createdNodes,
    Set<String> removedNodes,
    Set<String> adoptedNodes,
    Set<String> droppedNodes,
    Set<String> changedNodes,
  ) =>
      execute((w) async => w.performLayout(
            nodes,
            createdNodes,
            removedNodes,
            adoptedNodes,
            droppedNodes,
            changedNodes,
          ));
}

// Implementation of SolverService as a Squadron worker
class SolverWorker extends Worker implements SolverService {
  SolverWorker(dynamic entryPoint, {String? id, List args = const []})
      : super(entryPoint, id: id, args: args);

  @override
  FutureOr<Map<String, dynamic>> initRoot(Map<String, dynamic> rootNodeJson) {
    return send(
      SolverService.cmdInitRoot,
      [
        rootNodeJson,
      ],
    );
  }

  @override
  FutureOr<Map<String, dynamic>> performLayout(
    Set<Map<String, dynamic>> nodes,
    Set<String> createdNodes,
    Set<String> removedNodes,
    Set<String> adoptedNodes,
    Set<String> droppedNodes,
    Set<String> changedNodes,
  ) {
    return send(
      SolverService.cmdPerformLayout,
      [
        nodes,
        createdNodes,
        removedNodes,
        adoptedNodes,
        droppedNodes,
        changedNodes,
      ],
    );
  }
}

solver_vm.dart:

// Isolate implementation (VM platform)

import 'package:squadron/squadron_service.dart';

import '../solver_service.dart';

void start(Map command) => run((startRequest) => SolverServiceImpl(), command);

solver_worker_activator.dart:

// Creation of the Isolate

import '../solver_worker_pool.dart' show SolverWorker;
import 'solver_worker.dart' as isolate;

SolverWorker createWorker() => SolverWorker(isolate.start);

Usage is quite simple:


  // Crashes if maxParallel is set to 1 for some reason. I only want one instance of this worker/isolate at all times through the life cycle of the app
  final SolverWorkerPool pool = SolverWorkerPool(
      const ConcurrencySettings(minWorkers: 1, maxWorkers: 1, maxParallel: 2));

  Future<void> initRoot(RootNode node) async {
    final LayoutResult result =
        LayoutResult.fromJson(await pool.initRoot(node.toJson()));

I'm not really sure what I'm doing wrong. There's no mention or example that brings up SendPort/ReceivePort for Squadron. I get this error for the performLayout() function as well.

I'm running this in a macOS window, so the isolate only. No JavaScript.

d-markey commented 2 years ago

Hello @SwissCheese5,

sorry I missed your post tonight. I'll have a look tomorrow and will keep you posted.

SaadArdati commented 2 years ago

Thank you :)

d-markey commented 2 years ago

Hello,

I've replicated your code to try and reproduce but found no problem.

I had to adapt some parts of the code. In particular, I implemented a dummy PositionManagerThread, so maybe the problem is in there? Also, I had to update solver_worker_activator.dart because the import 'solver_worker.dart' as isolate; looks wrong to me. I replaced it with import 'solver_vm.dart' as isolate;.

FYI here's the sample squadron_test.zip

You can just dart run it and you should see some output like:

pool with 1 worker, parallelism = 2
[0:00:00.000861]  [0] {initRoot: ok, payload: {task-id: 0}, processed-by: 922557508}
[0:00:00.001891]  [1] {initRoot: ok, payload: {task-id: 1}, processed-by: 922557508}
[0:00:00.002294]  [2] {initRoot: ok, payload: {task-id: 2}, processed-by: 922557508}
[0:00:00.002512]  [3] {initRoot: ok, payload: {task-id: 3}, processed-by: 922557508}
[0:00:00.002723]  [4] {initRoot: ok, payload: {task-id: 4}, processed-by: 922557508}
[0:00:00.002931]  [5] {initRoot: ok, payload: {task-id: 5}, processed-by: 922557508}
[0:00:00.003140]  [6] {initRoot: ok, payload: {task-id: 6}, processed-by: 922557508}
[0:00:00.003346]  [7] {initRoot: ok, payload: {task-id: 7}, processed-by: 922557508}
[0:00:00.003553]  [8] {initRoot: ok, payload: {task-id: 8}, processed-by: 922557508}
[0:00:00.003710]  [9] {initRoot: ok, payload: {task-id: 9}, processed-by: 922557508}
SolverWorker 421263971: IDLE, load = 0 (max = 2, total = 10, errors = 0), uptime = 0:00:00.000000, idle = 0:00:00.000000

pool with 1-3 workers, parallelism = 2
[0:00:00.001073]  [0] {initRoot: ok, payload: {task-id: 0}, processed-by: 749171621}
[0:00:00.001693]  [1] {initRoot: ok, payload: {task-id: 1}, processed-by: 749171621}
[0:00:00.002258]  [6] {initRoot: ok, payload: {task-id: 6}, processed-by: 749171621}
[0:00:00.003514]  [4] {initRoot: ok, payload: {task-id: 4}, processed-by: 1046298529}
[0:00:00.004199]  [5] {initRoot: ok, payload: {task-id: 5}, processed-by: 1046298529}
[0:00:00.004845]  [7] {initRoot: ok, payload: {task-id: 7}, processed-by: 749171621}
[0:00:00.005250]  [2] {initRoot: ok, payload: {task-id: 2}, processed-by: 541122101}
[0:00:00.005929]  [3] {initRoot: ok, payload: {task-id: 3}, processed-by: 541122101}
[0:00:00.006295]  [8] {initRoot: ok, payload: {task-id: 8}, processed-by: 749171621}
[0:00:00.006792]  [9] {initRoot: ok, payload: {task-id: 9}, processed-by: 1046298529}
SolverWorker 27849473: IDLE, load = 0 (max = 2, total = 3, errors = 0), uptime = 0:00:00.015626, idle = 0:00:00.000000
SolverWorker 659981525: IDLE, load = 0 (max = 2, total = 5, errors = 0), uptime = 0:00:00.015626, idle = 0:00:00.000000
SolverWorker 940802164: IDLE, load = 0 (max = 2, total = 2, errors = 0), uptime = 0:00:00.000000, idle = 0:00:00.000000

single worker
[0:00:00.000679]  [0] {initRoot: ok, payload: {task-id: 0}, processed-by: 872248303}
[0:00:00.001150]  [1] {initRoot: ok, payload: {task-id: 1}, processed-by: 872248303}
[0:00:00.001485]  [2] {initRoot: ok, payload: {task-id: 2}, processed-by: 872248303}
[0:00:00.001919]  [3] {initRoot: ok, payload: {task-id: 3}, processed-by: 872248303}
[0:00:00.002252]  [4] {initRoot: ok, payload: {task-id: 4}, processed-by: 872248303}
[0:00:00.002589]  [5] {initRoot: ok, payload: {task-id: 5}, processed-by: 872248303}
[0:00:00.003082]  [6] {initRoot: ok, payload: {task-id: 6}, processed-by: 872248303}
[0:00:00.003414]  [7] {initRoot: ok, payload: {task-id: 7}, processed-by: 872248303}
[0:00:00.003695]  [8] {initRoot: ok, payload: {task-id: 8}, processed-by: 872248303}
[0:00:00.004135]  [9] {initRoot: ok, payload: {task-id: 9}, processed-by: 872248303}
SolverWorker 592595040: IDLE, load = 0 (max = 10, total = 10, errors = 0), uptime = 0:00:00.000000, idle = 0:00:00.000000

SaadArdati commented 2 years ago

Thank you, I'll look into this in the morning.

d-markey commented 2 years ago

BTW if you're going to have only one thread, you'd be better off using a standalone SolverWorker. In this case you should not need the pool, so don't bother implementing it.

SaadArdati commented 2 years ago

Ah, just create a simple and fresh instance of SolverServiceImpl()? If not, can you please provide a simple example?

d-markey commented 2 years ago

It's part of the sample, you will see the last use case uses a worker directly without a pool: final worker = SolverWorker(isolate.start);

If you use an instance of SolverServiceImpl directly, it will run in your main Isolate, but you want it running in a different thread. So you have to wrap it in a Squadron worker.

SaadArdati commented 2 years ago

Makes sense. I'll try this out soon. By the way I couldn't get the dart js command to work. Are you sure it's still working? If you are, I'll post logs tomorrow.

d-markey commented 2 years ago

Here's a trimmed down implementation if you're positive you don't need JavaScript and don't need to manage a worker pool.

I'll look into the problem regarding maxParallel = 1, thanks for reporting this one.

squadron_test_no_js.zip

SaadArdati commented 2 years ago

That's the issue, I definitely NEED the javascript. That's why I'm trying to use this package :P

SaadArdati commented 2 years ago

The need to use this package came from building an app with a very heavy computational process that must run on both desktop and web. So this package seemed perfect as it utilizes isolates on the web. But the dart js command seemed to fail every time. Will post logs later.

d-markey commented 2 years ago

OK, you wrote "Isolate only, no JavaScript" so I thought you wouldn't need it :-P

One important thing regarding Isolates and Web Workers is that data is copied (serialized/deserialized) when it passes from the main thread to the worker thread and back. Usually, the object types will not survive serialization. In particular, a Map<T, K> is likely to be received as a Map<dynamic, dynamic>, and Set<T> may not be supported. Note also that this behavior depends on the platform. Your issue may be caused by that kind of limitation; I've never tested on macOS.
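To illustrate the kind of failure described above, here is a minimal sketch in plain Dart that does not use Squadron at all. It simulates what a worker may receive (an untyped List and Map) and shows that a type check against the original generic type fails, while rebuilding the collection works. The helper names toStringSet and toJsonMap are made up for this example.

```dart
// Simulates collections that lost their type arguments in transit.
// `toStringSet` and `toJsonMap` are hypothetical helper names.

Set<String> toStringSet(List items) => items.cast<String>().toSet();

Map<String, dynamic> toJsonMap(Map raw) =>
    raw.map((key, value) => MapEntry(key as String, value));

void main() {
  // What the receiving side may see: generic containers.
  final List received = <dynamic>['a', 'b', 'a'];
  final Map receivedMap = <dynamic, dynamic>{'id': 1};

  // The runtime types are List<dynamic> and Map<dynamic, dynamic>, so
  // these checks fail and an `as List<String>` cast would throw.
  print(received is List<String>); // false
  print(receivedMap is Map<String, dynamic>); // false

  // Rebuilding produces properly typed collections.
  final Set<String> names = toStringSet(received);
  final Map<String, dynamic> json = toJsonMap(receivedMap);
  print(names); // {a, b}
  print(json); // {id: 1}
}
```

Whether types are actually lost depends on the platform and on how the message is transported, so treating every incoming collection as untyped is the defensive choice.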

For more background details:

You could try to modify the service worker like this to convert Set to List:

class SolverWorker extends Worker implements SolverService {
  SolverWorker(dynamic entryPoint, {String? id, List args = const []})
      : super(entryPoint, id: id, args: args);

  @override
  Future<Map<String, dynamic>> initRoot(Map<String, dynamic> rootNodeJson) {
    return send(
      SolverService.cmdInitRoot,
      [
        rootNodeJson,
      ],
    );
  }

  @override
  Future<Map<String, dynamic>> performLayout(
    Set<Map<String, dynamic>> nodes,
    Set<String> createdNodes,
    Set<String> removedNodes,
    Set<String> adoptedNodes,
    Set<String> droppedNodes,
    Set<String> changedNodes,
  ) {
    return send(
      SolverService.cmdPerformLayout,
      [
        nodes.toList(),
        createdNodes.toList(),
        removedNodes.toList(),
        adoptedNodes.toList(),
        droppedNodes.toList(),
        changedNodes.toList(),
      ],
    );
  }
}

and you will have to do the opposite on the receiving end:

class SolverServiceImpl implements SolverService, WorkerService {
  final PositionManagerThread positionManager =
      PositionManagerThread(notifier: (pos) => print(pos));

  @override
  FutureOr<Map<String, dynamic>> initRoot(Map<String, dynamic> rootNodeJson) =>
      positionManager.initRoot(rootNodeJson);

  @override
  FutureOr<Map<String, dynamic>> performLayout(
    Set<Map<String, dynamic>> nodesReference,
    Set<String> createdNodes,
    Set<String> removedNodes,
    Set<String> adoptedNodes,
    Set<String> droppedNodes,
    Set<String> changedNodes,
  ) async {
    return positionManager.performLayout(
      nodesReference,
      createdNodes,
      removedNodes,
      adoptedNodes,
      droppedNodes,
      changedNodes,
    );
  }

  @override
  late final Map<int, CommandHandler> operations = {
    SolverService.cmdPerformLayout: (WorkerRequest r) => performLayout(
          // the maps inside the list may have lost their types too
          rebuildMapSet(r.args[0]),
          rebuildSet<String>(r.args[1]),
          rebuildSet<String>(r.args[2]),
          rebuildSet<String>(r.args[3]),
          rebuildSet<String>(r.args[4]),
          rebuildSet<String>(r.args[5]),
        ),
    SolverService.cmdInitRoot: (WorkerRequest r) => initRoot(
          rebuildMap(r.args[0]),
        ),
  };

  // rebuild typed collections from the generic ones received by the worker
  static Set<T> rebuildSet<T>(List items) => items.cast<T>().toSet();
  static Map<String, dynamic> rebuildMap(Map dict) =>
      dict.map((key, value) => MapEntry<String, dynamic>(key as String, value));
  static Set<Map<String, dynamic>> rebuildMapSet(List items) =>
      items.map((item) => rebuildMap(item as Map)).toSet();
}

Also check the import 'solver_worker.dart' as isolate; I mentioned, because from the file names you provided, it should really be solver_vm.dart. This is the file implementing the "start()" function for native Isolates, and I don't see any solver_worker.dart file in your original post.

I'll try to build a Flutter sample but it may not come before the weekend!

SaadArdati commented 2 years ago

Allow me to provide more context:

I followed the thumbnail example, so import 'solver_worker.dart' as isolate; is just following the pattern.

I have two versions of solver_worker and solver_worker_activator for every platform.

image

The activator should activate on that specific platform thanks to this conditional import: image

Since I need web to work with this as well, I can't really just use solver_vm.

You're right about not needing a pool, but I still need to conditionally import the activator. So I think the proper solution to not use the pools is to do this:

image

instead of:

final worker = SolverWorker(isolate.start);

Finally, I narrowed down the issue more, the one that was throwing:

flutter: type '_InternalLinkedHashMap<String, dynamic>' is not a subtype of type 'SendPort?'

I realized I'd forgotten to even run worker.start(), so I did and bam, I got the error immediately:

flutter: type '_InternalLinkedHashMap<String, dynamic>' is not a subtype of type 'SendPort?'
flutter: #0      openChannel (package:squadron/src/native/channel.dart:103:11)
<asynchronous suspension>
d-markey/squadron_sample#1      PositionManager.initRoot (package:codelessly_flutter/managers/position_manager/position_manager.dart:32:5)
<asynchronous suspension>

with just:

  Future<void> initRoot(RootNode node) async {
    await worker.start();

    final LayoutResult result =
        LayoutResult.fromJson(await worker.initRoot(node.toJson()));
  }

I moved all the code to a new project and it worked just fine. I'm assuming this means that it's crashing because of a conflict with Flutter somehow.

Note that Flutter isolates are instantiated slightly differently than dart isolates. Could that be the cause?

d-markey commented 2 years ago

Calling start() is optional, it will be done when the first task is scheduled. Calling it upfront eg when your app is initializing will avoid the latency of doing it when the first task is queued.

Back to your problem, thanks for the screenshot. You're doing things right AFAICT and I will need more time to investigate. If I can make a recommendation though, I would avoid being too strict on types for the service API. E.g. avoid Set or Map<String, ...> in favor of the more generic List and Map.

SaadArdati commented 2 years ago

@d-markey Noted! :)

Eagerly waiting for your investigation results! 🤞

d-markey commented 2 years ago

I believe I have good news, but I still need to wrap things up before releasing a fix. The bottom line is an exception is thrown during the worker initialization. The connection "protocol" in place between the main app and the workers does not play well when that happens, I'm fixing it. That was an interesting one!

The bad news is the exception might come from your code, in which case your app will still crash. The fix will ensure Squadron provides better information to understand the situation, but you will have to figure it out.
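For readers wondering how an initialization error can surface as "is not a subtype of type 'SendPort?'", here is a sketch of the general failure mode using only dart:isolate. It is not Squadron's actual implementation. The names FlakyService, workerMain and connect are hypothetical. The worker replies to the handshake with either a SendPort (success) or a Map describing the error, and the main side checks the reply type instead of casting blindly.

```dart
import 'dart:async';
import 'dart:isolate';

// Hypothetical service whose constructor may throw.
class FlakyService {
  FlakyService({required bool fail}) {
    if (fail) throw StateError('initialization failed');
  }
}

// Worker entry point. `args` is [SendPort replyTo, bool fail].
void workerMain(List args) {
  final replyTo = args[0] as SendPort;
  final fail = args[1] as bool;
  try {
    FlakyService(fail: fail);
    final commands = ReceivePort();
    // Success: hand the command port back to the main isolate.
    replyTo.send(commands.sendPort);
    commands.close(); // nothing more to do in this sketch
  } catch (e, st) {
    // Failure: send a description of the error instead of a port.
    replyTo.send({'error': e.toString(), 'stackTrace': st.toString()});
  }
}

// Returns the worker's SendPort, or throws with the worker's error message.
Future<SendPort> connect({required bool fail}) async {
  final handshake = ReceivePort();
  await Isolate.spawn(workerMain, [handshake.sendPort, fail]);
  final Object? reply = await handshake.first;
  // A blind `reply as SendPort?` here would hide the real error
  // behind a type error when the reply is a Map.
  if (reply is SendPort) return reply;
  if (reply is Map) throw StateError('worker failed: ${reply['error']}');
  throw StateError('unexpected handshake reply: $reply');
}

Future<void> main() async {
  await connect(fail: false);
  print('connected');
  try {
    await connect(fail: true);
  } on StateError catch (e) {
    print(e.message);
  }
}
```

With this shape, the caller sees the worker's own error message rather than a cast failure in the connection code.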

Could you try this in solver_worker.dart to find out if it throws from your code:

// Isolate implementation (VM platform)

import 'package:squadron/squadron_service.dart';

import '../solver_service.dart';

void start(Map command) => run((startRequest) {
   try {
      return SolverServiceImpl();
   } catch (ex, st) {
      print(ex);
      print(st);
      rethrow;
   }
}, command);

Remember that Isolates in Flutter cannot use some advanced features, eg. sending platform-channel messages (see https://docs.flutter.dev/development/platform-integration/platform-channels#channels-and-platform-threading)

SaadArdati commented 2 years ago

The cool thing about my use case is that I don't want complex data, only lists of strings and nothing more. I highly doubt my data is the problem.

I will try your code soon!!

SaadArdati commented 2 years ago

I'm getting normal and absolutely valid errors now concerning my normal logic!! I'll keep fixing them and then come back :) It's actually throwing normal errors now.

d-markey commented 2 years ago

I was not concerned about the data, more about the execution context. The worker isolates are like "fresh" processes and if your service relies on other classes that need initialization, you have to set everything up inside the worker isolate before any message is sent.
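The "fresh process" behavior can be seen with a small sketch that uses only dart:isolate (no Squadron). It assumes the Dart VM and Dart 2.19 or later for Isolate.run. The variable apiKey and the function describeConfig are hypothetical names for illustration.

```dart
import 'dart:isolate';

// Top-level state: every isolate gets its own copy.
String? apiKey;

String describeConfig() => apiKey == null ? 'not configured' : 'configured';

Future<void> main() async {
  apiKey = 'secret'; // set up in the main isolate only
  print('main: ${describeConfig()}'); // main: configured

  // The new isolate starts with its own, uninitialized top-level state.
  final inWorker = await Isolate.run(describeConfig);
  print('worker: $inWorker'); // worker: not configured

  // Initialization has to happen inside the worker isolate itself.
  final afterInit = await Isolate.run(() {
    apiKey = 'secret';
    return describeConfig();
  });
  print('worker after init: $afterInit'); // worker after init: configured
}
```

The same reasoning applies to singletons, registries and plugin bindings: anything set up in the main isolate has to be set up again inside the worker.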

SaadArdati commented 2 years ago

Right now, I'm facing an issue where the dart js command will throw an exception on every single object that relies on a flutter import

https://user-images.githubusercontent.com/7407478/152639317-a7e12d6d-c073-4ab2-8a70-a1560f116224.mp4

I'm assuming this is because I'm using flutter imports, and by substitution, dart:ui imports in the isolate logic. I'm accessing classes like Size/Offset/Axis/Rect, nothing more, which are unfortunately inside dart:ui

It's weird because the isolate runs just fine with them on vm, but compiling it to js pretty much fails. What's weirder is that the dart js command is throwing flutter errors across the entire project, as if it's ignoring my specific files. I'm not sure why.

In any case, do you think purging all references from dart:ui will have the dart js command compile correctly? It's the last barrier I'm facing.

d-markey commented 2 years ago

Web Workers in general (not just Dart or Flutter) can't work on UI, because they don't have access to the DOM (see https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Using_web_workers and https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Functions_and_classes_available_to_workers). So I guess you will have to move everything UI-related out of your worker service.
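One way to keep UI types out of the worker code is to exchange plain values instead. The sketch below is an assumption about how that could look, not something from Flutter or Squadron: PlainSize is a hypothetical stand-in written in pure Dart, and the main thread would be responsible for converting between it and the real UI type.

```dart
// Hypothetical plain-Dart stand-in for a UI size type, so that worker
// code does not need any UI import. Not part of Flutter or Squadron.
class PlainSize {
  const PlainSize(this.width, this.height);

  // Rebuild from an untyped list received on the other side.
  factory PlainSize.fromList(List values) => PlainSize(
        (values[0] as num).toDouble(),
        (values[1] as num).toDouble(),
      );

  final double width;
  final double height;

  // Flatten to a list of numbers, which is easy to send to a worker.
  List<double> toList() => [width, height];

  @override
  String toString() => 'PlainSize($width, $height)';
}

void main() {
  // Main side: flatten before sending.
  final sent = const PlainSize(320, 200).toList();
  // Worker side: rebuild from the untyped payload.
  final received = PlainSize.fromList(sent);
  print(received);
}
```

Only the few fields the computation needs have to be mirrored this way, which keeps the worker's dependency graph free of UI libraries.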

In your example code, the service methods return Futures, and that means the full results will only be available for updating the UI when everything has been computed.

If you want to have progressive updates to your UI, you will need to change your service implementation and stream data from your workers back to your main program so it has a chance to update the UI during processing. Squadron workers and worker pools support streaming: you should just replace Future with Stream in the service definitions (using async* and yield to stream the results as appropriate).
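As a rough illustration of the async* and yield part, here is a plain Dart generator that emits partial results. It shows only the service-side shape. How Squadron wires a Stream through a worker is not shown, and layoutInChunks with its chunkSize parameter is a hypothetical name.

```dart
import 'dart:async';

// Hypothetical service method: yields a progress update per chunk
// instead of returning one final result.
Stream<Map<String, dynamic>> layoutInChunks(
  List<String> nodeIds, {
  int chunkSize = 2,
}) async* {
  for (var start = 0; start < nodeIds.length; start += chunkSize) {
    final end = start + chunkSize < nodeIds.length
        ? start + chunkSize
        : nodeIds.length;
    final chunk = nodeIds.sublist(start, end);
    // Placeholder for the real computation on this chunk.
    yield {'done': end, 'total': nodeIds.length, 'nodes': chunk};
  }
}

Future<void> main() async {
  // The consumer can update the UI after each partial result.
  await for (final update in layoutInChunks(['a', 'b', 'c', 'd', 'e'])) {
    print('${update['done']}/${update['total']}: ${update['nodes']}');
  }
}
```

Run as is, this prints three updates (2/5, 4/5 and 5/5), one per chunk.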

SaadArdati commented 2 years ago

Hm, that won't do since our computation is not meant to stream any data. I'll see if moving everything out resolves the issue.

Closing this issue for now. I'll open a new one if any new issues occur. Thank you for your time!

d-markey commented 2 years ago

Ok, I hope you'll achieve what you want with your app :)

Have you tried narrowing imports? eg. import 'dart:ui' show Size, Offset, Rect; ?

SaadArdati commented 2 years ago

That's an excellent idea! Because creating our own implementation of those classes is a colossal task in our colossal project. I'll give it a shot right now.

SaadArdati commented 2 years ago

Sad to say it failed :(

d-markey commented 2 years ago

Bad luck :-( but was worth a shot.

d-markey commented 2 years ago

Transferred the issue to Squadron as it really belongs there.

d-markey commented 2 years ago

Version 3.2 has been published to pub.dev! You should be able to upgrade and remove the workaround try/catch from your solver_worker.dart file.