d-markey / squadron

Multithreading and worker thread pool for Dart / Flutter, to offload CPU-bound and heavy I/O tasks to Isolate or Web Worker threads.
https://pub.dev/packages/squadron
MIT License

Unable to throw on a stream from web #8

Closed martin-robert-fink closed 2 years ago

martin-robert-fink commented 2 years ago

Hi - me again.

Consider this code in a service:

class ParserService implements WorkerService {
  Stream<SignalValue> streamParser(List<dynamic> words) async* {
    int id = 0;
    for (var word in words) {
      await Future.delayed(const Duration(milliseconds: 500));
      if (id == 2) throw 'this is an error';
      yield SignalValue(signalDecimalValue: ++id, interval: id, idCode: word);
    }
  }

  // command IDs
  static const streamCommand = 1;

  // command IDs --> command implementations
  @override
  Map<int, CommandHandler> get operations => {
        streamCommand: (r) =>
            streamParser(r.args[0]).map((sv) => sv.serialize()),
      };
}

In the web version, I can listen to the stream returned by pool.stream, but the onError callback is never called when the service throws. On the non-web version it works as expected: the throw triggers the onError callback in the main thread.

I wanted to check: is this a bug, or a limitation of streams on the web version?

Thanks, Martin
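
For comparison, the expected behaviour can be sketched with a plain Dart stream and no Squadron involved. The names faultyStream and collectEvents are hypothetical helpers made up for this illustration; they only mirror the shape of streamParser and the listener described above.

```dart
import 'dart:async';

/// Hypothetical stand-in for streamParser: yields increasing ids and
/// throws once `failAt` values have been produced.
Stream<int> faultyStream(int count, {int failAt = 2}) async* {
  for (var id = 0; id < count; id++) {
    if (id == failAt) throw 'this is an error';
    yield id + 1;
  }
}

/// Listens with onError / onDone and records every event in order.
Future<List<String>> collectEvents(Stream<int> stream) {
  final log = <String>[];
  final done = Completer<List<String>>();
  stream.listen(
    (value) => log.add('data: $value'),
    onError: (Object error) => log.add('error: $error'),
    // An async* generator that throws emits the error and then closes,
    // so onDone still fires when cancelOnError is false.
    onDone: () => done.complete(log),
    cancelOnError: false,
  );
  return done.future;
}

Future<void> main() async {
  print(await collectEvents(faultyStream(5)));
}
```

On a plain stream the error event reaches onError after the values yielded before the throw. That is the behaviour the worker-backed stream is expected to reproduce on every platform.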

d-markey commented 2 years ago

That would be a bug: the idea with Squadron is to ensure the same behavior on all platforms. I'll look into this and keep you posted.

d-markey commented 2 years ago

I can't reproduce this: the exceptions are correctly received in the onError callback. Can you provide the code of your "onError" function?

FYI exceptions thrown inside a Worker will be received as "WorkerException" by the caller.

I have committed a test case if you want to look at the code: https://github.com/d-markey/squadron/blob/main/test/test_suites/issues_test_suite.dart#L11

and the service implementation: https://github.com/d-markey/squadron/blob/main/test/worker_services/issues_service.dart#L4

martin-robert-fink commented 2 years ago

Hmmm. That's interesting. Here's the private function in main.dart:

  void _onPressed() async {
    var pool = WorkerPool(
      (kIsWeb) ? createJsParserWorker : createVmParserWorker,
      concurrencySettings: const ConcurrencySettings(
        maxWorkers: 4,
        maxParallel: 2,
      ),
    );
    await pool.start();

    pool.stream((w) => w.streamParser(words)).listen(
      (sv) {
        print('${sv.decimalValue}: ${sv.interval}, ${sv.idCode}');
      },
      onError: (object) => print('Got an error $object'),
      onDone: () => print('I\'m done'),
      cancelOnError: false,
    );
  }

martin-robert-fink commented 2 years ago

I looked at your test, then something dawned on me. To my knowledge, there is no way to test a web TargetPlatform. All TargetPlatforms are native (iOS, Android, Linux, Windows, Mac, Fuchsia). So, your test wouldn't run the web version AFAIK. As I mentioned, my stuff works fine on native. It doesn't work on web.

d-markey commented 2 years ago

The tests run all right on VM and Web. I had a hard time setting them up, especially getting the Web workers to run. It made me delve into the internals of test, and I eventually made a pull request to https://github.com/dart-lang/test to improve support for Web Workers, but it's obviously not a priority :-( The tests are pure Dart btw, not Flutter.

I'll try to reproduce your issue with a full Flutter setup.

martin-robert-fink commented 2 years ago

You can clone this if you want: Test Project

d-markey commented 2 years ago

Thanks, reproduced. I'm working on it. The stream implementation is actually missing a few things right now, like pause/resume capabilities. I have to strengthen that part, but it looks tricky and it may take some time before I can fix it.

martin-robert-fink commented 2 years ago

Great! Thanks for letting me know. Let me know if I can help with anything. I'm not blocked from forward progress, so it's ok. It works on native, so I can try there. I also haven't fully decided if returning a stream is the best way for me to do this. I'm leaning that way because it gives me a very effective way to deal with parsing errors and cancel all threads if one of them gets an error. In all cases, errors are 'hard errors': any parsing issue aborts and cancels the entire parsing process. There is no recovery.

d-markey commented 2 years ago

OK, thanks for the info. At first I thought the cancelOnError: false was the problem when I read your listener's code, but it's not!

martin-robert-fink commented 2 years ago

BTW... At some point I'd be curious to hear how to target web during unit/widget testing. When I write tests I use the variant: parameter with TargetPlatformVariant.all() on all tests so that every widget test runs on all platforms. But, I haven't figured out how to make one of those targets web. To my knowledge debugDefaultTargetPlatformOverride does not have a TargetPlatform.web option.

d-markey commented 2 years ago

I don't know, I've never tried to write tests for Flutter/Web. Squadron is not specific to Flutter, so the tests are for Dart/VM & Dart/Web, there's no Flutter here...

martin-robert-fink commented 2 years ago

Ahhh... this was still very helpful. I never looked at Dart testing in detail before. When I wrote Dart unit tests (and not WidgetTests), I wouldn't use platform variants (test() doesn't support them). But I just read the docs on platform config and being able to do platforms: [chrome, vm] in the dart_test.yaml file. So, thanks for that. Very cool.
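
A minimal sketch of such a dart_test.yaml, assuming the plural platforms key from the package:test configuration and a Chrome install on the machine running the tests:

```yaml
# dart_test.yaml (at the package root, next to pubspec.yaml)
# Run every test on both the Dart VM and Chrome by default.
platforms: [vm, chrome]
```

The same selection can usually be made for a single run on the command line, for example dart test -p vm,chrome.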

d-markey commented 2 years ago

Version 3.4 is now published and should fix this issue!

martin-robert-fink commented 2 years ago

This is great. I tested (using my small test program) and throw works on a stream, and onError gets the error. As an fyi, the link to the issue in the change log is broken, and the readme likely needs some updates (I didn't go through it all in detail, but did notice it still references 3.3.0). It'll take me a bit of time to test on real data (grandson's birthday this weekend 😄), but I'll let you know.

d-markey commented 2 years ago

Thanks for the feedback. The link is broken on pub.dev but not on GitHub, so I'll assume it's a bug in pub.dev :-) I'll update the changelog the next time I publish a new version, but will leave it as is for now.

Thanks for all the feedback again, you helped improve Squadron A LOT!