amibar / SmartThreadPool

A .NET Thread Pool fully implemented in C# with many features
http://www.codeproject.com/Articles/7933/Smart-Thread-Pool
Microsoft Public License
507 stars 182 forks

New dotnet libraries and mechanisms #24

Open yoav-melamed opened 4 years ago

yoav-melamed commented 4 years ago

Hello Ami, I'd like to take this chance to thank you for your great work! I'm using STP a lot and it is definitely my favorite library!

Are you considering using some of the new .NET libraries and mechanisms in future releases?

Thanks!

amibar commented 4 years ago

Hi Yoav,

Can you please give examples of how you wish to use these features with STP?

Regards, Ami

yoav-melamed commented 4 years ago

Hi Ami, I just thought it might be a nice improvement to replace some of the blocking code with async methods (such as 'await enqueue' or 'await WaitForIdle').

The idea of combining it with ValueTask is simply to get fewer allocations than with a regular Task.
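To illustrate the allocation point, here is a minimal sketch unrelated to STP's code (`GetLengthAsync`, `ComputeAsync` and the cache are hypothetical): a method returning `ValueTask<int>` can hand back an already-known value without allocating a `Task`, and only the slow path wraps a real `Task<int>`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical cache, used only for this illustration.
var cache = new ConcurrentDictionary<string, int>();

// Fast path: a cached value is wrapped in the ValueTask<int> struct, so no Task is allocated.
// Slow path: do real async work and wrap the resulting Task<int>.
ValueTask<int> GetLengthAsync(string key)
{
    if (cache.TryGetValue(key, out int cached))
        return new ValueTask<int>(cached);
    return new ValueTask<int>(ComputeAsync(key));
}

async Task<int> ComputeAsync(string key)
{
    await Task.Delay(10); // stands in for I/O
    cache[key] = key.Length;
    return key.Length;
}

int a = await GetLengthAsync("hello");            // not cached yet: completes asynchronously
ValueTask<int> second = GetLengthAsync("hello");  // cached: already completed, no Task behind it
bool secondCompletedSynchronously = second.IsCompletedSuccessfully;
int b = await second;

Console.WriteLine($"{a} {b} {secondCompletedSynchronously}"); // 5 5 True
```

A `ValueTask` should be awaited only once, which is why each call above is consumed exactly once.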

Regarding 'System.Threading.Channels', one thing I like is the 'bounded queue' backpressure mechanism: when the queue is full, it automatically waits until the queue has room for additional items (instead of throwing a 'queue is full' exception).
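For reference, a minimal sketch of that backpressure behavior using `System.Threading.Channels` directly (the standard .NET API, not an STP feature): with `BoundedChannelFullMode.Wait`, `WriteAsync` on a full channel returns a pending task instead of failing, and completes once a reader frees a slot. The capacity of 2 is arbitrary.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Capacity 2: a third write has to wait until the reader takes something out.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.Wait
});

// Fill the channel; both writes complete immediately.
await channel.Writer.WriteAsync(1);
await channel.Writer.WriteAsync(2);

// The third write cannot complete yet: it stays pending while the channel is full.
ValueTask pending = channel.Writer.WriteAsync(3);
bool waitedWhileFull = !pending.IsCompleted;

// Reading one item frees a slot, which lets the pending write finish.
int firstRead = await channel.Reader.ReadAsync();
await pending;
channel.Writer.Complete();

var rest = new List<int>();
await foreach (int item in channel.Reader.ReadAllAsync())
    rest.Add(item);

Console.WriteLine($"waited: {waitedWhileFull}, first: {firstRead}, rest: {string.Join(",", rest)}");
// waited: True, first: 1, rest: 2,3
```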

Yoav

amibar commented 4 years ago

Hi @yoav-melamed,

By async/await I thought you meant that when you enqueue an async method, all of it (including the part after the await) would keep running on the STP.

For example let's take this async method:

public async Task DoIt()
{
    // Do first thing
    await Task.Delay(10);
    // Do second thing
}

Today, if you enqueue it into the STP, the work item completes after "first thing" is done, and "second thing" runs on the .NET ThreadPool (outside the STP threads).
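A small standalone demonstration of that behavior, using a plain dedicated `Thread` as a stand-in for an STP worker thread (no STP involved, so this only approximates the real setup): the code before the `await` runs on the dedicated thread, the thread is finished as soon as the method reaches its first `await`, and the code after the `await` resumes on a .NET ThreadPool thread because a plain thread has no `SynchronizationContext` to return to.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

bool firstPartOnPool = true, secondPartOnPool = false;
Task doIt = Task.CompletedTask;

// Stand-in for an STP worker: a dedicated thread that is not part of the .NET ThreadPool.
var worker = new Thread(() => doIt = DoIt());
worker.Start();
worker.Join(); // returns once DoIt reaches its first await
await doIt;    // DoIt itself finishes later, on some other thread

Console.WriteLine($"first part on pool: {firstPartOnPool}, second part on pool: {secondPartOnPool}");

async Task DoIt()
{
    // Do first thing: still running on the dedicated worker thread.
    firstPartOnPool = Thread.CurrentThread.IsThreadPoolThread;
    await Task.Delay(100);
    // Do second thing: resumed on a .NET ThreadPool thread.
    secondPartOnPool = Thread.CurrentThread.IsThreadPoolThread;
}
```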

With an async/await feature, the STP could run the second thing on its own threads too, and only then would the work item complete.

Is this a required feature? Or do you just need Enqueue and WaitForIdle to be awaitable?

Ami

yoav-melamed commented 2 years ago

Hi @amibar, sorry for the super-late response, I missed your comment :/ Yes, having awaitable tasks in the queue would definitely be a big step for STP. It would allow each thread to be more efficient and would definitely help with writing async code.

In addition, since WaitForIdle is an expensive blocking method, having an async version would be great.

MichelZ commented 1 year ago

Oh yeah, async/await feature so that you can do QueueWorkItem(async () => await xxx())

amibar commented 1 year ago

Hi @yoav-melamed & @MichelZ,

I started working on the async/await feature a while ago, and I am still figuring out how to solve some scenarios. However, why do you want this feature in STP instead of just using async/await directly? I just want to make sure my work will be useful for you.

Regards, Ami

yoav-melamed commented 1 year ago

Hi @amibar, my reason is to make every STP thread as efficient as it can be:

That is what I had in mind. Thank you for your hard work!

amibar commented 1 year ago

Hi @yoav-melamed,

Thanks.

The threads of the STP are supposed to be worker threads, hence they should be using the CPU. When your work item on the STP awaits, it just delegates the work to another arbitrary thread over which the STP has no control.

One of the ideas behind STP is to limit concurrency. For example, if you limit the concurrency to 1 (max threads) and send 10 work items that each await a file download, you'll end up with 10 concurrent downloads executing on the threads of the .NET ThreadPool while the STP thread sits idle.
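The effect can be reproduced without STP. This sketch uses .NET's built-in `ConcurrentExclusiveSchedulerPair` with a concurrency level of 1 as a stand-in for a pool limited to one thread (an analogy only, not how STP is implemented). Ten async items never run code on more than one thread at a time, yet their awaits overlap, because each `await` frees the single slot for the next item.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for "max threads = 1": at most one task segment executes at any moment.
var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 1);
var factory = new TaskFactory(pair.ConcurrentScheduler);

object gate = new();
int running = 0, maxRunning = 0;   // items currently executing code on a thread
int inFlight = 0, maxInFlight = 0; // items started but not yet completed

void Track(ref int current, ref int max, int delta)
{
    lock (gate)
    {
        current += delta;
        max = Math.Max(max, current);
    }
}

async Task Item()
{
    Track(ref inFlight, ref maxInFlight, +1);
    Track(ref running, ref maxRunning, +1);
    Thread.Sleep(10);                       // CPU work before the await
    Track(ref running, ref maxRunning, -1);

    // While this item waits (think: file download), the single slot is free for other items.
    // The continuation is queued back to the same limited scheduler (TaskScheduler.Current).
    await Task.Delay(200);

    Track(ref running, ref maxRunning, +1);
    Thread.Sleep(10);                       // CPU work after the await
    Track(ref running, ref maxRunning, -1);
    Track(ref inFlight, ref maxInFlight, -1);
}

// StartNew with a Task-returning delegate yields Task<Task>; Unwrap tracks the whole async method.
Task[] items = Enumerable.Range(0, 10)
    .Select(_ => factory.StartNew(Item).Unwrap())
    .ToArray();
await Task.WhenAll(items);

Console.WriteLine($"max running on a thread: {maxRunning}, max in flight: {maxInFlight}");
```

The thread limit holds (`maxRunning` stays at 1), but it says nothing about how many awaited operations are outstanding at once.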

As for WaitForIdleAsync() and GetResultAsync(), they are quite reasonable to implement.

What do you think?

Regards, Ami
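As a rough idea of why an awaitable idle signal is cheap, here is a minimal, hypothetical sketch (`OnQueued`, `OnCompleted` and `WaitForIdleAsync` are illustrative names, not STP's API or implementation): a counter of outstanding items plus a `TaskCompletionSource` that is completed when the counter drops to zero, so callers await a `Task` instead of blocking a thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical idle tracker; none of these names come from STP.
object gate = new();
int pending = 0;
var idle = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
idle.SetResult(true); // nothing queued yet, so we start out idle

void OnQueued()
{
    lock (gate)
    {
        // Leaving the idle state: new waiters get a fresh, incomplete task.
        if (pending++ == 0)
            idle = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    }
}

void OnCompleted()
{
    lock (gate)
    {
        // RunContinuationsAsynchronously keeps awaiting code from running inside this lock.
        if (--pending == 0)
            idle.TrySetResult(true);
    }
}

Task WaitForIdleAsync()
{
    lock (gate) return idle.Task;
}

int done = 0;
for (int n = 0; n < 5; n++)
{
    OnQueued();
    _ = Task.Run(async () =>
    {
        await Task.Delay(50); // the "work item"
        Interlocked.Increment(ref done);
        OnCompleted();
    });
}

await WaitForIdleAsync(); // no thread is blocked while waiting
Console.WriteLine($"Idle after {done} items"); // Idle after 5 items
```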

MichelZ commented 1 year ago

I often use it to limit concurrency, yes. But I also sometimes need to call other APIs from the worker threads, and more and more of them happen to be async. So it's more a matter of convenience (.GetAwaiter(false).GetResult(), anyone?)

amibar commented 1 year ago

Did you mean .ConfigureAwait(false) ?

MichelZ commented 1 year ago

Yeah, that one :)
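For completeness, a minimal sketch of the usual sync-over-async bridge from a synchronous worker (plain .NET; `FetchAsync` and `FailAsync` are hypothetical stand-ins for async APIs). `ConfigureAwait(false)` belongs on the `await`s inside the async code, while the blocking caller uses `GetAwaiter().GetResult()`, which rethrows the original exception instead of the `AggregateException` that `.Result` or `.Wait()` would throw. The calling thread stays blocked for the whole call.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical async API that a synchronous worker needs to call.
async Task<int> FetchAsync(int id)
{
    await Task.Delay(50).ConfigureAwait(false); // no need to resume on a captured context
    return id * 2;
}

// Hypothetical async API that fails.
async Task<int> FailAsync()
{
    await Task.Delay(10).ConfigureAwait(false);
    throw new InvalidOperationException("boom");
}

// Synchronous worker body (e.g. a delegate passed to QueueWorkItem): blocks until the result is ready.
int Worker(int id) => FetchAsync(id).GetAwaiter().GetResult();

int value = Worker(21);

// GetAwaiter().GetResult() surfaces the original exception type.
string caught = "";
try { FailAsync().GetAwaiter().GetResult(); }
catch (InvalidOperationException ex) { caught = ex.Message; }

Console.WriteLine($"{value} {caught}"); // 42 boom
```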

amibar commented 1 year ago

Do you use the WorkItemGroups to limit concurrency, or use the STP directly?

MichelZ commented 1 year ago

Do you use the WorkItemGroups to limit concurrency, or use the STP directly?

Both. We use STP in a few projects

amibar commented 1 year ago

I have dilemmas regarding async methods and the WorkItemGroup.

  1. Let's say you enqueue 2 async work items into a WorkItemGroup with a concurrency of 1. The 1st starts executing and then awaits, so the thread is released. Should the 2nd work item start executing, or should it wait until the 1st work item completes? (By "complete" I mean the async method returns rather than awaits.)

  2. Another case is when several work items are enqueued to a WorkItemGroup and after a while they are all in an awaiting state. Does that mean the WorkItemGroup is idle? Or is it idle only when all of these work items complete?
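For comparison, the second option in question 1 (the next item waits until the previous one has completed, not merely awaited) is what the common `SemaphoreSlim` throttling pattern gives in plain .NET. A minimal sketch independent of STP; the limit of 1 and the five items are arbitrary:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int limit = 1;
var throttle = new SemaphoreSlim(limit, limit);
object gate = new();
int active = 0, maxActive = 0;

async Task RunLimitedAsync(int id)
{
    await throttle.WaitAsync(); // waits asynchronously; no thread is blocked
    try
    {
        lock (gate) { active++; maxActive = Math.Max(maxActive, active); }
        await Task.Delay(100);  // the slot stays taken across this await
        lock (gate) active--;
    }
    finally
    {
        throttle.Release();     // released only when the item completes
    }
}

await Task.WhenAll(Enumerable.Range(1, 5).Select(i => RunLimitedAsync(i)));
Console.WriteLine($"max concurrently active items: {maxActive}"); // 1
```

Under this interpretation, a group whose items are all awaiting would not count as idle, since every slot is still held.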

yoav-melamed commented 1 year ago

Hi Ami, regarding your question:

Do you use the WorkItemGroups to limit concurrency, or use the STP directly?

Same as @MichelZ, I'm using STP in several projects, and it depends on the project I'm working on. Mostly I use the STP directly, but in one project I need 4 workers that each collect data from a different source and add it to a centralized queue to be processed (each worker has a different priority), so I'm using a WIG there.

When your work item on the STP awaits, it just delegates the work to another arbitrary thread over which the STP has no control

I thought a lot about what you said, and you are right: executing an async task makes the .NET ThreadPool control the thread life cycle, which is the opposite of what we want. For calling async APIs, I think it's better to use the old-fashioned way (and NOT use ConfigureAwait(false), because we do want the results to be returned to the caller thread [which is the STP work item thread]).

A sample I wrote to demonstrate what I mean:

using System;
using System.Threading.Tasks;
using Amib.Threading;

const int NUM_OF_ITEM = 10;

SmartThreadPool stp = new();
stp.Concurrency = 1;

for (int i = 1; i <= NUM_OF_ITEM; i++)
    stp.QueueWorkItem(Executor, i);

stp.WaitForIdle(); // would like to have `await stp.WaitForIdleAsync()` here

Console.WriteLine("All Done.");

void Executor(int ItemNumber)
{
    Console.WriteLine($"Executing item number {ItemNumber}...");

    // executing some async API
    var task = Task.Run(async () =>
    {
        await Task.Delay(1000);
        return new Random().Next(1, 1000);
    });
    var results = task.Result;

    Console.WriteLine($"Item {ItemNumber} has done with results of {results}.");
}
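A side note on the sample above: because `Executor` blocks on `task.Result`, the STP thread simply waits, and the rest of `Executor` continues on that same thread; `ConfigureAwait` does not come into play there, since it only affects `await` continuations. A minimal standalone check, with a plain `Thread` standing in for an STP worker (no STP involved):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

int before = -1, after = -1, inner = -1, result = 0;

var worker = new Thread(() =>
{
    before = Environment.CurrentManagedThreadId;

    // Same shape as the sample: run async work on the .NET ThreadPool and block on it.
    var task = Task.Run(async () =>
    {
        await Task.Delay(100);
        inner = Environment.CurrentManagedThreadId;
        return 42;
    });
    result = task.Result; // blocks this thread until the task completes

    after = Environment.CurrentManagedThreadId;
});
worker.Start();
worker.Join();

Console.WriteLine($"same thread before/after: {before == after}, inner ran elsewhere: {inner != before}");
// same thread before/after: True, inner ran elsewhere: True
```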
amibar commented 1 year ago

Hi @yoav-melamed,

I looked at your sample and realized that using QueueWorkItem for a Task is not the best solution.

When this project started (.NET 1.1), the Task type didn’t exist yet, so I had to wrap delegates with objects (WorkItem) and return promises (IWorkItemResult). A Task is itself a promise, so I can harness its methods instead of treating it like a delegate.

So for a Task I prefer using RunTask, similar to Task.Run(). Rewritten that way, your sample would look like this:

using System;
using System.Threading.Tasks;
using Amib.Threading;

const int NUM_OF_ITEM = 10;

SmartThreadPool stp = new SmartThreadPool();
stp.Concurrency = 1;

for (int i = 1; i <= NUM_OF_ITEM; i++)
{
    // Run the Executor as Task on stp
    // Returns a Task<int> instead of IWorkItemResult<int>
    _ = stp.RunTask(() => Executor(i));
}

// The tasks are converted to work items internally, so you can wait for idle on the stp as usual
await stp.WaitForIdleAsync(); 

Console.WriteLine("All Done.");

// Executor is an async method
async Task<int> Executor(int ItemNumber)
{
    Console.WriteLine($"Executing item number {ItemNumber}...");

    await Task.Delay(1000);

    var results = new Random().Next(1, 1000);

    Console.WriteLine($"Item {ItemNumber} has done with results of {results}.");

    return results;
}
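To make the "a Task is a promise" point concrete, here is a hypothetical sketch (not STP's `RunTask`; `RunOnDedicatedThread` is an invented name) that runs a delegate on a thread it controls and exposes the outcome as a `Task<T>` through `TaskCompletionSource<T>`, so callers can `await` it or compose it with `Task.WhenAll` like any other task.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: run work on a thread we own, report the outcome through a Task<T>.
Task<T> RunOnDedicatedThread<T>(Func<T> work)
{
    var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
    var thread = new Thread(() =>
    {
        try { tcs.SetResult(work()); }                 // fulfil the promise with the value
        catch (Exception ex) { tcs.SetException(ex); } // or fault it with the error
    });
    thread.IsBackground = true;
    thread.Start();
    return tcs.Task;
}

Task<int> a = RunOnDedicatedThread(() => 20);
Task<int> b = RunOnDedicatedThread(() => 22);
int[] results = await Task.WhenAll(a, b); // ordinary Task composition
int sum = results[0] + results[1];

string error = "";
try { await RunOnDedicatedThread<int>(() => throw new InvalidOperationException("failed")); }
catch (InvalidOperationException ex) { error = ex.Message; }

Console.WriteLine($"{sum} {error}"); // 42 failed
```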

RunTask also supports a cancellation token and priority.

I can still implement QueueWorkItem for tasks, but I don't think it's necessary.

What’s your opinion?

I pushed a branch named async with what I did so far.

Ami

yoav-melamed commented 1 year ago

Hi @amibar, that looks like a great idea! I think that's exactly what @MichelZ and I are looking for. I'm going to pull the async branch and take it for a spin; this will definitely help us with everything related to async operations.

Does the RunTask() method execute the task directly, or queue it on the STP thread pool (like .NET's Task.Run() does behind the scenes)?

If so, would you be able to queue tasks on WorkItemGroups as well, or for now only on the STP itself?

yoav-melamed commented 1 year ago

Hi @amibar, after pulling the async branch and trying it myself, I found an issue with how the code runs: stp.Concurrency = 1; has no effect on the current code. All the items fired simultaneously rather than one after the other. Changing it to stp.Concurrency = 5; (for example) makes no difference compared to the original code, and if I'm getting it right - the reason is we are not actually queueing the items, but just firing tasks. Am I right? Did you notice this behavior on your side as well?

yoav-melamed commented 1 year ago

if I'm getting it right - the reason is we are not actually queueing the items, but just firing tasks. Am I right? Did you notice this behavior on your side as well?

Ignore this comment... I saw the code:

if (cancellationToken?.IsCancellationRequested ?? false)
    return Task.FromCanceled(cancellationToken.Value);

PreQueueWorkItem();
WorkItem workItem = WorkItemFactory.CreateWorkItem(
    this,
    WIGStartInfo,
    _ =>
    {
        action();
        return null;
    },
    priority);
Enqueue(workItem);

var wir = workItem.GetWorkItemResult();
cancellationToken?.Register(() => wir.Cancel());

return wir.GetResultAsync();

You queue the task, convert it to a WorkItem, and return a promise (as you explained in your previous comment). I am still investigating why it's not working as expected, with all the tasks running in parallel and ignoring the concurrency parameter.

amibar commented 1 year ago

Hi @yoav-melamed,

The concurrency is of the threads not the tasks. When a work item awaits, it releases the thread to another work item.

Ami

yoav-melamed commented 1 year ago

Hi @amibar, sorry, maybe I got confused. What use cases do you think RunTask will solve? I thought it would be executed on the queue as a work item, but in an awaitable way; probably my misunderstanding.

amibar commented 1 year ago

Hi @yoav-melamed,

I pushed another commit to the async branch.

This time, a WorkItemsGroup cannot have more than Concurrency work items executing at once.

Please check if it works for you.

Thanks, Ami

yoav-melamed commented 1 year ago

Hi @amibar, thanks for your feedback! I re-ran the sample above with the updated async branch, but the results were the same: with stp.Concurrency = 1, all the tasks executed simultaneously and returned their results after 1 second (the delay time in the Executor).

amibar commented 1 year ago

@yoav-melamed, check the WorkItemsGroup, I didn't update the STP itself.

yoav-melamed commented 1 year ago

Hi @amibar I made the following changes in my code:

//stp.Concurrency = 1;
var wig = stp.CreateWorkItemsGroup(concurrency: 5);

for (int i = 1; i <= NUM_OF_ITEM; i++)
{
    // Run the Executor as Task on stp
    // Returns a Task<int> instead of IWorkItemResult<int>
    //_ = stp.RunTask(() => Executor(i));

    _ = wig.RunTask(() => Executor(i));
}

Now execution behaves as expected: with a concurrency of 1 the items execute one after the other, and with 5 (for example) only 5 run in parallel.

The issue I noticed now is that, since RunTask returns a Task that we don't await, all the items passed to the Executor() function have the value 11, so the Executor runs with the wrong values. We cannot await RunTask because it's executing the task rather than adding it to the queue. Can you think of any way to work around this so it executes with the correct values?

amibar commented 1 year ago

@yoav-melamed,

Check https://stackoverflow.com/questions/271440/captured-variable-in-a-loop-in-c-sharp to solve this.

You should copy i to another local variable before passing it to the Executor, something like:

//stp.Concurrency = 1;
var wig = stp.CreateWorkItemsGroup(concurrency: 5);

for (int i = 1; i <= NUM_OF_ITEM; i++)
{
    // Run the Executor as Task on stp
    // Returns a Task<int> instead of IWorkItemResult<int>
    //_ = stp.RunTask(() => Executor(i));
    var i2 = i;
    _ = wig.RunTask(() => Executor(i2));
}
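The underlying C# rule, shown without STP: a `for` loop has a single `i` shared by every lambda created in the loop, so lambdas that run after the loop has moved on see the updated value (here 4 once the loop ends, mirroring the 11 seen above). Copying to a local inside the body, or iterating with `foreach` (whose iteration variable is fresh per iteration since C# 5), gives each lambda its own value.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var shared = new List<Func<int>>();
var copied = new List<Func<int>>();
var viaForeach = new List<Func<int>>();

for (int i = 1; i <= 3; i++)
{
    shared.Add(() => i);  // every lambda captures the same i
    int i2 = i;           // a fresh variable each iteration
    copied.Add(() => i2);
}

foreach (int n in Enumerable.Range(1, 3))
    viaForeach.Add(() => n); // foreach declares a new n per iteration (C# 5 and later)

// The lambdas run after the loops finish, as queued work items might.
string Show(List<Func<int>> fs) => string.Join(",", fs.Select(f => f()));

Console.WriteLine(Show(shared));     // 4,4,4
Console.WriteLine(Show(copied));     // 1,2,3
Console.WriteLine(Show(viaForeach)); // 1,2,3
```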
yoav-melamed commented 1 year ago

Hi @amibar, that's working great now, thank you! Is there a more elegant way to handle the 'copy to another variable' step, or is that just something we have to do for it to work?