dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License

# Question: Reactive background processing inside grain #8146

Open AndrewP-GH opened 1 year ago

AndrewP-GH commented 1 year ago

Hello!

How do I implement background processing inside a grain? For example, I have a grain with an inner queue, and I enqueue items into it.

public class QueueGrain : Grain
{
    private readonly Queue<string> _queue = new();

    public Task Enqueue(string item)
    {
        _queue.Enqueue(item);
        return Task.CompletedTask;
    }

    private async Task Dequeue()
    {
        if (_queue.TryDequeue(out var item))
            await DoWork(item);
        await Task.Yield();
    }

    private Task DoWork(string item)
    {
        // Do some work
        return Task.CompletedTask;
    }
}

I want the Dequeue method to be called reactively (and I can't use standard grain timers because of the high CPU pressure they cause).
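For context, the timer-based polling being avoided here might look roughly like the following sketch (assuming the Orleans 3.x Grain.RegisterTimer API; the one-second period is purely illustrative):

public override Task OnActivateAsync()
{
    // Polls the queue on a fixed period even when it is empty; with many grains
    // and/or a short period, this is the CPU pressure mentioned above.
    RegisterTimer(_ => Dequeue(), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
    return base.OnActivateAsync();
}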

I see two possible options:

  1. Use Task.Factory.StartNew

    private readonly CancellationTokenSource _cts = new();

    public override Task OnActivateAsync()
    {
        Task.Factory.StartNew(StartBackgroundProcessing);
        return base.OnActivateAsync();
    }

    private async Task StartBackgroundProcessing()
    {
        while (!_cts.IsCancellationRequested)
        {
            await Dequeue();
        }
    }

    But this code may interleave with message processing, so a request won't be atomic. For example:

    public async Task Enqueue(string item)
    {
       await Enqueue(item, _cts.Token);
       // interleaved by StartBackgroundProcessing
       await Smt();
    }

    And also it will still be CPU intensive.

The alternative to the plain queue is the Channel class, which lets me await in Dequeue, but I don't see any difference as long as I'm using Task.Factory.StartNew(StartBackgroundProcessing) on activation, because I still have the interleaving problem.
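For illustration, that Channel-based variant might look roughly like this sketch (still started from Task.Factory.StartNew in OnActivateAsync, with _cts and DoWork as in the snippets above):

private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();

private async Task StartBackgroundProcessing()
{
    while (!_cts.IsCancellationRequested)
    {
        // No busy polling: this awaits until an item is actually written...
        var item = await _channel.Reader.ReadAsync(_cts.Token);

        // ...but the loop's continuations still share the activation's scheduler with
        // incoming requests, so a request's awaits can be interleaved by loop turns.
        await DoWork(item);
    }
}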

  2. Use the thread pool via Task.Run in OnActivateAsync to call the grain itself through AsReference, and use a Channel.
    
public interface IHaveDequeue
{
    Task Dequeue();
}

public class QueueGrain : Grain, IHaveDequeue
{
    private readonly Channel<string> _queue = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions
        {
            SingleReader = true,
            SingleWriter = true,
            AllowSynchronousContinuations = false
        });

    private readonly CancellationTokenSource _cts = new();

    public async Task Enqueue(string item)
    {
        await _queue.Writer.WriteAsync(item);
    }

    public async Task Dequeue()
    {
        var item = await _queue.Reader.ReadAsync(_cts.Token);
        await DoWork(item);
    }

    private Task DoWork(string item)
    {
        // Do some work
        return Task.CompletedTask;
    }

    public override Task OnActivateAsync()
    {
        Task.Run(
            async () =>
            {
                while (!_cts.IsCancellationRequested)
                {
                    await this.AsReference<IHaveDequeue>().Dequeue();
                }
            });
        return base.OnActivateAsync();
    }
}



Is this code correct in terms of no interruptions (all public method calls are 'atomic') and the proper task scheduler (all actions on grain data take place in the grain's activation context, without race conditions)?
AndrewP-GH commented 1 year ago

I'm afraid that, in the last solution, the call via the thread pool will block my grain when the queue is empty, and other clients will not be able to call this grain.

gfoidl commented 1 year ago

Is that background processing inside a grain the right design decision? What happens when the grain gets deactivated, but there are still items in the queue? Would it be better to do that processing outside of a grain (in a separate service)?

AndrewP-GH commented 1 year ago

Let me try to describe my problem in more detail. I need something like a per-entity command queue with good horizontal scaling for this scenario:

  1. I push commands into this queue and I want them to be executed one after another.
  2. Commands can take a long time to execute.
  3. I need to add commands very quickly.

My current solution: I store the queue in a grain and use a timer to schedule command execution from this queue onto the thread pool, keep the resulting task, and check its status on each timer tick.

But now I want to remove the timers because they consume a lot of CPU, so the new solution in my mind looks like this (a rough sketch of the dispatch and notification in steps 2 and 4 follows the list):

  1. A grain with its own queue; when a command is added, the grain enqueues it.
  2. At this step I want to trigger something that can take a command from this inner queue and schedule its execution on another grain (asynchronously but without concurrency); at this point the mechanism does not move on to the next item in the queue but waits for the result.
  3. I can add more commands to the grain's queue by calling this grain.
  4. The 'worker' grain will notify the 'queue' grain that the command has finished.
  5. Step 2 continues and dequeues the next command from the inner queue when there are no active calls to this grain.
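A hypothetical sketch of the dispatch and notification in steps 2 and 4 (the names ICommandQueueGrain, IWorkerGrain, Execute, and OnCommandCompleted are invented for illustration; the queue grain's implementation is omitted):

public interface ICommandQueueGrain : IGrainWithStringKey
{
    Task Enqueue(string command);

    [OneWay] // step 4: the worker notifies the queue grain without waiting for a reply
    Task OnCommandCompleted(string command);
}

public interface IWorkerGrain : IGrainWithGuidKey
{
    [OneWay] // step 2: the queue grain dispatches a command without blocking on its completion
    Task Execute(string command, string queueKey);
}

public class WorkerGrain : Grain, IWorkerGrain
{
    public async Task Execute(string command, string queueKey)
    {
        await DoWork(command); // possibly long-running

        // Step 4: tell the queue grain this command is done so it can dequeue the next one.
        await GrainFactory.GetGrain<ICommandQueueGrain>(queueKey).OnCommandCompleted(command);
    }

    private Task DoWork(string command)
    {
        // Do some work
        return Task.CompletedTask;
    }
}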

As you can see, the biggest problem is implementing an asynchronous dequeue using the Orleans task scheduler, and I don't have any ideas (except those described in my first post) for how to do this :(

tomachristian commented 6 months ago

@AndrewP-GH We do the exact same thing, and we also wanted processing of each queued message not to interleave with any external calls to the grain.

You can define another method on your grain interface (or a separate internal interface, given that this is an implementation detail), something like Loop/ExecuteNext; ideally it would be marked as [OneWay].

In your enqueue method you can call this Loop method on the grain itself after the item has been added to the queue (make sure you do the call via a reference, so that it gets queued as a normal request; you can even optimise by only making this call when it is the first item you add to the queue).

In the Loop method you can dequeue a message from the queue (and check that you actually got one, because this might be an orphaned call from a previous activation) and process it; at the end of Loop you can make another call to Loop (i.e. schedule it) if there are remaining items in the queue.

Upon deactivation, inside an OnDeactivate override, you can inspect the queue and see whether any items were left unprocessed; there you can decide what to do with them: try to process them in place by draining the queue, drop them, try to send them one-way to a new activation, etc. Mind you, this is best-effort during deactivation.

This works brilliantly for us, without hacks.
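For readers landing here, a minimal sketch of the pattern described above, assuming Orleans 3.x APIs ([OneWay], AsReference<T>()); the names ICommandQueueGrain and ExecuteNext are illustrative, and the deactivation handling mentioned above is omitted:

public interface ICommandQueueGrain : IGrainWithStringKey
{
    Task Enqueue(string command);

    [OneWay] // fire-and-forget: the caller does not wait for processing to finish
    Task ExecuteNext();
}

public class CommandQueueGrain : Grain, ICommandQueueGrain
{
    private readonly Queue<string> _queue = new();

    public Task Enqueue(string command)
    {
        _queue.Enqueue(command);

        // Kick off processing only when the queue transitions from empty to non-empty.
        // Calling through a grain reference (not `this`) makes it a normal request.
        if (_queue.Count == 1)
        {
            _ = this.AsReference<ICommandQueueGrain>().ExecuteNext();
        }

        return Task.CompletedTask;
    }

    public async Task ExecuteNext()
    {
        // The queue may be empty if this is an orphaned call from a previous activation.
        if (!_queue.TryDequeue(out var command))
        {
            return;
        }

        await DoWork(command);

        // Schedule the next iteration as a new request, so external calls can run in between.
        if (_queue.Count > 0)
        {
            _ = this.AsReference<ICommandQueueGrain>().ExecuteNext();
        }
    }

    private Task DoWork(string command)
    {
        // Do some work
        return Task.CompletedTask;
    }
}

Because ExecuteNext is dispatched as an ordinary (non-interleaving) request on a non-reentrant grain, each queued item is processed in its own turn, and Enqueue or any other grain call never interleaves with the middle of DoWork.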