jamesmh / coravel

Near-zero config .NET library that makes advanced application features like Task Scheduling, Caching, Queuing, Event Broadcasting, and more a breeze!
https://docs.coravel.net/Installation/
MIT License

Run Tasks One After Another / Control Concurrency in Queues #364

Open sangeethnandakumar opened 3 months ago

sangeethnandakumar commented 3 months ago

I saw this but still want to know if we can do something about this - https://github.com/jamesmh/coravel/issues/325

Problem Using WhenAll()

I created a web scraper that scrapes a particular website. Users can log in and click a button to queue a scraping request. The problem is that when 50 users queue a scraping job, all 50 jobs start at the same time.

This causes immense resource problems on my server, eating up all available RAM. I searched everywhere to see whether I can tell Coravel to run only 5 tasks at a time, or 1 or 2; that is, whether the developer can control the level of concurrency.

I understand from https://github.com/jamesmh/coravel/issues/325 that using Task.WhenAll() is by design, but this concern is too big to leave to workarounds.

What if we add a semaphore to control the number of concurrent tasks that can execute in a queue? Maybe something similar to this:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Semaphore allowing at most 3 concurrent tasks (users could configure this)
SemaphoreSlim semaphore = new SemaphoreSlim(3);

List<Task> tasks = new List<Task>();

for (int i = 0; i < 10; i++)
{
    int taskNumber = i; // Capture the loop variable; Task.CurrentId can be null after an await
    tasks.Add(Task.Run(async () =>
    {
        await semaphore.WaitAsync(); // Wait for an open slot

        try
        {
            // Our code
            Console.WriteLine($"Task {taskNumber} is running");
            await Task.Delay(1000); // Simulate work with a delay
        }
        finally
        {
            semaphore.Release(); // Release the slot
        }
    }));
}

// Wait for all tasks to complete, as before
await Task.WhenAll(tasks);

What do you think?

jamesmh commented 2 months ago

Coravel could offer something like that.

The problem with this, though, is that it would apply to all messages in the queue and would significantly reduce throughput across the board.

So this would depend on having multiple independent queues and then implementing what you've suggested per queue. This would be a bit of work 😅.

One approach that could work is to have your queue message handler put a "job" item in storage (a database table, document, etc.). Then, using the scheduler, every X seconds (or whatever), you can process and purge that database table (etc.) using the solution you suggested.

This would work well because the scheduler has the PreventOverlapping option, so you can process these items more reliably.
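The pattern described above could be sketched roughly as follows. This is a minimal illustration under stated assumptions, not code taken from Coravel's documentation: `ScrapeJobStore`, `ProcessScrapeJobs`, the `/scrape` endpoint, the 30-second interval and the limit of 3 are all hypothetical, and an in-memory queue stands in for the database table. It assumes the Coravel NuGet package and an ASP.NET Core minimal-hosting app (.NET 6+) with implicit usings enabled.

```csharp
// Sketch only. All type names, the endpoint and the numbers are assumptions.
using System.Collections.Concurrent;
using Coravel;
using Coravel.Invocable;
using Coravel.Queuing.Interfaces;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddQueue();
builder.Services.AddScheduler();
builder.Services.AddSingleton<ScrapeJobStore>();
builder.Services.AddTransient<ProcessScrapeJobs>();

var app = builder.Build();

// Process stored jobs every 30 seconds. PreventOverlapping skips a run
// while the previous run with the same key is still in progress.
app.Services.UseScheduler(scheduler =>
    scheduler.Schedule<ProcessScrapeJobs>()
        .EveryThirtySeconds()
        .PreventOverlapping(nameof(ProcessScrapeJobs)));

// The queued work only records the request, so it completes quickly.
app.MapPost("/scrape", (string url, IQueue queue, ScrapeJobStore store) =>
{
    queue.QueueTask(() => store.Pending.Enqueue(url));
    return Results.Accepted();
});

app.Run();

// Hypothetical in-memory stand-in for a database table of pending jobs.
public class ScrapeJobStore
{
    public ConcurrentQueue<string> Pending { get; } = new();
}

// Scheduled job: drains the store, running at most MaxConcurrency scrapes at once.
public class ProcessScrapeJobs : IInvocable
{
    private const int MaxConcurrency = 3;
    private readonly ScrapeJobStore _store;

    public ProcessScrapeJobs(ScrapeJobStore store) => _store = store;

    public async Task Invoke()
    {
        using var semaphore = new SemaphoreSlim(MaxConcurrency);
        var running = new List<Task>();

        while (_store.Pending.TryDequeue(out var url))
        {
            var target = url;             // local copy for the closure
            await semaphore.WaitAsync();  // wait for a free slot before starting
            running.Add(Task.Run(async () =>
            {
                try { await ScrapeAsync(target); }
                finally { semaphore.Release(); }
            }));
        }

        await Task.WhenAll(running);
    }

    // Placeholder for the real scraping work.
    private static Task ScrapeAsync(string url) => Task.Delay(1000);
}
```

With a real database, the drain step would presumably also mark or delete rows as they are picked up, so that a crash mid-run does not lose or repeat jobs.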

For example, the queue doesn't support preventing overlap, which could occur in the example you gave if it were implemented inside the queue message handler (e.g. your first queue handler is doing web scraping, but then the queue triggers another purge/consumption 30 seconds later while the first one isn't done yet). You could use a static semaphore here to "fix" that.
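A static semaphore inside the handler might look something like this. It is a sketch using only the .NET base class library; `ScrapeHandler`, `HandleAsync`, the limit of 3 and the `Peak` counter (there only to make the cap observable) are all assumptions, and the delay stands in for the real scraping work.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ScrapeHandler
{
    // Static, so every invocation in the process shares the same gate.
    // The limit of 3 is an arbitrary choice for illustration.
    private static readonly SemaphoreSlim Gate = new SemaphoreSlim(3);

    private static int _running;
    private static int _peak;

    // Highest number of handlers observed inside the gate at once.
    public static int Peak => Volatile.Read(ref _peak);

    public static async Task HandleAsync(string url)
    {
        await Gate.WaitAsync(); // wait for a free slot
        try
        {
            int now = Interlocked.Increment(ref _running);

            // Record the peak concurrency with a compare-and-swap loop.
            int seen;
            while (now > (seen = Volatile.Read(ref _peak)))
            {
                Interlocked.CompareExchange(ref _peak, now, seen);
            }

            await Task.Delay(100); // stand-in for scraping `url`
        }
        finally
        {
            Interlocked.Decrement(ref _running);
            Gate.Release(); // free the slot even if the work throws
        }
    }
}
```

Because the gate is static, handlers started by a later queue consumption wait on the same slots as those still running from an earlier one. The waiting handlers are still held in memory as pending tasks, so this limits concurrent work rather than the number of queued items.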

That would be my suggestion at the moment: queued messages complete quickly, because all they do is write a job object to storage, so they don't get blocked or slowed. Have a scheduled job process those jobs every X seconds and use PreventOverlapping to avoid overloading your RAM, CPU, etc.

Have any thoughts on this approach? (It's a common pattern to use for this specific scenario and is exactly why the PreventOverlapping scheduler feature exists 👍).