d-markey / squadron_builder

Dart code generator for Squadron workers. Implement your worker service and let squadron_builder bridge the gap with Web Workers and Isolates!
https://pub.dev/packages/squadron_builder
MIT License
15 stars 3 forks source link

Is there a way to interrupt a ValueTask? #11

Closed SaadArdati closed 10 months ago

SaadArdati commented 10 months ago

I'm trying to implement a heavy json search that utilizes the Fuzzy package to do a string search on a large json object (~90mb). When typing a search query, I could normally debounce the input, but even then, a singular search task takes seconds to resolve.

I understand that ValueTasks or non-stream-based tasks are not effectively cancellable, is this correct? Is there a way around this for my use case?

  @SquadronMethod()
  Future<List<CityRecord>> search(String query) async {
    print('searching for $query');

    final List<Result<CityRecord>> matches = fuzzy.search(query);

    print('found ${matches.length} matches for $query');

    final results = matches.map((e) => e.item).toList();
    return results.sublist(0, min(30, results.length));
  }
  Future<List<CityRecord>> search(String query) async {
    Future(() => null); // Temp for testing
    if(lastSearchTask != null && lastSearchTask!.isRunning == true) {
      lastSearchTask!.cancel('Cancelled by new search');
    }
    Future(() => null); // Temp for testing.
    lastSearchTask = pool.scheduleTask((worker) => worker.search(query));
    return lastSearchTask!.value;
  }

Here's a log output:

flutter: [eebade77-3a23-40c3-81c5-01d7c279d11b] Searching for v
flutter: searching for v
flutter: [a955b44b-6a51-42bd-8f32-c931dd2597fb] Searching for vi
[ERROR:flutter/runtime/dart_vm_initializer.cc(41)] Unhandled Exception: [-3,"Cancelled by new search","#0      new WorkerException (package:squadron/src/exceptions/worker_exception.dart:17:34)\n#1      new CancelledException (package:squadron/src/exceptions/worker_exception.dart:71:9)\n#2      WorkerTask.cancel (package:squadron/src/_impl/xplat/_worker_task.dart:50:29)\n#3      WorkerValueTask.cancel (package:squadron/src/_impl/xplat/_worker_value_task.dart:32:11)\n#4      TimeZoneManager.search (package:hyper_zones/services/time_zone_manager.dart:104:23)\n#5      _TimeZoneSelectionDialogState.onSearchChanged (package:hyper_zones/ui/time_zone_selection_dropdown.dart:163:25)\n#6      _TimeZoneSelectionDialogState.search (package:hyper_zones/ui/time_zone_selection_dropdown.dart:202:5)\n#7      new TextFormField.<anonymous closure>.onChangedHandler (package:flutter/src/material/text_form_field.dart:196:25)\n#8      EditableTextState._formatAndSetValue (package:flutter/src/widgets/editable_text.dart:3834:27)\n#9      EditableTextState.updateEditingValue (package:flutter/src/widgets/editable_text.dart:3036:7)\n#10     TextInput._updateEditingValue (package:flutter/src/services/text_input.dart:2025:43)\n#11     TextInput._handleTextInputInvocation (package:flutter/src/services/text_input.dart:1858:29)\n#12     TextInput._loudlyHandleTextInputInvocation (package:flutter/src/services/text_input.dart:1759:20)\n#13     MethodChannel._handleAsMethodCall (package:flutter/src/services/platform_channel.dart:559:55)\n#14     MethodChannel.setMethodCallHandler.<anonymous closure> (package:flutter/src/services/platform_channel.dart:552:34)\n#15     _DefaultBinaryMessenger.setMessageHandler.<anonymous closure> (package:flutter/src/services/binding.dart:567:35)\n#16     _invoke2 (dart:ui/hooks.dart:344:13)\n#17     _ChannelCallbackRecord.invoke (dart:ui/channel_buffers.dart:45:5)\n#18     _Channel.push (dart:ui/channel_buffers.dart:135:31)\n#19     ChannelBuffers.push (dart:ui/channel_buffers.dart:343:17)\n#20     PlatformDispatcher._dispatchPlatformMessage (dart:ui/platform_dispatcher.dart:722:22)\n#21     _dispatchPlatformMessage (dart:ui/hooks.dart:257:31)\n",null,null]
#0      new WorkerException (package:squadron/src/exceptions/worker_exception.dart:17:34)
#1      new CancelledException (package:squadron/src/exceptions/worker_exception.dart:71:9)
#2      WorkerTask.cancel (package:squadron/src/_impl/xplat/_worker_task.dart:50:29)
#3      WorkerValueTask.cancel (package:squadron/src/_impl/xplat/_worker_value_task.dart:32:11)
#4      TimeZoneManager.search (package:hyper_zones/services/time_zone_manager.dart:104:23)
#5      _TimeZoneSelectionDialogState.onSearchChanged (package:hyper_zones/ui/time_zone_selection_dropdown.dart:163:25)
#6      _TimeZoneSelectionDialogState.search (package:hyper_zones/ui/time_zone_selection_dropdown.dart:202:5)
#7      new TextFormField.<anonymous closure>.onChangedHandler (package:flutter/src/material/text_form_field.dart:196:25)
#8      EditableTextState._formatAndSetValue (package:flutter/src/widgets/editable_text.dart:3834:27)
#9      EditableTextState.updateEditingValue (package:flutter/src/widgets/editable_text.dart:3036:7)
#10     TextInput._updateEditingValue (package:flutter/src/services/text_input.dart:2025:43)
#11     TextInput._handleTextInputInvocation (package:flutter/src/services/text_input.dart:1858:29)
#12     TextInput._loudlyHandleTextInputInvocation (package:flutter/src/services/text_input.dart:1759:20)
#13     MethodChannel._handleAsMethodCall (package:flutter/src/services/platform_channel.dart:559:55)
#14     MethodChannel.setMethodCallHandler.<anonymous closure> (package:flutter/src/services/platform_channel.dart:552:34)
#15     _DefaultBinaryMessenger.setMessageHandler.<anonymous closure> (package:flutter/src/services/binding.dart:567:35)
#16     _invoke2 (dart:ui/hooks.dart:344:13)
#17     _ChannelCallbackRecord.invoke (dart:ui/channel_buffers.dart:45:5)
#18     _Channel.push (dart:ui/channel_buffers.dart:135:31)
#19     ChannelBuffers.push (dart:ui/channel_buffers.dart:343:17)
#20     PlatformDispatcher._dispatchPlatformMessage (dart:ui/platform_dispatcher.dart:722:22)
#21     _dispatchPlatformMessage (dart:ui/hooks.dart:257:31)

flutter: [d2dee84b-6ccd-4f3a-b88d-95e14f922c8d] Searching for vir
flutter: [e5c92db5-37b4-40d1-b6d4-236d0b98dd88] Searching for virg
flutter: [9233f14f-c1d3-4679-a8d4-62e0df6a51ce] Searching for virgi
flutter: [e3f67dab-45a7-4562-bf28-ffd0ef0c57e7] Searching for virgin
flutter: [16bec2a6-bfd1-4edc-a8fa-91cd2ced4167] Searching for virgini
flutter: [d699c8fb-dab7-4352-9126-8e62a124ae31] Searching for virginia
flutter: found 33307 matches for v
flutter: searching for vi
flutter: found 136816 matches for vi
flutter: [a955b44b-6a51-42bd-8f32-c931dd2597fb] Found 30 matches
flutter: searching for vir
flutter: found 21341 matches for vir
flutter: [d2dee84b-6ccd-4f3a-b88d-95e14f922c8d] Found 30 matches
flutter: searching for virg
flutter: found 29399 matches for virg
flutter: [e5c92db5-37b4-40d1-b6d4-236d0b98dd88] Found 30 matches
flutter: searching for virgi
flutter: found 27681 matches for virgi
flutter: [9233f14f-c1d3-4679-a8d4-62e0df6a51ce] Found 30 matches
flutter: searching for virgin
flutter: found 21904 matches for virgin
flutter: [e3f67dab-45a7-4562-bf28-ffd0ef0c57e7] Found 30 matches
flutter: searching for virgini

Each search operation is taking ~3-5 seconds each. This is of course a worse-case scenario where the search query is not debounced at all, but this is for testing purposes.

The cancelling doesn't work like I expect it to so I'm curious if there is a solution I'm not aware of.

d-markey commented 10 months ago

Hello,

Unfortunately ValueTasks cannot be truely cancelled. Cancelling a ValueTask will throw a CancelledException in the calling Isolate, but the worker Isolate will continue executing the task until it completes. This is by design because of Dart's execution model: once a synchronous block of code has been scheduled in the Isolate's event loop and starts running, it will run uninterrupted until it completes. Note that if the ValueTask is still pending in Squadron's internal queue, it will be cancelled straight away and will never be handed over to the worker Isolate.

A few tips about effectively cancelling a synchronous task is to make it asynchronous and make sure the service method accepts a CancellationToken argument. The service method must periodically check the CancellationToken.cancelled property and if true, stop processing. The calling Isolate will still receive a CancelledException so your code should take action and handle the case when a task is cancelled.

Eg. if the (uncancellable) synchronous task looks like this:

  @squadronMethod
  FutureOr<int> computeSync(Uint8List input) {
    var value = 0;
    for (var i = 0; i < bigNumber; i++) {
      // update value
    }
    return value;
  }

It can be turned into a cancellable task like so:

  @squadronMethod
  Future<int> computeSync(Uint8List input, { CancellationToken? cancelToken }) async {
    var value = 0;
    for (var i = 0; i < bigNumber; i++) {
      if (cancelToken != null && i % 500 == 0) {
          await Future.delayed(Duration.zero); // necessary to ensure the cancelToken state is updated
          if (cancelToken.cancelled) {
            return -1;
          }
      }
      // update value
    }
    return value;
  }

Now in your specific context, I believe this is not feasible because fuzzy.search() is synchronous and uncancellable. Debouncing will certainly help by avoiding submitting unnecessary tasks to the worker. I think you should also use the limit argument to avoid full scans. It will also make the call to sublist useless so you can directly return the results.

SaadArdati commented 9 months ago

This is excellent advise, thank you!