dotnet / runtime

.NET is a cross-platform runtime for cloud, mobile, desktop, and IoT apps.
https://docs.microsoft.com/dotnet/core/
MIT License

Parallel.ForEach ends working in a single thread #46146

Closed Escoto closed 3 years ago

Escoto commented 3 years ago

I had an issue with Parallel.ForEach - Might be how I was using it, but if that's the case it would be really nice if it could be improved so that one doesn't run into this issue so easily.

Problem - I have a list of a few hundred tasks to run in parallel; each task can take from a few seconds up to a day, and MaxDegreeOfParallelism has to be configured. While they do start in parallel, it happens that I end up with a bunch of tasks stuck in a single thread.

Ex. Let's say 400 tasks for 4 threads. What I noticed is I get 4 partitions of 100 tasks each. 3 partitions may finish in a few minutes, then I have a single-threaded partition with a bunch of tasks waiting for one 2-hour task, and I may have 10 like this one after the other in the same partition. Basically this leaves me on a single-threaded partition for a large number of tasks while the other partitions have already finished, resulting in 3 threads being wasted.

I created a barebone gist of my solution. With this idea I went from 2-3 hours (using Parallel.ForEach) to 45 mins for my test data. Not just that, with this idea the order of execution is also predictable.

Clockwork-Muse commented 3 years ago

... at least some of the implementations work-steal, so we'd have to see how you're using it. If "partitioner" is referring to one of the overloads that takes an actual Partitioner object, then that does appear to be the expected behavior (static partitions).

Not just that, with this idea the order of execution is also predictable.

Not really. Tasks will be submitted in the listed order, but there's no guarantee that they're actually processed (or, perhaps more relevant, completed) in that order - you're at the mercy of the scheduler (here, the default thread pool, which is likely to make a best effort). This idea has higher overhead than at least some overloads of Parallel.ForEach, since you submit an actual task for each invocation - at that point you might as well do:

var tasks = actions.Select(Task.Run).ToArray();
Task.WaitAll(tasks);

... note that many of the overloads for Parallel.ForEach already behave this way anyways.

With this idea I went from 2-3 hours (using Parallel.ForEach) to 45 mins for my test data.

... It sounds like your tests are still taking too long.


The bigger question, I think, would be whether your tasks can be executed asynchronously (doing things like downloading files), or whether you're actually CPU-bound. If they can run asynchronously, that might make things faster.

Escoto commented 3 years ago

@Clockwork-Muse Thanks for your answer.

Not really. Tasks will be submitted in the listed order, but there's no guarantee that they're actually processed (or, perhaps more relevant, completed) in that order...

You are right i can't tell the order in which they complete, but this way i can tell the order they start and thus assign priorities. Since I know the entire procedure takes about 48 hrs, having them in priority order i know i can trigger other tasks after 4-7 hrs instead of waiting a completely random amount of time.

... It sounds like your tests are still taking too long.

The test cases don't take more than a few seconds of course, but i have a sandbox where i can simulate the execution on a customer's system and use anything from a little to quite a lot of data. Running the tool for the entire data set on the sandbox is what takes the long times i mentioned.

so we'd have to see how you're using it.

So, how i was using it is that for each task i have a config object, and i was using the Parallel.ForEach somewhat along these lines:

Parallel.ForEach( configList, options, (config) => 
{
    DoTheThing(config);
    Notify(config);
}); 

The bigger question...

I am CPU bound on the server i am hitting, not so much on my side. Since i am querying client's production systems i have to be able to prevent my application from overloading client's systems. The servers are also constrained by whatever license rules apply like number of simultaneous sessions by user, that's why limiting the amount of parallel tasks is a must.

Clockwork-Muse commented 3 years ago

So, how i was using it is that for each task i have a config object, and i was using the Parallel.ForEach somewhat along these lines

... It appears that work is "chunked" - that a group is handed out as a subset of the range, and not recalculated, which would explain the behavior you're seeing. Parallel.Invoke directly uses the index of the array, but it's not completely clear why this wasn't done for ForEach (for List and Array, at least). It does suggest that ForEach is primarily intended for situations where the work is mostly evenly distributed.


I am CPU bound on the server i am hitting, not so much on my side.

So you're async capable. The server being CPU-bound is an implementation detail, and not something you should be concerned with (in an ideal world, at least); all you really care about is that some stuff is being "delayed".

Since i am querying client's production systems i have to be able to prevent my application from overloading client's systems.

Perhaps, but it's not immediately clear that client-side delays are the answer to this (as opposed to something like passing everything to the server, which maintains its own queue). There are also options for submitting all tasks but only having a limited number of them communicate with another resource at a time (e.g. via SemaphoreSlim.WaitAsync()).
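The SemaphoreSlim.WaitAsync() approach mentioned here could look roughly like the sketch below. It is only an illustration under stated assumptions: `CallServerAsync`, the limit of 4, the 20 jobs, and the `Task.Delay` standing in for a network call are all hypothetical and not taken from the thread.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical cap on simultaneous server sessions (assumption for illustration).
const int maxConcurrent = 4;
using var gate = new SemaphoreSlim(maxConcurrent);

var sync = new object();
int inFlight = 0;   // calls currently holding a slot
int peak = 0;       // highest number of simultaneous calls observed

// Hypothetical stand-in for a real network call; Task.Delay simulates the wait.
async Task CallServerAsync(int jobId)
{
    await gate.WaitAsync();   // wait for a free slot without blocking a thread
    try
    {
        lock (sync) { inFlight++; peak = Math.Max(peak, inFlight); }
        await Task.Delay(20 + (jobId % 5) * 10);
    }
    finally
    {
        lock (sync) { inFlight--; }
        gate.Release();       // hand the slot to the next waiting job
    }
}

// Every job is submitted up front; the semaphore decides how many run at once.
await Task.WhenAll(Enumerable.Range(0, 20).Select(id => CallServerAsync(id)));

Console.WriteLine($"peak concurrent calls: {peak} (limit {maxConcurrent})");
```

The limit here is tied to the shared resource rather than to a thread count, so it only helps if the underlying client offers a non-blocking call.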

The servers are also constrained by whatever license rules apply like number of simultaneous sessions by user, that's why limiting the amount of parallel tasks is a must.

You're not likely to end up with that number of simultaneous connections. For example, using the same HttpClient for all the submission tasks has a good likelihood of using one persistent connection (because setting everything up can be expensive).

... regardless, you're effectively (if not actually) building a job management system, and Parallel.ForEach is going to be a poor fit (one reason being that you can't manipulate individual jobs after submission).

Escoto commented 3 years ago

So, in theory i am async capable, but i am hitting SAP via RFC... SAP's libraries just lock your application when calling the server, they don't even implement a way to gracefully timeout, that's why i have each Action on a separate Task. In my specific use case, one Action can result in hundreds of calls (all within the same Task), and these unitary calls are supposed to be sequential.

I guess you can see it as the line in a bank, where there is a single line for all the cashiers. 1 client may have 1 transaction to do while another one has many and can keep a cashier busy for what feels like an hour.


Parallel.Invoke directly uses the index of the array,

Actually now that you mention this... i didn't try this originally because (1) at the beginning I didn't have an Action[], i just had a MyConfig list, so i decided to use Parallel.ForEach, and (2) i thought it would behave the same way as Parallel.ForEach.

But after your comment and now that i have an Action[], I just changed my code from:

Task.Run(() => actionsEnumerable.RunAsync(threadCount)).Wait();

To:

Parallel.Invoke(new ParallelOptions() { MaxDegreeOfParallelism = threadCount }, actionsEnumerable.ToArray());

And it works just as i want!

... regardless, you're effectively (if not actually) building a job management system

Maybe, but that's not the intention. Parallel.Invoke seems to get the job done just fine so i guess i'll just stick to it now.


So, i solved my issue, but i would say it would be an improvement if either

Or

I would be happy to work on the second option if that sounds like a nice to have feature.

Clockwork-Muse commented 3 years ago

So, in theory i am async capable, but i am hitting SAP via RFC... SAP's libraries just lock your application when calling the server, they don't even implement a way to gracefully timeout, that's why i have each Action on a separate Task.

You don't mention what it is you're hitting, but at least some of the SAP documentation mentions various async calls (if you have BeginX/EndX method pairs, you're looking at APM). Otherwise, that's a rather annoying choice on their part.
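If a library does expose a BeginX/EndX pair, one way to bridge it to a Task is TaskFactory.FromAsync. The sketch below uses MemoryStream.BeginRead/EndRead purely as a stand-in, since no SAP API is shown in this thread; whether the SAP client offers such a pair is an assumption that would need checking.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

// MemoryStream is used only because it ships with a BeginRead/EndRead pair;
// any other APM-style BeginX/EndX pair would be wrapped in the same way.
using var stream = new MemoryStream(Encoding.UTF8.GetBytes("hello"));
var buffer = new byte[16];

// FromAsync invokes BeginRead and completes the task with EndRead's result.
int bytesRead = await Task<int>.Factory.FromAsync(
    stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);

string text = Encoding.UTF8.GetString(buffer, 0, bytesRead);
Console.WriteLine($"read {bytesRead} bytes: {text}");
```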

In my specific use case, one Action can result in hundreds of calls (all within the same Task), and these unitary calls are supposed to be sequential.

... this doesn't change in an async task. You'd still prep all your Actions, and they'd still execute their steps in the given order.

I guess you can see it as the line in a bank, where there is a single line for all the cashiers. 1 client may have 1 transaction to do while another one has many and can keep a cashier busy for what feels like an hour.

When you're dealing with something networked, it's more like:

  1. You (Task) have a line for all the cashiers (Thread).
  2. You walk up to the cashier (the task starts running).
  3. After some initial pleasantries (error checking), you communicate your request. The cashier writes a small note (serialization) and puts it in a message tube (transmitted on network).

At this point, one of two things can happen:

There's a downside to the second option: your time at the bank gets (slightly) longer the more messages get sent through the tubes! It's usually not a large amount of time, however, and may not be noticeable from your perspective if the cashiers are handling return messages relatively promptly. There's an enormous upside from the bank's perspective, however: they don't have to hire as many cashiers!

Escoto commented 3 years ago

I am with you on that, and i would've had the same stance before working with SAP systems, but here is a copy-paste directly from their documentation

There is one common misunderstanding about tRFC, which has to be cleared up here: in the case of an external program talking to an ABAP system, tRFC is not an asynchronous execution, technically it’s fully synchronous...

They do have a version called qRFC but it requires configurations on our client's SAP that we are simply not allowed to do; with tRFC we just set up our RFC and we are good to go (RFC - Remote Function Call / SAP's stored procedures). Because of this, we completely ignored qRFC.

Out of curiosity i read a little about qRFC and also found this

Client and server system must be SAP systems

So, yeah... SAP... pure joy, king of the industry.


Your time at the bank gets (slightly) longer the more messages get sent through the tubes!

Yes, no doubt on that, creating the array is O(n) complexity. But, at least in this use case i have to build up that array if i want Parallel.Invoke to handle(?) the work for me. So, whether I or the framework builds it, I'm still going to pay that time, which is really insignificant against the length of my whole process. If the framework did it for me that would be less code for me to worry about. In general I would say it's an insignificant downside: i am creating 350 Actions and the profiler tells me it takes <= 3ms.

I would vote for improving how Parallel.ForEach handles work when it's constrained by MaxDegreeOfParallelism but after reading the code that looks like much more work.

Parallel.Invoke is actually doing what i would expect from Parallel.ForEach, the issue is - it needs piping (creating the Action[]).

Clockwork-Muse commented 3 years ago

There is one common misunderstanding about tRFC, which has to be cleared up here: in the case of an external program talking to an ABAP system, tRFC is not an asynchronous execution, technically it’s fully synchronous...

That's an implementation detail, because the moment it hits the wire there's no way for the ABAP server to tell what the client's programming model is (or language). SAP publishes async/non-blocking node/python libraries - given these are wrappers around some C/C++ dll, C# could probably do the same.


The performance issues/gains aren't around creating the array, they're about servicing the contents of the array (or whatever collection). It's entirely about making it so that processing code isn't actively waiting for networked results, so that you can get the next action started.

(One additional advantage of doing things with async code would be that you wouldn't have to get all the actions up front for them all to be running concurrently - that is, currently the bank opens for a short time, gets all customers in a line, then closes its doors. With async code, it leaves the doors open, and starts working when there are only a few customers in line.)

Escoto commented 3 years ago

given these are wrappers around some C/C++ dll, C# could probably do the same.

You are completely right, and i've contributed to a wrapper of these C++ libs -> NwRfcNet


It's entirely about making it so that processing code isn't actively waiting for networked results

I get the point of not wanting to block the application while waiting for networked stuff, but correct me if i am wrong, anybody using Parallel.* would be blocking the application while waiting for their tasks to complete. Which again, for this use case that's perfectly ok, it's a console app that runs all by itself for several hours...

One additional advantage...

Yeah i understand the point of turning it all async, nevertheless i still have to control how many simultaneous calls i have active.

I really can't let SAP handle this. As i mentioned in one of my previous comments, I am also bound by the number of simultaneous sessions one user can create. While SAP will probably allow me to start a session, this can result in my clients being sanctioned by SAP. Plus, these processes are really CPU intensive: even if SAP allowed me to create 1k connections, it might try to run 50 in parallel (or as many as their license allows) while making the rest wait, and that many tasks would suffocate. That's another reason why i am constrained by the actual number my clients (the SAP admins) allow.

For me, how long it takes is not something I care too much about; i just care about the job getting done. From my pov, the Parallel library is perfect since i don't have to think about what to await, what to make a task, etc.; i just built the solution and it is basically ready to go.

Still, while i don't care about how long it takes, wasting Threads with the Parallel.ForEach is also unacceptable, which is really the reason why i created this issue.

Clockwork-Muse commented 3 years ago

I get the point of not wanting to block the application while waiting for networked stuff, but correct me if i am wrong, anybody using Parallel.* would be blocking the application while waiting for their tasks to complete. Which again, for this use case that's perfectly ok, it's a console app that runs all by itself for several hours...

Yes, using Parallel.* completely blocks until the function returns. In isolation (from start to end of the application), there isn't much discernible difference between blocking on this call or await-ing the calls in main() (outside of maybe any minor performance gains/losses). That's "in isolation", though. If this application is not the only thing running on the machine, using async code can increase overall throughput of all applications as they yield after submitting to the network (note the OS is still having to do context switching, but now it's at a much more natural point). Or, if the application is doing things like also writing to disk (or connecting to a different network resource), then the time spent doing those tasks are points at which yielding can get additional work done.

Yeah i understand the point of turning it all async, nevertheless i still have to control how many simultaneous calls i have active.

Sure, but doing that by thread is artificial (as opposed to setting connections up as a shared resource controlled by a semaphore). For one thing, it allows you to have more connections than cores on the machine (or, more connections than cores available to the process, at least). That's all theoretical if there isn't a non-blocking client, though.

Escoto commented 3 years ago

Yes, but still that does not solve the issue with Parallel.ForEach. I would say that at least it should be mentioned as a NOTE in the documentation that Parallel.ForEach is meant for cases where work is evenly distributed and point to Parallel.Invoke when not.

svick commented 3 years ago

I believe what you're looking for is EnumerablePartitionerOptions.NoBuffering. You use it through Partitioner.Create(), e.g.:

Parallel.ForEach(Partitioner.Create(inputs, EnumerablePartitionerOptions.NoBuffering), action);

This way, threads will grab items to process one by one, instead of grabbing them in batches, which is the default.
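A fuller version of that call, combined with MaxDegreeOfParallelism, might look like the sketch below. The workload is hypothetical: 40 jobs whose durations are simulated with Thread.Sleep, a few long ones mixed in among many short ones, and a cap of 4 chosen only for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical workload: every tenth job is long (200 ms), the rest are short (10 ms).
int[] durationsMs = Enumerable.Range(0, 40)
    .Select(i => i % 10 == 0 ? 200 : 10)
    .ToArray();

var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
int completed = 0;

// NoBuffering: a worker takes a single item each time it goes back to the source,
// so a long job does not hold other, already-claimed items behind it.
var partitioner = Partitioner.Create(durationsMs, EnumerablePartitionerOptions.NoBuffering);

Parallel.ForEach(partitioner, options, ms =>
{
    Thread.Sleep(ms);                      // stand-in for a blocking call
    Interlocked.Increment(ref completed);
});

Console.WriteLine($"completed {completed} of {durationsMs.Length} jobs");
```

Passing the second argument as EnumerablePartitionerOptions selects the IEnumerable<T> overload of Partitioner.Create even though the source is an array.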

stephentoub commented 3 years ago

What @svick said.

Parallel.ForEach when given an IEnumerable<T> needs to pull each item out of the enumerator, but that is an inherently thread-unsafe operation, with the caller needing to make calls to the MoveNext + Current pair atomically. So the calling thread(s) must take a lock around the operation. If the work performed by the ForEach "body" is relatively cheap, the cost of taking that lock for each element in the enumerator dominates the cost of the overall operation. As such, by default ForEach works on a scheme whereby each thread takes one item each time it goes back to the enumerator, and then after a few times of this upgrades to taking two items each time it goes back to the enumerator, and then four, and then eight, and so on. This amortizes the cost of taking and releasing the lock across multiple items, while still enabling parallelization for enumerables containing just a few items. It does, however, mean that if you've got a case where the body takes a really long time and the work for every item is heterogeneous, you can end up with an imbalance. To address that, NoBuffering changes the implementation to one that only ever takes a single item at a time.
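One rough way to see the difference between the two modes is to count how far ahead of the consumer the partitioner has read the source. The sketch below is only a probe with a single consumer and a hypothetical 100-item source; `MaxReadAhead` is a helper made up for this example, and the exact numbers it prints depend on the runtime's chunking policy.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Returns the largest number of items that had been pulled from the source
// but not yet handed to the consumer, for the given partitioner mode.
int MaxReadAhead(EnumerablePartitionerOptions mode)
{
    int pulled = 0;
    IEnumerable<int> Source()
    {
        for (int i = 0; i < 100; i++) { pulled++; yield return i; }
    }

    var partitioner = Partitioner.Create(Source(), mode);
    int consumed = 0, maxReadAhead = 0;

    // A single dynamic partition plays the role of one worker thread.
    using var partition = partitioner.GetDynamicPartitions().GetEnumerator();
    while (partition.MoveNext())
    {
        consumed++;
        maxReadAhead = Math.Max(maxReadAhead, pulled - consumed);
    }
    return maxReadAhead;
}

int buffered = MaxReadAhead(EnumerablePartitionerOptions.None);
int unbuffered = MaxReadAhead(EnumerablePartitionerOptions.NoBuffering);
Console.WriteLine($"default: up to {buffered} items read ahead; NoBuffering: up to {unbuffered}");
```

With long, uneven jobs, items that a busy worker has already read ahead are the ones that end up waiting behind it.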