LostBeard / SpawnDev.BlazorJS

Full Blazor WebAssembly and Javascript Interop with multithreading via WebWorkers
https://blazorjs.spawndev.com
MIT License

Question: How to cancel WebWorkerService.TaskPool.Run()? #30

Closed softhorizons closed 1 month ago

softhorizons commented 1 month ago

I'm doing this and need to cancel it sometimes:

await WebWorkerService.TaskPool.Run(() => AssembleSync(request));

AssembleSync is:

public byte[]? AssembleSync(CorePDFAssembleRequest request)
{
    // ...
}

Can that be cancelled when using the oh-so-convenient TaskPool-style interface?

Alternately I tried

Func<bool> checkCancelled = () => doTheCheck();
await WebWorkerService.TaskPool.Run(() => AssembleSync1(request, checkCancelled));

AssembleSync1 is:

public byte[]? AssembleSync1(CorePDFAssembleRequest request, Func<bool> checkCancelled)
{
    // ...
}

but unfortunately I get a runtime exception

blazor.webassembly.js:1 Uncaught (in promise) Error: System.Exception: Object of type 'System.Action`1[System.Boolean]' cannot be converted to type 'System.Func`1[System.Boolean]'.

Another alternative would be to send the WebWorker a cancel message. Is that possible using TaskPool-style?

Any wisdom is appreciated.

LostBeard commented 1 month ago

Currently Action is supported but not Func. Func support will likely be added at some point soon. I will at least fix the error message to better indicate the actual issue.

I started working on a way to allow sending a CancellationToken last night. Getting that working is at the top of my list for this morning. I will add more here when it is done.

LostBeard commented 1 month ago

I just uploaded version 2.2.88 of WebWorkers. I added support for CancellationToken parameters when calling methods in WebWorkers. I also added a more informative exception message when trying to pass Func as a parameter to WebWorkers indicating it is not currently supported.

CancellationToken WebWorker examples

These are not really useful as is, but they demonstrate a task being started on a WebWorker that will be cancelled after 5 seconds or when CancelIt() is called.

Here is some code inline with your example:

async Task StartIt()
{
    cancellationTokenSource = new CancellationTokenSource(5000);
    try
    {
        bytes = await WebWorkerService.TaskPool.Run(() => AssembleSync2(request, cancellationTokenSource.Token));
    }
    catch
    {
        // cancelled
        bytes = bytesFallback;
    }
}
CancellationTokenSource? cancellationTokenSource = null;
byte[]? bytes = null;
byte[] bytesFallback = new byte[]{ 0, 1, 2, 3 };
void CancelIt()
{
    cancellationTokenSource?.Cancel();
}

AssembleSync2 that will cancel with an exception if cancelled:

public byte[]? AssembleSync2(CorePDFAssembleRequest request, CancellationToken token)
{
    var ret = new List<byte>();
    while(true)
    {
        // do some work
        ret.Add(0);
        // throw an exception if cancelled
        token.ThrowIfCancellationRequested();
    }
    return ret.ToArray();
}

Another example that doesn't throw and instead returns null rather than a byte[] when cancelled:

async Task StartIt()
{
    cancellationTokenSource = new CancellationTokenSource(5000);
    bytes = await WebWorkerService.TaskPool.Run(() => AssembleSync3(request, cancellationTokenSource.Token));
    if (bytes == null) bytes = bytesFallback;
}
CancellationTokenSource? cancellationTokenSource = null;
byte[]? bytes = null;
byte[] bytesFallback = new byte[]{ 0, 1, 2, 3 };
void CancelIt()
{
    cancellationTokenSource?.Cancel();
}

AssembleSync3 that will cancel with a set return value if cancelled:

public byte[]? AssembleSync3(CorePDFAssembleRequest request, CancellationToken token)
{
    var ret = new List<byte>();
    while(true)
    {
        // do some work
        ret.Add(0);
        if (token.IsCancellationRequested)
        {
            return null;
        }
    }
    return ret.ToArray();
}

LostBeard commented 1 month ago

I am closing this issue as completed. You may continue to post if needed.

LostBeard commented 1 month ago

The below example is another way for a WebWorker to check if it should keep working and it falls more in line with what you attempted.

The method called on the WebWorker uses WebWorkerService.WindowTask to read the static CancelWorkers property on the Window scope and decide whether it should keep running. WebWorkerService.WindowTask is similar to WebWorkerService.TaskPool, except that calls to WebWorkerService.WindowTask always run on the Window scope, whether they are made from the Window scope or from a Worker scope.

This is code from a unit test I added to verify it works as expected.

    // Workers read a static property of this class from the Window instance that created them to determine whether they should stop running
    [TestMethod]
    public async Task TaskPoolExpressionStaticPropertyReadToCancelTest()
    {
        if (!WebWorkerService.WebWorkerSupported)
        {
            throw new Exception("Worker not supported by browser. Expected failure.");
        }
        CancelWorkers = false;
        // CancelWorkers will be set to true on the window scope after 2000 milliseconds
        // the WebWorker task will read this property every so often to check if it should continue running
        using var cts = new CancellationTokenSource(2000);
        cts.Token.Register(() => CancelWorkers = true);
        var cancelled = await WebWorkerService.TaskPool.Run(() => ThisMethodWillGetCalledInAWorker(null!, 10000));
        if (!cancelled)
        {
            throw new Exception("Cancel failed");
        }
    }
    // This property will be set on the Window scope and read by the WebWorker scope
    static bool CancelWorkers { get; set; } = false;

    // Method returns true if it was cancelled, false if it ran until completion
    // This method will run in a DedicatedWebWorkerScope
    private static async Task<bool> ThisMethodWillGetCalledInAWorker([FromServices] WebWorkerService webWorkerService, double maxRunTimeMS)
    {
        var startTime = DateTime.Now;
        var maxRunTime = TimeSpan.FromMilliseconds(maxRunTimeMS);
        while (DateTime.Now - startTime < maxRunTime)
        {
            Console.WriteLine("Waiting");
            await Task.Delay(50);
            // the below line reads the static CancelWorkers property from the Window scope
            // a method on the Window scope could be called instead of reading a property
            var cancelled = await webWorkerService.WindowTask!.Run(() => CancelWorkers);
            if (cancelled) return true;
        }
        return false;
    }

This method requires calling from the WebWorker into the Window scope frequently to check if it is cancelled. Using a CancellationToken only requires a single additional call, from the Window scope into the Worker scope if cancelled.

softhorizons commented 1 month ago

So, here's my stripped-down test code:

public async Task AssembleCancellationTest()
{
    Console.WriteLine("Starting test");
    var retd = -2;
    var loops = 0;
    using (CancellationTokenSource cts = new CancellationTokenSource())
    {
        CancellationToken token = cts.Token;
        var runningTask = WebWorkerService.TaskPool.Run(() => AssembleSyncXXX(token));

        while (true)
        {
            ++loops;
            try
            {
                retd = await runningTask.WaitAsync(TimeSpan.FromMilliseconds(10));
                break;
            }
            catch (TimeoutException) { }
            cts.Cancel();
        }
    }
    Console.WriteLine($"Returned {retd}, loops: {loops}");
}

public int AssembleSyncXXX(CancellationToken token)
{
    for (var i = 0; i < 100000000; i++)
        if (token.IsCancellationRequested) return i;
    return -42;
}

Results:

Starting test
blazor.webassembly.js:1 Returned -42, loops: 1087

Looks like AssembleSyncXXX does run, but token.IsCancellationRequested never becomes true. Thoughts?

LostBeard commented 1 month ago

I should have mentioned this...

Messages, such as the token cancellation message from the window, are received using message event handlers. Your AssembleSyncXXX method never releases the thread to allow the message event to fire.

public async Task<int> AssembleSyncXXX(CancellationToken token)
{
   for (var i = 0; i < 100000000; i++)
   {
       // The below line will give the message event handlers a chance to process incoming messages
       await Task.Delay(1); 
       if (token.IsCancellationRequested) return i;         
   }
   return -42;
}

Task.Delay(1) would only need to be called before checking the token to allow firing any new message events.
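
Awaiting Task.Delay(1) on every one of 100,000,000 iterations would make the loop extremely slow, so one option is to yield only once per batch of iterations. The following is a minimal sketch in plain .NET, not code from SpawnDev.BlazorJS; the class name, the method name, and the checkEvery parameter are hypothetical names chosen for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class YieldingLoopSketch
{
    // Hypothetical helper (not part of SpawnDev.BlazorJS): loops `total` times and
    // yields to the event loop once every `checkEvery` iterations.
    public static async Task<int> CountUntilCancelled(CancellationToken token, int total, int checkEvery)
    {
        for (var i = 0; i < total; i++)
        {
            if (i % checkEvery == 0)
            {
                // Give pending message handlers a chance to run, then look at the token.
                await Task.Delay(1);
                if (token.IsCancellationRequested) return i;
            }
            // ... one unit of synchronous work would go here ...
        }
        return -42; // ran to completion without being cancelled
    }
}
```

The trade-off is cancellation latency: a larger checkEvery means fewer delays but a longer wait before a cancellation request is noticed.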

Off-topic: -42? Any chance your number choice is related to the answer to the ultimate question?

softhorizons commented 1 month ago

I was afraid of that, was trying to avoid async. Naturally my example is greatly simplified. IRL it needs to be along the lines of

public byte[] AssembleSyncXXX(CancellationToken token)
{
    return aBigSynchronousPieceOfSoftware(() => token.IsCancellationRequested); // Func is called back from deep inside
}

Converting that big synchronous piece of software to async procedures everywhere is prohibitive, probably not even possible since there are closed-source libraries involved. Unless you have a better idea, I suppose I'll have to explicitly create a new WebWorker for each call of AssembleSync so that I have a reference that I can Dispose() to kill it.

Off-topic: 42 is everything and nothing! I come from the page-layout software world where it keeps popping up, along with references to Vogon poetry and Milliways. Ever hear of Type42 PostScript fonts?

LostBeard commented 1 month ago

> Converting that big synchronous piece of software to async procedures everywhere is prohibitive, probably not even possible since there are closed-source libraries involved.

How would you have cancelled operations in those closed source libraries in a console app (for instance)?

LostBeard commented 1 month ago

I do believe it is possible to use a SharedArrayBuffer for synchronous reading and writing of a canceled flag but it would require running the code in a secure context with cross-origin isolation due to SharedArrayBuffer restrictions.

I am almost ready to test my implementation of the above method. I created two new classes to handle this, SharedCancellationToken and SharedCancellationTokenSource. They behave almost identically to CancellationToken and CancellationTokenSource, but they use a SharedArrayBuffer for signaling instead of postMessage, which removes the need for an asynchronous check.

SharedArrayBuffer is the only way I have come up with that allows synchronous writing and reading of data that can be shared between threads (web workers, window, etc.).
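
The shared-memory idea can be illustrated with ordinary .NET threads. This is only an analogy under stated assumptions: it is not the library's SharedCancellationToken, the class and method names are hypothetical, and a plain int field stands in for the SharedArrayBuffer. The point it shows is that a fully synchronous loop can observe a flag written by another thread without ever yielding.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for a shared-memory cancellation flag.
// The int field plays the role that the SharedArrayBuffer plays in the browser.
public sealed class SharedFlagSketch
{
    private int _cancelled; // 0 = keep running, 1 = cancel requested

    public void Cancel() => Volatile.Write(ref _cancelled, 1);

    public bool IsCancellationRequested => Volatile.Read(ref _cancelled) == 1;
}

public static class SharedFlagDemo
{
    // Fully synchronous worker: no await anywhere, yet it still sees the flag.
    public static long SpinUntilCancelled(SharedFlagSketch flag)
    {
        long iterations = 0;
        while (!flag.IsCancellationRequested) iterations++;
        return iterations;
    }

    // Starts the worker on another thread, cancels it shortly afterwards,
    // and reports whether the worker stopped within five seconds.
    public static bool RunDemo()
    {
        var flag = new SharedFlagSketch();
        var worker = new Thread(() => SpinUntilCancelled(flag));
        worker.Start();
        Thread.Sleep(20);
        flag.Cancel();
        return worker.Join(TimeSpan.FromSeconds(5));
    }
}
```

In the browser the two sides are the window and a web worker rather than two .NET threads, which is why the shared memory has to be a SharedArrayBuffer and why cross-origin isolation comes into play.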

softhorizons commented 1 month ago

public static void Main(string[] args)
{
    // Strictly optional delegate
    Console.CancelKeyPress += delegate {
        // call methods to clean up
    };
    LongRunning(); // user can hit ^C if they lose patience
}

Or if I need a graceful exit:

class MainClass
{
    private static bool keepRunning = true;

    public static void Main(string[] args)
    {
        Console.CancelKeyPress += delegate(object? sender, ConsoleCancelEventArgs e) {
            e.Cancel = true; //e.g. user hit ^C but we want to terminate gracefully
            MainClass.keepRunning = false;
        };

        LongRunning(() => !keepRunning); //LongRunning() periodically calls delegate
        Console.WriteLine("exited gracefully");
    }
}

I realize that the cooperative-tasking async model has its limitations. In this particular case I'd be happy with the first solution: just let the "supervisor" terminate the process with immediate effect.

softhorizons commented 1 month ago

SharedArrayBuffer may well solve my immediate problem, but the added costs of origin & security issues would be too high for this project. Referring back to issue #2 (https://github.com/LostBeard/SpawnDev.BlazorJS/issues/2), there's another form of invocation & cancellation:

    _webWorker ??= await WebWorkerService.GetWebWorker();
    var runner = _webWorker.GetService<ITestWorker>();
    var result = runner.RunTests(code); //not doing await yet
    // ... time goes by and RunTests() hasn't stopped yet, so blow away the worker:
    _webWorker?.Dispose();
    _webWorker = null;

That would work for me. Not ideal, but plenty good enough. Is there a way to get at the equivalent of _webWorker, or at least force _webWorker.Dispose(), using TaskPool-style invocation?

LostBeard commented 1 month ago

A problem with that method is the startup time for new WebWorkers, but everything has tradeoffs.

> Is there a way to get at the equivalent of _webWorker, or at least force _webWorker.Dispose(), using TaskPool-style invocation?

I am sorry. I am not sure I understand what you are asking here. WebWorker supports the same method invocation styles as TaskPool and WindowTask.

Example using Run() expression invoker like we have been using with TaskPool:

var runningTask = _webWorker.Run(() => AssembleSyncXXX(token));

> SharedArrayBuffer may well solve my immediate problem, but the added costs of origin & security issues would be too high for this project.

I did come up with an alternative to SharedArrayBuffer. WebWorkers can use FileSystemSyncAccessHandle to synchronously access a FileSystemFileHandle. That gives WebWorkers another synchronous way to read a cancellation flag that can be set by another thread and this one doesn't require cross-origin isolation.

LostBeard commented 1 month ago

Version 2.2.91 now supports the below code which uses the new SharedCancellationToken.

This requires cross-origin isolation. I have not had a chance to work on the FileSystemSyncAccessHandle idea yet.

public async Task WebWorkerSharedCancellationTokenSourceTest()
{
    if (!WebWorkerService.WebWorkerSupported)
    {
        throw new NotSupportedException("Worker not supported by browser.");
    }
    if (!JS.CrossOriginIsolated)
    {
        throw new NotSupportedException("Not CrossOriginIsolated.");
    }
    var retd = -2;
    var loops = 0;
    using (var cts = new SharedCancellationTokenSource())
    {
        var token = cts.Token;
        var runningTask = WebWorkerService.TaskPool.Run(() => AssembleSyncXXX(token));
        while (true)
        {
            ++loops;
            try
            {
                retd = await runningTask.WaitAsync(TimeSpan.FromMilliseconds(10));
                break;
            }
            catch (TimeoutException) { }
            cts.Cancel();
        }
    }
    Console.WriteLine($"main retd: {retd}, loops: {loops}");
}
static int AssembleSyncXXX(SharedCancellationToken token)
{
    for (var i = 0; i < 100000000; i++)
    {
        if (token.IsCancellationRequested) return i;
    }
    return -42;
}

softhorizons commented 1 month ago

YESSSS!

> WebWorker supports the same method invocation styles as TaskPool and WindowTask.

That is what I was missing. The docs & readme led me to believe you can only use webWorker.GetService() on a webWorker. I didn't see any example or reference to webWorker.Run() or Call(). Now the whole thing makes perfect sense.

Knowing this, I've implemented a very simple scheme for my own service which serially reuses a WebWorker (to save startup overhead) unless it was previously Disposed to cancel it, in which case it allocates a new WebWorker. So I'm good! It's now trivial for me to convert any sync code to use WebWorkers while preserving the ability to supervise & cancel it.
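
The reuse-unless-disposed scheme described above could look roughly like the sketch below. It is an illustration only, not softhorizons' actual code: ReusableWorkerSlot, GetAsync, and Kill are hypothetical names, and the worker type is left generic so the sketch does not depend on any SpawnDev.BlazorJS API.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not part of SpawnDev.BlazorJS: holds one disposable worker,
// reuses it across calls, and creates a fresh one after Kill() disposed the old one.
public sealed class ReusableWorkerSlot<TWorker> where TWorker : class, IDisposable
{
    private readonly Func<Task<TWorker>> _createWorker;
    private TWorker? _current;

    public ReusableWorkerSlot(Func<Task<TWorker>> createWorker) => _createWorker = createWorker;

    // Returns the existing worker, or starts a new one if none is alive.
    public async Task<TWorker> GetAsync()
    {
        if (_current == null) _current = await _createWorker();
        return _current;
    }

    // Forced cancellation: dispose the worker so the next GetAsync() starts a new one.
    public void Kill()
    {
        _current?.Dispose();
        _current = null;
    }
}
```

In this thread's setting, the factory passed to the constructor would presumably wrap WebWorkerService.GetWebWorker(), and the supervisor would call Kill() when a run has to be abandoned. The startup cost of a new worker is then paid only after a forced cancellation.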

Theoretically I could write some code to explicitly allocate a WebWorker from a TaskPool and run it under some kind of supervisory code that does Dispose() to kill it. The problem is that I must eventually Release it back to the TaskPool, and the next user of the TaskPool will be rather unhappy to get a pre-Disposed WebWorker. It would be useful if TaskPool's Release function had an optional argument that makes it Dispose the released WW and remove it from the pool. That would help with forced cancellation, with a WW that threw an unexpected Exception and leaves a cautious programmer wondering whether it is somehow corrupted, and with a WW that's too much trouble to clean up, e.g. because you've set up a bunch of globals or event handlers.

LostBeard commented 1 month ago

The WebWorkerSharedCancellationTokenSourceTest test on the demo UnitTests page implements the code I just posted for SharedCancellationToken.

LostBeard commented 1 month ago

> The docs & readme led me to believe you can only use webWorker.GetService() on a webWorker.

Ya. Sorry about that. The docs get less attention from me than the code does. I really need to add more, but the README.md is getting bloated so I started a project just for the docs... which needs updating and filling out.

softhorizons commented 1 month ago

Please don't worry about README bloat. I've seen huge READMEs on npm and was glad to be able to find the details I needed. I'd rather see more than less, and can always stop reading whenever I choose.

LostBeard commented 1 month ago

The GetService() method (interface DispatchProxy) was the first method I added for invoking on the WebWorker. The expression invoker (Run()) and delegate invoker (Invoke()) were added more recently.

softhorizons commented 1 month ago

> The expression invoker (Run()) and delegate invoker (Invoke()) were added more recently.

Glad you did! It's a fabulous feature.

LostBeard commented 1 month ago

> It would be useful if TaskPool's Release function had an optional argument that makes it Dispose the released WW and remove it from the pool.

Disposing a WebWorkerPool WebWorker now removes it from the pool, and another spins up to replace it (the code is in place, but the NuGet package is not updated yet).

In case it is not clear:

WebWorkerPool is an optional service.

The two methods below both return WebWorkers, but they differ in how they get them.

Async method WebWorkerService.GetWebWorker() - Creates a new WebWorker instance and returns it when it is ready for use.

Async method WebWorkerPool.GetWorkerAsync() - Waits until a free WebWorker is available in the web worker pool and returns it. If one is not available right now, and the number of WebWorkers running is less than the max, another worker is started and will be returned.

softhorizons commented 1 month ago

Excellent! Yes, I understand the 2 cases.

One clarification: with that upcoming fix, must a Disposed WW gotten from a TaskPool nonetheless be TaskPool.ReleaseWorker'd, or does the TaskPool somehow detect the Dispose and do what's necessary? In the latter case, is it harmful to redundantly TaskPool.ReleaseWorker a Disposed WW?

LostBeard commented 1 month ago

It internally calls Release when it is Disposed. Calling Release on it again will do nothing.