d-markey / squadron

Multithreading and worker thread pool for Dart / Flutter, to offload CPU-bound and heavy I/O tasks to Isolate or Web Worker threads.
https://pub.dev/packages/squadron
MIT License
79 stars 0 forks

Can't get web worker going (native working fine) #7

Closed martin-robert-fink closed 2 years ago

martin-robert-fink commented 2 years ago

Hi - I'm just starting to learn/use squadron (thanks for doing this BTW)...

What I've tried to do (as part of learning) is to take the sample code in the readme file and combine that into a test app to make sure I understand all the parts. I did find the other sample (squadron_sample) to be too complicated to start with. My goal was to create a bit of a "hello world".

My end goal is to port a parser I've got (it parses VCD [Value Change Dump] files, which can get quite large — hundreds of MB).

Here's a link to my repo "Hello World": squadron_test

I'm able to get this to work on the Mac, but with the web version I get exceptions and I can't isolate where they're coming from.

The main.dart just puts a button widget at the center. Click on it and the _onPressed function runs the code from the "you can now distribute your workloads" section of the readme. Of course, I'm just using the debug window for now.

I set up all the worker/service stuff in the sample folder and made the folder structure look like the squadron_sample. The only thing I changed from the readme code is the addition of some print statements. The readme also still uses deprecated stuff (maxWorkers/maxParallel), so I changed that to use ConcurrencySettings.

In the root/lib folder there are two files to generate the .js files: one is build_web_worker_posix and the other is build_web_worker_windows.bat. I'm using a Mac for development, and I think you're using Windows, so that should let you do your own dart2js compile. I didn't try this on Windows (just Mac and web). If you want me to test Windows, Linux, or mobile, I can do that as well. I did put the <script> tag in index.html as well.

I'm guessing that I'm missing something pretty basic in my "hello world". So, I'm hoping you can quickly find what's wrong. Maybe some of this would be useful to you as well. I don't want to move on to getting my parser in there until I can get this basic test going.

I can get you a full Flutter doctor if you want, but hopefully, this will do: Flutter (Channel stable, 2.10.4, on macOS 12.3.1 21E258 darwin-arm, locale en-US)

Thanks for your help on this, I really appreciate it. Martin

martin-robert-fink commented 2 years ago

Ok, after sending this, I started reading through the closed issues in detail. I found an error: I wasn't pointing to the js file's location in the web/workers folder. Now the exceptions are gone, but that exposed a new issue.

When I run the native version, the print statement in sample_service.dart (print('elapsed: ${sw.elapsed}');) runs and I can see the result in the debug console. But, in the web version, that print statement doesn't run.

Is there a reason for this?

Thanks, Martin

d-markey commented 2 years ago

Hello Martin,

I believe this is due to Web Workers not being aware of the Dart debugger, so print resolves to console.log in the browser. You should be able to see your message in your browser's JavaScript console. Is it there all right?

David

martin-robert-fink commented 2 years ago

Hi David -

Thanks for this... that's exactly what happened, and I can see the print messages in the JavaScript console. Maybe add that to the readme :). (Maybe also update the readme to ConcurrencySettings.)

This is great. Thanks. I'll start migrating my parser now.

Martin

d-markey commented 2 years ago

I've implemented a concept of "LocalWorker" recently, whose purpose is to allow workers to execute code in the main context. That could be a good use case for a logging mechanism where workers can send their messages to the logger in the main app. I'll have a look at this.

martin-robert-fink commented 2 years ago

That's a really cool idea. Let me know how it goes.

Side question... is there a way to import dependencies in a worker when those dependencies support all platforms? I'm specifically trying to import isar, which supports web and native. The idea is that I'd store data in the DB (IndexedDB on web) in the thread as I parse data. However, dart compile js fails. I can send the data back to the main thread to do the DB write there, but it would be nice to do it in the thread proper.

Thanks, Martin

martin-robert-fink commented 2 years ago

Turns out it was path_provider causing the js compile to fail, not isar. So, if I pass the getApplicationSupportDirectory() as a parameter to the worker, I think I can do this!

Martin

d-markey commented 2 years ago

It should be OK as long as native Web Workers can access IndexedDB, which should work in most browsers now.

martin-robert-fink commented 2 years ago

It was worth a shot, but it didn't work. Any isar calls in the web worker cause an exception. So, I'll have to parse, json->String, send to main, String -> json -> toObject.

d-markey commented 2 years ago

If you can send results as you parse (I understand VCD is time-oriented, so you'll probably save data timestamp after timestamp), I would use a streaming worker so you can write records, or chunks of records, to isar while parsing continues, as opposed to parsing the whole thing and sending back a list with the full results to be saved to the DB.

Just curious, what exception are you getting from isar?

martin-robert-fink commented 2 years ago

Yes, I'm having a similar thought. Each value change interval can have a number of signals in it. So, my plan is to read a change interval, collect the signal changes for that interval, and send that to a worker/parser. It would send back the results of those value changes. Where I need to run some experiments: with my current non-threaded parser, I'm parsing about 18,000 value changes per second, and some VCD files can have 10 million or more value changes. So, if I break this down to a worker per time interval, what's the optimal number of parallel workers? And given that I'll now have the overhead of transferring the data to the main thread, how does that impact things?

Now, even if all I did was parse from beginning to end (as I do now) but on a background thread, I'd still have the benefit of not blocking the UI while the parsing continues. But I don't want to do just that. Part of the reason for isar is that I did an in-memory version of this (non-web). When I kept everything in memory, it worked fine on desktop platforms but choked on mobile (iPhone/iPad) when the VCD files started getting very large. If you're curious to see what the iPad (in-memory) version of the app looks like, go here: FPGAView Video.

A lot of the feedback I got is that FPGA simulation folks really want a web interface. So, I decided to restart (I'm changing state management too) to make it web native from the start. In order to remove the memory limitation I'm going to go from in-memory to DB based. Isar is a good choice for me because it's a simple key/value store and it supports the web (via IndexedDB).

So, still much experimenting to do :)

The exception logged to console.log is:

errors.dart:284 Uncaught (in promise) Error: Instance of 'WorkerException'

    at Object.createErrorWithStack (errors.dart:284:10)
    at Function._throw (core_patch.dart:288:28)
    at Function.throwWithStackTrace (errors.dart:108:5)
    at async._AsyncCallbackEntry.new.callback (zone.dart:1413:11)
    at Object._microtaskLoop (schedule_microtask.dart:40:11)
    at _startMicrotaskLoop (schedule_microtask.dart:49:5)
    at async_patch.dart:166:15
createErrorWithStack @ errors.dart:284
_throw @ core_patch.dart:288
throwWithStackTrace @ errors.dart:108
(anonymous) @ zone.dart:1413
_microtaskLoop @ schedule_microtask.dart:40
_startMicrotaskLoop @ schedule_microtask.dart:49
(anonymous) @ async_patch.dart:166
Promise.then (async)
_scheduleImmediateWithPromise @ async_patch.dart:164
_scheduleImmediate @ async_patch.dart:136
_scheduleAsyncCallback @ schedule_microtask.dart:69
_rootScheduleMicrotask @ zone.dart:1493
scheduleMicrotask @ zone.dart:1705
[_asyncCompleteError] @ future_impl.dart:664
[_completeError] @ future_impl.dart:49
completeError @ future_impl.dart:32
[_completeWithError] @ worker_task.dart:100
(anonymous) @ worker_task.dart:150
_wrapUp @ worker_task.dart:136
runBody @ async_patch.dart:84
_async @ async_patch.dart:123
[_wrapUp] @ worker_task.dart:132
_runFuture @ worker_task.dart:150
(anonymous) @ async_patch.dart:60
runBinary @ zone.dart:1690
handleError @ future_impl.dart:174
handleError @ future_impl.dart:778
_propagateToListeners @ future_impl.dart:799
[_complete] @ future_impl.dart:592
_cancelAndValue @ stream_pipe.dart:61
(anonymous) @ stream.dart:1288
_checkAndCall @ operations.dart:334
dcall @ operations.dart:339
(anonymous) @ html_dart2js.dart:37301

At one point I was seeing Exceptions coming from dart:ui, but it's not in this Exception stream.

d-markey commented 2 years ago

I have committed a small change to WorkerException to have more verbose debug messages. Could you reference Squadron from github directly and try again?

In your pubspec.yaml file:

dependencies:
  squadron:
    git: https://github.com/d-markey/squadron.git

It looks like isar is running a future that fails but we're missing the error message.

Regarding concurrency settings, you could go with just one worker parsing the whole file (minWorkers/maxWorkers = 1 and maxParallel = 1), but if you can split the file beforehand and parallelize, you could go with maxWorkers = 3 and maxParallel = 1. Parsing is essentially CPU-bound, so I doubt a worker would be able to handle more than one task at a time. I guess it's safe to assume your users have 4 cores, so keeping one for the UI makes 3 the safest bet. Or if you split the file in just 2 parts, set maxWorkers = 2.
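
As a rough illustration of the two setups described above (this is a sketch only — the field names follow the ConcurrencySettings usage earlier in this thread; check your Squadron version for the exact API and defaults):

```dart
import 'package:squadron/squadron.dart';

// One worker parsing the whole file sequentially:
const singleWorker = ConcurrencySettings(
  minWorkers: 1,
  maxWorkers: 1,
  maxParallel: 1,
);

// Pre-split file, three CPU-bound workers (leaving one core for the UI):
const threeWorkers = ConcurrencySettings(
  minWorkers: 1,
  maxWorkers: 3,
  maxParallel: 1,
);
```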

d-markey commented 2 years ago

But maybe isar is not Web Worker-compatible. Workers can't access the DOM and I see:

https://github.com/isar/isar/blob/main/packages/isar/lib/src/web/open.dart

   script.src = 'https://unpkg.com/isar@$isarWebVersion/dist/index.js';
   script.async = true;
   assert(document.head != null);
   document.head!.append(script);
   await script.onLoad.first.timeout(Duration(seconds: 30), onTimeout: () {

I believe document is not available in Web Workers.

d-markey commented 2 years ago

Reading your comment again, I would actually send whole chunks of the file, split on well-chosen time interval boundaries: e.g. split the file in 2 or 3 parts and have the workers parse and load all time intervals from their chunks. Your service implementation can process all time intervals from a chunk and return a Future<List<ChangeData>>; or (that would be my choice) it could yield individual time intervals while parsing (in that case the service must return a Stream<ChangeData> and the worker must use the .stream() method instead of execute()). When the service caller (in the main app) receives the data, it can then save it to isar, since isar doesn't work in Web Workers.

martin-robert-fink commented 2 years ago

Hi - Thanks for really looking into this and for all your suggestions; I really appreciate it. I did put in a feature request on ISAR for background thread support, so we'll see what happens there. Let me spend some time on this and see what I come up with. I do need to modify the parser to run in workers, then run some experiments to see what the best way is to break up the file/intervals (memory pressure, overhead of passing data around, performance benefits). I will let you know what comes out of it. Again, thanks for spending so much time on this. It's so great to see developers like you who really want to do great work. Martin

martin-robert-fink commented 2 years ago

Oh, here's the Exception caught using the git version of squadron you mentioned above, in case this is useful to you:

errors.dart:284 Uncaught (in promise) Error: ["$W","ReferenceError: document is not defined","ReferenceError: document is not defined\n    at http://localhost:50475/workers/sample_worker.dart.js:2276:1\n    at is.a (http://localhost:50475/workers/sample_worker.dart.js:1339:63)\n    at is.$2 (http://localhost:50475/workers/sample_worker.dart.js:3236:14)\n    at Object.b2 (http://localhost:50475/workers/sample_worker.dart.js:1325:11)\n    at Object.iB (http://localhost:50475/workers/sample_worker.dart.js:2286:10)\n    at http://localhost:50475/workers/sample_worker.dart.js:2290:15\n    at is.a (http://localhost:50475/workers/sample_worker.dart.js:1339:63)\n    at is.$2 (http://localhost:50475/workers/sample_worker.dart.js:3236:14)\n    at Object.b2 (http://localhost:50475/workers/sample_worker.dart.js:1325:11)\n    at Object.f6 (http://localhost:50475/workers/sample_worker.dart.js:2309:10)","1056267240",1]

    at Object.createErrorWithStack (errors.dart:284:10)
    at Function._throw (core_patch.dart:288:28)
    at Function.throwWithStackTrace (errors.dart:108:5)
    at async._AsyncCallbackEntry.new.callback (zone.dart:1413:11)
    at Object._microtaskLoop (schedule_microtask.dart:40:11)
    at _startMicrotaskLoop (schedule_microtask.dart:49:5)
    at async_patch.dart:166:15
createErrorWithStack @ errors.dart:284
_throw @ core_patch.dart:288
throwWithStackTrace @ errors.dart:108
(anonymous) @ zone.dart:1413
_microtaskLoop @ schedule_microtask.dart:40
_startMicrotaskLoop @ schedule_microtask.dart:49
(anonymous) @ async_patch.dart:166
Promise.then (async)
_scheduleImmediateWithPromise @ async_patch.dart:164
_scheduleImmediate @ async_patch.dart:136
_scheduleAsyncCallback @ schedule_microtask.dart:69
_rootScheduleMicrotask @ zone.dart:1493
scheduleMicrotask @ zone.dart:1705
[_asyncCompleteError] @ future_impl.dart:664
[_completeError] @ future_impl.dart:49
completeError @ future_impl.dart:32
[_completeWithError] @ worker_task.dart:100
(anonymous) @ worker_task.dart:150
_wrapUp @ worker_task.dart:136
runBody @ async_patch.dart:84
_async @ async_patch.dart:123
[_wrapUp] @ worker_task.dart:132
_runFuture @ worker_task.dart:150
(anonymous) @ async_patch.dart:60
runBinary @ zone.dart:1690
handleError @ future_impl.dart:174
handleError @ future_impl.dart:778
_propagateToListeners @ future_impl.dart:799
[_complete] @ future_impl.dart:592
_cancelAndValue @ stream_pipe.dart:61
(anonymous) @ stream.dart:1288
_checkAndCall @ operations.dart:334
dcall @ operations.dart:339
(anonymous) @ html_dart2js.dart:37301

martin-robert-fink commented 2 years ago

Ok, sorry to bother you. I modified my little test "hello world" to process a stream. It works on macOS, but fails with an exception on web. Here are the parts; can you tell me why this fails on web?

main.dart

// ignore_for_file: avoid_print

import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart';
import 'package:squadron/squadron.dart';

import './sample/browser/sample_worker.dart';
import './sample/vm/sample_worker.dart';

void main() {
  runApp(const SquadronApp());
}

class SquadronApp extends StatelessWidget {
  const SquadronApp({Key? key}) : super(key: key);

  final List<String> wordList = const ['a', 'b', 'c', 'd', 'e', 'f'];

  Stream<String> words() async* {
    for (final word in wordList) {
      yield word;
    }
  }

  void _onPressed() async {
    var pool = WorkerPool(
      (kIsWeb) ? createJsSampleWorker : createVmSampleWorker,
      concurrencySettings: const ConcurrencySettings(
        maxWorkers: 4,
        maxParallel: 2,
      ),
    );
    await pool.start();

    pool.stream((worker) => worker.streamParser(words())).listen((data) {
      print(data);
    });
  }

  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      home: Scaffold(
        body: Center(
            child: ElevatedButton(
          onPressed: _onPressed,
          child: const Text('Run...'),
        )),
      ),
    );
  }
}

sample_worker.dart

// If you modify ANY of the code in this file, and it needs to run on the
// web, then make sure to rerun lib/build_web_worker[_posix | _windows.bat].

import 'package:squadron/squadron.dart';

import './sample_service.dart';

class SampleWorker extends Worker implements SampleService {
  SampleWorker(dynamic entryPoint, {String? id, List args = const []})
      : super(entryPoint, id: id, args: args);

  @override
  Stream<String> streamParser(Stream<String> words) {
    return stream(SampleService.streamCommand, [words]);
  }
}

sample_service.dart

// If you modify ANY of the code in this file, and it needs to run on the
// web, then make sure to rerun lib/build_web_worker[_posix | _windows.bat].

import 'package:squadron/squadron.dart';

class SampleService implements WorkerService {
  Stream<String> streamParser(Stream<String> words) async* {
    int i = 0;
    await for (final word in words) {
      await Future.delayed(const Duration(milliseconds: 500));
      yield '$word ${++i}';
    }
  }

  // command IDs
  static const streamCommand = 1;

  // command IDs --> command implementations
  @override
  Map<int, CommandHandler> get operations => {
        streamCommand: (WorkerRequest r) => streamParser(r.args[0]),
      };
}

vm/sample_worker.dart

import 'package:squadron/squadron.dart';

import '../sample_worker.dart';
import '../sample_service.dart';

SampleWorker createVmSampleWorker() => SampleWorker(_main);

// Isolate entry-point.
// It must be a top level function or static method accepting a Map argument.
// The argument passed to the entry-point must be passed to the run() function.
void _main(Map command) => run((startRequest) => SampleService(), command);

browser/sample_worker.dart

import 'package:squadron/squadron.dart';

import '../sample_service.dart';
import '../sample_worker.dart';

SampleWorker createJsSampleWorker() =>
    SampleWorker('/workers/sample_worker.dart.js');

// Web Worker entry-point.
// It must be a parameter-less "main()" function.
void main() => run((startRequest) => SampleService());

... and here is the exception:

(index):65 Installed new service worker.
VM231:1 This app is linked to the debug service: ws://127.0.0.1:59176/3Qp6dTIO4lE=/ws
errors.dart:284 Uncaught (in promise) Error: ["$W","UnimplementedError: structured clone of other type","dart-sdk/lib/_internal/js_dev_runtime/private/ddc_runtime/errors.dart 251:49  throw_\ndart-sdk/lib/html/html_common/conversions.dart 153:5                          walk\ndart-sdk/lib/html/html_common/conversions.dart 162:17                         copyList\ndart-sdk/lib/html/html_common/conversions.dart 136:14                         walk\ndart-sdk/lib/html/html_common/conversions.dart 122:31                         <fn>\ndart-sdk/lib/_internal/js_dev_runtime/private/linked_hash_map.dart 21:13      forEach\ndart-sdk/lib/html/html_common/conversions.dart 121:8                          walk\ndart-sdk/lib/html/html_common/conversions.dart 168:16                         convertDartToNative_PrepareForStructuredClone\ndart-sdk/lib/html/html_common/conversions_dart2js.dart 94:10                  convertDartToNative_PrepareForStructuredClone\ndart-sdk/lib/html/html_common/conversions.dart 32:10                          convertDartToNative_SerializedScriptValue\ndart-sdk/lib/html/dart2js/html_dart2js.dart 21696:23                          postMessage]\npackages/squadron/src/browser/_channel.dart 24:7                              [_postRequest]\npackages/squadron/src/browser/_channel.dart 109:5                             sendStreamingRequest\npackages/squadron/src/worker.dart 165:20                                      stream\ndart-sdk/lib/_internal/js_dev_runtime/patch/async_patch.dart 325:40           runBody\ndart-sdk/lib/async/schedule_microtask.dart 40:11                              _microtaskLoop\ndart-sdk/lib/async/schedule_microtask.dart 49:5                               _startMicrotaskLoop\ndart-sdk/lib/_internal/js_dev_runtime/patch/async_patch.dart 166:15           <fn>\n","78075063",1]

    at Object.createErrorWithStack (errors.dart:284:10)
    at Function._throw (core_patch.dart:288:28)
    at Function.throwWithStackTrace (errors.dart:108:5)
    at async._AsyncCallbackEntry.new.callback (zone.dart:1413:11)
    at Object._microtaskLoop (schedule_microtask.dart:40:11)
    at _startMicrotaskLoop (schedule_microtask.dart:49:5)
    at async_patch.dart:166:15
createErrorWithStack @ errors.dart:284
_throw @ core_patch.dart:288
throwWithStackTrace @ errors.dart:108
(anonymous) @ zone.dart:1413
_microtaskLoop @ schedule_microtask.dart:40
_startMicrotaskLoop @ schedule_microtask.dart:49
(anonymous) @ async_patch.dart:166
Promise.then (async)
_scheduleImmediateWithPromise @ async_patch.dart:164
_scheduleImmediate @ async_patch.dart:136
_scheduleAsyncCallback @ schedule_microtask.dart:69
_rootScheduleMicrotask @ zone.dart:1493
scheduleMicrotask @ zone.dart:1705
[_asyncCompleteWithValue] @ future_impl.dart:638
[_asyncComplete] @ future_impl.dart:633
complete @ future_impl.dart:45
(anonymous) @ _channel.dart:285
_checkAndCall @ operations.dart:334
dcall @ operations.dart:339
(anonymous) @ html_dart2js.dart:37277
d-markey commented 2 years ago

hello Martin,

The exception message is very helpful and I believe it is related to your code calling openIsar(), which eventually tries to access document. But document is not defined in Web Workers, hence the exception. Isar wants to load its runtime by injecting a <SCRIPT> tag but it won't work that way. I've also had a look at the script they reference from https://unpkg.com/isar@2.2.1/dist/index.js. Turns out it uses window which is also not available in Web Workers (https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Using_web_workers + https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Functions_and_classes_available_to_workers). FYI that Isar JS file was built from https://github.com/isar/isar-web, it is the browser equivalent of the native runtime they distribute for iOS/Windows/etc.

So unless they fix the issue in both the Isar Dart package and the Isar-Web TypeScript library, you're out of luck and have no choice other than saving data in your main app...

Regarding the sample, you cannot pass a Stream to a worker. In Web scenarios, the data that can be passed to Web Workers is quite limited -- the best bet is to stick to numbers, booleans, strings, and generic lists/maps (i.e. List and Map, not List<T> or Map<K, V> -- JavaScript doesn't care about types, so type information is lost in the communication process).

The best way to mimic your use-case is to provide a string parameter to your service method (eventually, this will be a chunk of your file) and then stream the results back to the caller.

Eg. in main.dart:

    pool.stream((worker) => worker.streamParser('A chunk of data')).listen((data) {
      print(data);
    });

in sample_worker.dart:

  @override
  Stream<String> streamParser(String data) {
    return stream(SampleService.streamCommand, [data]);
  }

in sample_service.dart:

  Stream<String> streamParser(String data) async* {
    int i = 0;
    for (final word in data.split(' ')) {
      await Future.delayed(const Duration(milliseconds: 500));
      yield '$word ${++i}';
    }
  }

Regarding the overhead of transmitting data to the Web Worker, I believe it should be minimal -- from my understanding, Strings are immediately shared with the worker. I haven't verified that point though.

martin-robert-fink commented 2 years ago

Perfect. I think I have enough to get going now. I'm glad I went through my 'Hello World' exercise to figure all this out. I should be able to be productive with the parser now. I'll let you know how it goes. BTW, if you looked at the video, you'll also see that I draw the waveforms (custom painter). I sometimes have to deal with 3 million data points (I use a few tricks to reduce that). But, I'm hoping to also be able to use Squadron to compute the waveforms on a background thread. Everything to not block the UI. Thanks again!

d-markey commented 2 years ago

Last piece of advice I can give you: you will probably want to return structured objects from your service method. Assuming you implement a ValueChangeInterval class to represent one change interval containing the signals, I guess your service will have a form like:

class ParserService {
   Stream<ValueChangeInterval> parse(String chunk); // if you decide to stream individual values

   static const parseCommand = 1;
}

To make sure it works on browser and vm, you should implement a serialize() method and a static deserialize() method in ValueChangeInterval. serialize() would return a list or a map of base types such as int or String, and deserialize() would work the other way around (take a list or a map and rebuild a ValueChangeInterval).

Then in the operations map, you would write:

  late final Map<int, CommandHandler> operations = {
      parseCommand: (r) => parse(r.args[0]).map((v) => v.serialize())
   };

And in the worker:

   Stream<ValueChangeInterval> parse(String chunk) =>
          stream(ParserService.parseCommand, [chunk])
             .map((v) => ValueChangeInterval.deserialize(v));

So serialization is kept as close as possible to the end-points between your main app and the Web worker, and you can work safely with structured data in the rest of your code :)
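
A minimal, self-contained sketch of the serialize()/deserialize() pattern above (the ValueChangeInterval fields shown here are assumptions for illustration — adapt them to the actual VCD model):

```dart
// Sketch of the serialize()/deserialize() pair described above. The
// fields (timestamp, signal changes) are assumptions for illustration.
class ValueChangeInterval {
  ValueChangeInterval(this.timestamp, this.changes);

  final int timestamp;
  final Map<String, String> changes; // signal id -> new value

  // Flatten to base types only (int, String, List, Map) so the data
  // survives the browser's structured-clone transfer.
  List serialize() => [timestamp, changes];

  static ValueChangeInterval deserialize(List data) => ValueChangeInterval(
        data[0] as int,
        Map<String, String>.from(data[1] as Map),
      );
}
```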

Happy coding!

martin-robert-fink commented 2 years ago

Hi there -

I thought I'd give you an update. First, my isar experiment is a bust: it took 31 minutes to do a 'findAll' on 100,000 value changes (even with indexing), so that won't work. But all this caused me to restructure the parser from the ground up, and I found where I was doing 'stupid stuff'. I restructured the parser as a series of StreamTransformers. The end result is that I can now parse over 10 million value changes in about 15 seconds (that's on the iPhone simulator - trying to get the worst case). As a result, I'm putting the Squadron work on pause for now (I may still need it for waveform displays). I'm likely going to just use JSON files for data storage. If I can keep everything in memory (even on an iPhone) with my new data structure, that's what I'll keep. All this was still a very, very useful/helpful process for me, so thank you for doing this work, and please keep it going.

Martin
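
The "series of StreamTransformers" approach Martin describes can be sketched in plain Dart. The only VCD rule assumed here is that a '#<n>' line opens a new time interval; everything else about the grammar is omitted:

```dart
import 'dart:async';

// One stage of a transformer-based parser: tag each value-change line
// with the timestamp of the interval it belongs to. Stages like this
// can be chained with successive .transform(...) calls.
StreamTransformer<String, MapEntry<int, String>> tagWithTimestamp() {
  var current = 0; // state survives across events via the closure
  return StreamTransformer.fromHandlers(handleData: (line, sink) {
    if (line.startsWith('#')) {
      current = int.parse(line.substring(1)); // new time interval
    } else {
      sink.add(MapEntry(current, line)); // value change in 'current'
    }
  });
}
```

Keeping each parsing stage in its own transformer makes the stages individually testable and lets the pipeline process the file line by line instead of holding it all in memory.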

d-markey commented 2 years ago

Hello, thanks for the update! I will fix the streaming issue anyway -- this problem you unveiled is quite interesting.

Feel free to come back if/when you need more background processing.

David

martin-robert-fink commented 2 years ago

Hey there - I'm back :)

Well, I decided to get back to Squadron, and here's why. When I parse very large files (about 100MB in size, with more than 1M value changes), the parse can take 15 to 30 seconds or more. During that time, the UI essentially freezes. I could set up a dialog box that provides periodic updates on parsing progress, but that's not ideal. So, I decided that background parsing was more important than raw performance.

I did a few experiments. My first one was to split up the parsing into intervals. The VCD file has a line that looks like #1000, which starts a new interval, and the following lines are all the value changes until the next interval. However, chunking the data this way injected enough serialization/deserialization overhead that the UI pretty much still blocked. So, I decided to read the entire file (line by line) and have two parsing workers: one for the header (which is usually pretty quick), and one for all the value changes. I have all this working, and the good news is that the UI doesn't block.

My new problem is that when I throw a 100MB file at it, at some point during the value change parsing, Chrome crashes with an "Aw Snap" and an Error Code 5. I'm guessing (and this is what I wanted to check with you) that I'm consuming too much memory in the web worker and Chrome can't handle it. Would you agree? I had stopped using Streams (partly because of the throw issue), but I'm thinking I may need to go back to a stream and send back chunks every time I hit something like 100,000 value changes, and hope you find a solution to the throw problem :). Do you happen to know of memory limitations on web workers? Thanks, Martin

martin-robert-fink commented 2 years ago

Oh, a couple more data points just in case they are helpful. I put a counter to see how many lines the web worker went through before crashing: 12,770,306. I ran this on a Mac to make sure it works there, and it does. It parsed the file with an end result of 10,996,209 values in 1:42.45 (no UI jitters during parse 😄 - yay!).

d-markey commented 2 years ago

Hello Martin,

Yes, it seems there's a limitation --> see https://bugs.chromium.org/p/chromium/issues/detail?id=1081316

Apparently Firefox should be working?

I'll be fixing the stream implementation in the next few days and will get back to you during the weekend.

Regards David

martin-robert-fink commented 2 years ago

I tried Safari and it runs for a while, but then also crashes. Then, I installed Firefox to try and it crashes almost immediately. On smaller files, it does work, but Firefox seems to parse much more slowly.

I guess I'll have to work on a chunking strategy that parses chunks large enough not to worry about serialization/deserialization overhead, but still allows the web worker to run. This will be an interesting experiment: I'm wondering if I'll need to run one web worker at a time, or if I can run multiple at once. Is the memory pressure per web worker, or overall for the browser?

Looking forward to the new stream implementation.

Thank you. Martin

d-markey commented 2 years ago

I would use several workers to crunch your data. If the file is already available as a list of lines, you could split it in 2 (or more) chunks: e.g. look at the line in the middle and rewind a few lines to find the closest timestamp, then feed two workers (or a worker pool with maxParallel = 1 and maxWorkers > 1) with each subset of lines. And yes, I would opt for streaming each individual result back to the main thread and merging the two streams into one.
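
The "rewind a few lines to find the closest timestamp" step could look like this minimal sketch (pure Dart; the function name and the '#<timestamp>' line convention are assumptions based on the VCD format discussed in this thread):

```dart
// Split a list of VCD lines near the middle, rewinding to the closest
// '#<timestamp>' marker so each chunk starts on an interval boundary.
List<List<String>> splitOnTimestamp(List<String> lines) {
  var split = lines.length ~/ 2;
  // Rewind until we reach a line that starts a new time interval.
  while (split > 0 && !lines[split].startsWith('#')) {
    split--;
  }
  if (split == 0) return [lines]; // no boundary found: keep one chunk
  return [lines.sublist(0, split), lines.sublist(split)];
}
```

Each resulting chunk can then be handed to a separate worker, since every chunk begins at an interval boundary and can be parsed independently.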

martin-robert-fink commented 2 years ago

Hey - I was able to get my stuff all going before the birthday party 😄 !!!

I think I have run into a fundamental limitation of the web browser, which based on the links you sent me is about 4GB of RAM. I rebuilt everything to return a stream of data, but for this test I send all the data in a single chunk. Returning one entry at a time would work, but would take about 600 hours 😄 ... too much serialize/deserialize. So, I decided to stream back 10,000 entries at a time. That worked pretty well. The browser crashes after 6.2 million entries. So, with a bit of approximate math, if the browser has a 4GB RAM limit and it fails at 6.2 million entries, each entry is about 600 bytes, which all makes perfect sense. On the native/Mac implementation, all this works with no issues. At the end of the day, the crashing really has nothing to do with web workers; it's just a limitation of the browser. If you recall, when I started this, I wanted to use ISAR. This is why: my hope was to use a file-based data mechanism instead of being completely in-memory. However, ISAR (at least on the browser) is horrifically slow (to be fair, it's not ISAR, it's IndexedDB). So, I really need to stick to an in-memory implementation.
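
The "stream back 10,000 entries at a time" strategy amounts to batching a result stream. A hedged, pure-Dart sketch (the batch size and names are illustrative, not part of Squadron's API):

```dart
import 'dart:async';

// Group individual results into fixed-size batches to amortize the
// per-message serialization overhead of worker communication.
StreamTransformer<T, List<T>> batch<T>(int size) {
  var buffer = <T>[];
  return StreamTransformer.fromHandlers(
    handleData: (T item, EventSink<List<T>> sink) {
      buffer.add(item);
      if (buffer.length >= size) {
        sink.add(buffer);
        buffer = <T>[];
      }
    },
    handleDone: (EventSink<List<T>> sink) {
      if (buffer.isNotEmpty) sink.add(buffer); // flush the partial tail
      sink.close();
    },
  );
}
```

The worker-side result stream would then be wrapped with something like `results.transform(batch(10000))`, and the caller deserializes each batch as it arrives.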

As a side-note, at one point in the process, I tried to parse one of my known bad files (I have a few to use for testing) by accident, and the new stream implementation worked like a charm and all of my 'throw'n parser errors got trapped properly.

In the end, I think what this means is that I will need to place an upper file size limit for the browser version, and then just display a message that larger file sizes need a native implementation. I suspect the iPhone will have a similar limitation (haven't tested yet). Most ASIC/FPGA designers tend to use pretty beefy Windows or Linux boxes, so it's entirely reasonable to direct them there for mega sized simulations. The web version will be very useful for casual users, students, etc.

I'm going to spend a bit of time on chunking size and stream-return sizes to see if I can find an optimal performing solution.

Thanks again for all your work here. This has been really fun so far (especially figuring out all this browser stuff). At some point I should do a video or write an article about how to get all this cross-platform threading stuff to work and the limitations to be aware of.

All the best, Martin

d-markey commented 2 years ago

Hey Martin,

Well done! Thanks for all the feedback, very instructive. There may be other options to optimize memory usage; I haven't tried them myself, but you could maybe try one or both of these:

Have a nice birthday party!

David

martin-robert-fink commented 2 years ago

Hi there -

Here's an update (and a question - Let me know if you'd like me to post a new issue, or keep this conversation going, or send separate email - my email is martin.robert.fink@gmail.com).

I've run a number of experiments (both pool.execute and pool.stream). It appears that the best results are with 6 stream workers (at least on my M1Max Mac). The biggest discovery is that for native platform, I really need to remove the serialization/deserialization. It doubles the parse time on native platforms. I understand that it's required for web. Streams are best because of the ability to better handle parsing errors and the parse time was not meaningfully different from pool.execute.

However, my issue now is UI janking/freezing. When I start the 6 streams (or any number; I've tried from 1 to 16), the UI freezes. One of the benefits of threading was performance, and the other was not blocking the UI. But now I'm blocking the UI. I'm wondering if something changed in 3.4.0 that might have affected this? I'd really like the parsing to happen in the background so the user can do other work while it's going on.

Any other ideas? Thanks, Martin

d-markey commented 2 years ago

Hello,

UI janking is due to "longer" tasks being pushed to the event loop in your main thread. (I don't know how familiar you are with Dart's threading model? One key thing is that once a piece of code starts running, it executes in one go up to the first `await` keyword, or to the end if there is no `await`. Any event received in the meantime, e.g. from a user action, is postponed until that processing is over -- the user action may even be queued behind other futures that need to run first!)
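The run-to-completion behaviour described above can be seen in a minimal standalone sketch (the names here are illustrative, not from the thread's code):

```dart
import 'dart:async';

void main() {
  // This timer is queued on the event loop immediately...
  Timer(Duration.zero, () => print('timer fired'));

  // ...but it cannot fire while this synchronous loop runs: code executes
  // in one go until it hits an `await` or returns.
  var total = 0;
  for (var i = 0; i < 10000000; i++) {
    total += i;
  }
  print('loop done: $total');
  // "loop done" prints before "timer fired", even though the timer was
  // scheduled first -- exactly why long synchronous work janks a UI.
}
```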

In a streaming context with Squadron, I suspect there may be two reasons:

Additionally, if you have 6 workers in parallel, they could also be flooding the main event loop? Is your UI more responsive with fewer workers?

martin-robert-fink commented 2 years ago

Ok. As I mentioned above, I figured out the deserialization issue and removed it for native. So, I stream back a `List<SignalValue>` when the complete chunk of data has been parsed. I have tried from 1 to 16 workers and the UI still janks (more like freezes).

I create the streams like this....

    for (int i = 1; i <= maxWorkers; i++) {
      // Compute start/end of data chunks
      workerStreams.add(pool.stream(
          (worker) => worker.valueParser(valueLines.sublist(start, end))));
    }

then

      StreamGroup.merge(workerStreams).listen(
          (sv) => signalValues.addAll(sv),
          onError: (e, s) => print(e),
          onDone: () {
            print('Signal Values: ${signalValues.length}');
            print('Total parse: ${DateTime.now().difference(startTime)}');
            pool.stop();
          });

parserWorker snippet...

  @override
  Stream<List<SignalValue>> valueParser(List<dynamic> lines) =>
      stream(ParserService.parseValues, [lines]);
  //remove deserialization while testing native  
  //.map((sv) =>
  // (jsonDecode(sv) as List<dynamic>)
  //     .map((y) => SignalValue.fromJson(y))
  //     .toList());
}

parserService snippet...

  @override
  Map<int, CommandHandler> get operations => {
        parseHeader: (r) => headerParser(r.args[0]),
        parseValues: (r) => valueParser(r.args[0]),
        // remove serialization for testing native
        //.map((sv) => jsonEncode(sv)),
      };

Not sure if it's really helpful, but here's the actual parser code in parserService (the for loop ends up running millions of times depending on the size of the file being parsed):

  Stream<List<SignalValue>> valueParser(List<dynamic> lines) async* {
    List<String> linesList = lines.map((line) => line.toString()).toList();
    List<SignalValue> signalValues = [];
    int interval = 0;
    for (String line in linesList) {
      if (line.isEmpty ||
          line.startsWith('\$dump') ||
          line.startsWith('\$end')) {
        continue;
      }
      List<String> data = line.split(RegExp(r'\s'))
        ..removeWhere((e) => e.isEmpty);
      if (data.first[0] == '#') {
        interval = int.parse(data.first.substring(1));
        continue;
      }
      var valueType = data.first[0].toLowerCase();
      if (valueType == 'x' ||
          valueType == 'z' ||
          valueType == '0' ||
          valueType == '1') {
        // Scalar
        signalValues.add(SignalValue(
          signalBitValue: [
            Bit(bitType: BitTypeString(valueType).fromBitString)
          ],
          interval: interval,
          idCode: data.first.substring(1),
        ));
      } else {
        // Vector
        if (valueType != 'b' && valueType != 'r') {
          throw 'bad timing signalValue: $data';
        }
        if (valueType == 'b') {
          var bitList = data.first.substring(1);
          List<Bit> tmpBits = [];
          for (var i = bitList.length; i >= 1; i--) {
            tmpBits
                .add(Bit(bitType: BitTypeString(bitList[i - 1]).fromBitString));
          }
          signalValues.add(SignalValue(
              signalBitValue: tmpBits, interval: interval, idCode: data.last));
        }
        if (valueType == 'r') {
          var value = int.parse(data.first.substring(1));
          signalValues.add(SignalValue(
            signalDecimalValue: value,
            interval: interval,
            idCode: data.last,
          ));
        }
      }
    }
    if (signalValues.isNotEmpty) yield signalValues;
  }

So, I don't think I'm doing any work on the main UI thread. I think I have a reasonable understanding of the Dart threading model. See here:

Dart is indeed multi-threaded

and:

Multi-isolate web server: End-to-End OAuth2 Demonstrator Code here

Anyway, thanks for responding. I'll keep playing around and see if something comes up. I'd like to eliminate the UI freezing. I may try an experiment of launching Isolates manually and see what happens.

Martin

martin-robert-fink commented 2 years ago

Hey -

Well, I ran another experiment to try something. I converted my parse function above into a top-level function, removed the stream stuff, and changed the parameter from `List<dynamic>` to `List<String>`. Then, did this:

      signalValues = await compute(valueParser, valueLines);
      print('Signal Values: ${signalValues.length}');
      print('Parse Time: ${DateTime.now().difference(parseStart)}');

Interestingly, this method generated the fastest parse times of everything I've tried so far. But the UI still froze while the compute function was running. So, I had a different thought: I built a release version of the app and tried again. This time, the UI didn't freeze while the compute function was running, but it did freeze when returning from compute (the overhead of returning 10 million+ values across the Isolate boundary, I'm guessing).

So, then, I tried building a release version of the Squadron-based app, with very similar results: the UI did not freeze (some stuttering, but not bad), then froze at the end when returning the 10M values from the Isolate.

So, it appears that Isolate work (Squadron or not), when running debug-built code (at least on a Mac), will still have UI-freezing effects. I do have a thread monitor on my system that runs in the menu bar, so I can see the threads running.

It appears that the unique part of my use case, compared to most compute-intensive tasks, is returning a large volume of values. I suspect most people end up running a compute-intensive task but then returning a result that is small (in memory size). This does show one of the downsides of the Dart Isolate model, where you don't have a shared global address space.
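For the large-result case, one option worth noting (not something tested in this thread) is Dart's `TransferableTypedData` from `dart:isolate`, which moves raw bytes across the isolate boundary without a deep copy. The catch is that results must first be packed into typed data, so it only pays off for flat numeric payloads; the names below are illustrative, not from the thread's code:

```dart
import 'dart:isolate';
import 'dart:typed_data';

// Illustrative worker entry point: pack results as raw integers and
// transfer the bytes instead of sending millions of Dart objects
// (which would be deep-copied across the isolate boundary).
void parserEntryPoint(SendPort replyTo) {
  final packed = Int64List.fromList([1, 2, 3, 4]); // pretend parse results
  replyTo.send(TransferableTypedData.fromList([packed.buffer.asUint8List()]));
}

Future<Int64List> runParser() async {
  final port = ReceivePort();
  await Isolate.spawn(parserEntryPoint, port.sendPort);
  final transferable = await port.first as TransferableTypedData;
  // materialize() reattaches the bytes in the receiving isolate,
  // again without copying them.
  return transferable.materialize().asInt64List();
}
```

This is native-only (`dart:isolate` is unavailable on the web), and encoding/decoding `SignalValue`s into a flat numeric layout could eat part of the gain, so it is a sketch of a direction rather than a drop-in fix.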

I think the mystery is solved for now. Thanks again, and sorry to bother you so much. I do hope some of my results/experiments are a bit useful for you.

Martin

d-markey commented 2 years ago

Hey Martin,

Thanks for the detailed feedback and for not stopping investigations when there's a bump in the road :-) I couldn't figure it out either; Squadron is pretty much doing nothing while workers are busy, so that completely puzzled me! The only thing that could explain a freeze would be if you start as many workers as you have vCPUs on your machine. If you have N hardware threads, you should probably not provision more than N-1 workers since your workload is synchronous. They would use 100% CPU time and leave no time available for the UI.

Now, the implementation of the parser has a single yield after the loop, so the stream behaves pretty much like a future. Have you tried moving the yield inside the loop and returning partial lists when hitting a timestamp? The UI would populate the final list progressively, hopefully going unnoticed if you can keep batches to a few hundreds or thousands of items.
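Moving the `yield` inside the loop could look roughly like this against Martin's earlier snippet (`parseSignalValue` is a hypothetical helper standing in for the scalar/vector branches of his parser):

```dart
Stream<List<SignalValue>> valueParser(List<String> lines) async* {
  var batch = <SignalValue>[];
  var interval = 0;
  for (final line in lines) {
    if (line.startsWith('#')) {
      // New timestamp: flush the current batch so the consumer can
      // process partial results while parsing continues.
      if (batch.isNotEmpty) {
        yield batch;
        batch = <SignalValue>[];
      }
      interval = int.parse(line.substring(1));
    } else if (line.isNotEmpty) {
      batch.add(parseSignalValue(line, interval)); // hypothetical helper
    }
  }
  if (batch.isNotEmpty) yield batch; // flush the tail
}
```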

Also, I believe there might be improvements available in terms of serialization. Maybe the overhead of going through string JSON could be avoided?

Thanks for the articles too, nice material! I'll have a closer look at the OAuth one.

martin-robert-fink commented 2 years ago

Hi - I tend not to give up easy :). I used to be CTO of HP, running the C/C++ compiler teams and the Java JVM team, plus 3 operating systems (HP-UX, OpenVMS, and Nonstop Kernel). Trust me, I've had way worse bumps than this :)....

Yes, I did do the single yield when I was testing chunk sizes to determine the number of workers. I started with yielding individual values, but that was wayyyyyyy too slow. I did play around with yielding 10,000, 100,000 or more. That tends to cause UI janks at every deserialization step. So, I think my best solution will be to just launch one Future (instead of a stream) with one worker. The other thing was that having many workers did improve overall parse time, but only by a very small percentage.

So, I think I'm good for now. Thanks again, Martin

martin-robert-fink commented 2 years ago

FYI... ISAR issue

d-markey commented 2 years ago

Hello,

thanks for mentioning Squadron ;-)

FYI I'm testing how Squadron behaves between Web/VM and debug/release. The test is based on a VCD-wannabe parser --> https://github.com/d-markey/squadron_sample/tree/main/lib/src/parser The results are a bit surprising, see log files --> https://github.com/d-markey/squadron_sample/tree/main/lib/src/parser/logs Still working on it... a bit...

I think the parser can be a good example of how to avoid too much de/serialization (on the worker side, it's only using List and Map with strings and nums). Also, I avoided StreamGroup as it interweaves streams, producing a final list with items out of order.

martin-robert-fink commented 2 years ago

Hi - Sorry for the delay. Granddaughter's 1st birthday this time 😄. I was in Illinois for that for a few days!

So, it seems interesting that (if I read this correctly) you're getting better performance in debug mode than in release mode. Also, it seems that Firefox should be avoided 😄. Are you able to test UI jank? That was my biggest item. Once I got the overall parse time for a 100MB VCD file down below the 30 second mark, performance was less of a consideration if it was happening in the background. In my case, the UI would freeze in debug mode and be more responsive in release mode (I can't remember for sure now, but I think that was native... can't remember for web).

One thing I'd love to try at some point is to rewrite the parser in Rust and compile it to WebAssembly, then use Dart FFI to run the parser. The parser itself in Rust should be fairly trivial; I have no idea how hard the Dart FFI and WebAssembly part would be.

For the time being I've moved on to the next part of the app (building a hierarchical list of scopes - a tree view). I can't really find a plug-in I like for Tree view hierarchies, so I have to do my own 😄. The benefit of being retired, and having nothing but time!!

Martin

d-markey commented 2 years ago

HI Martin,

your use case is quite interesting and I've run more tests to better understand the performance issues. I'll write in more detail later, but here are a couple of take-aways:

    while (lines.isNotEmpty) {
      if (token != null && token.cancelled) return;
      // consume lines so they can be garbage collected (hopefully)
      line = lines.removeAt(0);

The removal of the first line caused too many memory operations behind the scenes. I eventually removed that, and it was way faster after I changed the `while` to a classic `for`.
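The cost difference is easy to demonstrate: `removeAt(0)` shifts every remaining element left, so draining a list that way is quadratic overall, while an index-based loop reads each element in constant time. A small standalone sketch (list size chosen arbitrarily):

```dart
void main() {
  final lines = List.generate(100000, (i) => 'line $i');

  // Draining via removeAt(0): each call shifts all remaining elements,
  // O(n^2) element moves in total.
  final sw = Stopwatch()..start();
  final copy = List.of(lines);
  while (copy.isNotEmpty) {
    copy.removeAt(0);
  }
  print('removeAt(0): ${sw.elapsed}');

  // Classic for loop: each read is O(1); the whole list can be garbage
  // collected at once when it goes out of scope.
  sw
    ..reset()
    ..start();
  var chars = 0;
  for (var i = 0; i < lines.length; i++) {
    chars += lines[i].length;
  }
  print('for loop: ${sw.elapsed} ($chars chars)');
}
```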

d-markey commented 2 years ago

Hi Martin,

since a picture is worth a thousand words, here are a few diagrams explaining my findings. I also found that the overhead of communicating between workers is in the range of a few milliseconds (or more, depending on payload size and structure). I don't know what the native runtime does behind the scenes or how it does it (Dart's team has also reported improvements on Isolates lately, see https://medium.com/dartlang/dart-2-15-7e7a598e508a), but in Web Workers, sending messages between workers involves cloning AND converting to native JS/browser objects (see implementation at https://api.dart.dev/stable/2.16.2/dart-html/Worker/postMessage.html). So the browser scenario comes with this big disadvantage.

Here are the diagrams. At the end of the day, it looks like a producer/consumer rate problem + impact of sending messages. Parallel streaming won't help for responsiveness in your case!

streaming-with-squadron.pdf

martin-robert-fink commented 2 years ago

Wow! That's an impressive analysis. Thanks for doing that.

It looks like use cases like mine, which return large amounts of data from an isolate/web worker, are just non-optimal for the Dart threading/isolate model. The Dart isolate/web-worker model is great when you need to do heavy computation on an alternate thread but only return a single (or small) value.

Hopefully this analysis will help others over time with Squadron and thread/isolates/web-workers in general.

d-markey commented 2 years ago

You're welcome :-) I was actually hoping to find a solution so you'd keep Squadron, but alas! It just can't work around Dart and JavaScript's threading model. But I learnt a whole lot, and I just had to share that knowledge. I hope it will be useful to others.

I've tried several solutions (see the "parser" folders in https://github.com/d-markey/squadron_sample) and at the end of the day, if that's just for responsiveness, you can go with something like this:

  Future<List<SignalValue>> parse(List<String> lines, [CancellationToken? token]) async {
    final sw = Stopwatch()..start();

    String line = lines[0];
    if (!line.startsWith(_timeStampMarker)) throw Exception('Invalid data');
    int timeStamp = int.parse(line.substring(_timeStampMarker.length));

    var signalValues = <SignalValue>[];
    for (var i = 1; i < lines.length; i++) {
      final cancelledException = token?.exception;
      if (cancelledException != null) throw cancelledException;

      line = lines[i];
      if (line.startsWith(_timeStampMarker)) {
        // new timestamp
        timeStamp = int.parse(line.substring(_timeStampMarker.length));
      } else {
        // new value change
        final data = line.split(' ');
        signalValues.add(SignalValue(timeStamp, data[1], num.parse(data[0])));
      }

      if (sw.elapsedMilliseconds > maxDelayInMs) {
        // introducing this future will suspend parsing; pending UI events will be processed; eventually, parsing will resume
        await Future.delayed(Duration.zero);
        sw.reset();
      }
    }

    Squadron.info('[${sw.elapsed}] parsed ${lines.length} and extracted ${signalValues.length} signal values');
    return signalValues;
  }

It will keep your app responsive via the maxDelayInMs parameter. E.g. with maxDelayInMs = 100, the UI should respond to user events within 100 ms. The only downside is that parsing will periodically get suspended (even when no UI event has arrived), but it shouldn't be too harmful. It may impact animations too, so you might want an even smaller delay, e.g. 50 or 40 ms.

And you can still wrap it in a compute() call so it's actually running on its own thread in native platforms (not disturbed by UI events), and still UI-friendly on Web platforms!
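The compute() wrapper could be as small as this, assuming the `parse` function and `SignalValue` type from the snippet above (the callback passed to `compute` must be a top-level or static function):

```dart
import 'package:flutter/foundation.dart';

// Top-level entry point so compute() can send it to another isolate on
// native platforms. On the web, compute() runs the callback on the main
// thread, where the periodic `await Future.delayed(Duration.zero)` inside
// parse() keeps the UI responsive.
Future<List<SignalValue>> _parseEntry(List<String> lines) => parse(lines);

Future<List<SignalValue>> parseInBackground(List<String> lines) =>
    compute(_parseEntry, lines);
```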

So yes, perfect example where Squadron just can't help, really!

martin-robert-fink commented 2 years ago

Hi David. Not sure if you got notified. I sent a note to the Dart team to highlight the issue we worked on. If you take a look at the link, you'll see the issue logged. The Dart team has been great looking through this.

d-markey commented 2 years ago

Hello Martin, I wasn't notified indeed, thanks for the info :-)