dougbinks / enkiTS

A permissively licensed C and C++ Task Scheduler for creating parallel programs. Requires C++11 support.
zlib License
1.66k stars 138 forks

Rejecting a task and adding it back to the queue? #105

Open BrodyHiggerson opened 7 months ago

BrodyHiggerson commented 7 months ago

Another API / design question for you, looking for the creator's blessing.

In my existing task system, there is the concept of inter-task dependencies, same as in enkiTS, but I also support read/write dependency declarations. The task processing algorithm first attempts to satisfy the inter-task dependencies; all jobs whose task dependencies are satisfied are then put into a queue where they 'fight' to grab their read/write dependencies. It's not possible to know in advance which will win, and in my opinion it's not practical or worthwhile to try to 'solve' that entire tree up front; instead we let it play out at runtime.

Here is an example function that is called regularly, e.g. when a task completes and pulls in any dependent tasks that become unblocked. It's messy, but it's only meant as an illustration.

void TickGraph::ReadyContendingJobsIfPossible()
{
    // lock here ...
    static thread_local std::vector<TickJob*> contendeds;
    contendeds.reserve(128);

    {
        TickJob* contendingJob;
        while (contendingTickQueue_.try_pop(contendingJob))
        {
            if (impl_->typeDepTracker_.TryPushNode(contendingJob->node_)) // check against read/write dependencies
            {
                ExecuteJob(contendingJob);
            }
            else
            {
                contendeds.push_back(contendingJob); // we failed, so prepare to put the job back into the contending queue
            }
        }
    }

    for (TickJob* contended : contendeds)
    {
        MoveToContending(contended); // put it back in for next time
    }
    contendeds.clear();

    // unlock here ...
}

I'm not trying to replicate this exactly, just giving context. I could imagine, for example in TaskScheduler::TryRunTask, in the block where bHaveTask is true, adding a call to some optional callback that checks whether the task is actually allowed to run (defaulting to true); if not, it calls AddTaskSetToPipe to put the task back into circulation and early-outs. I'm not sure how I feel about the task going to the back of the pipe, but maybe that's fine. I'd also need to figure out a way to do this for pinned tasks.
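To make that concrete, here's a rough standalone sketch of the flow I mean. None of this is enkiTS code; TinyScheduler, Task and canRun are just illustration, with a mutex standing in for the lock-free pipe:

#include <deque>
#include <functional>
#include <mutex>

struct Task
{
    std::function<bool()> canRun; // optional check; empty means "always allowed"
    std::function<void()> run;
};

struct TinyScheduler
{
    std::deque<Task*> queue;
    std::mutex        mutex; // stand-in for the lock-free pipe

    void Push( Task* pTask )
    {
        std::lock_guard<std::mutex> lock( mutex );
        queue.push_back( pTask );
    }

    // Mirrors the proposed change: pop a task, check it, requeue on rejection.
    bool TryRunOne()
    {
        Task* pTask = nullptr;
        {
            std::lock_guard<std::mutex> lock( mutex );
            if( queue.empty() ) { return false; }
            pTask = queue.front();
            queue.pop_front();
        }
        if( pTask->canRun && !pTask->canRun() )
        {
            Push( pTask ); // back into circulation at the end of the queue
            return false;  // early-out
        }
        pTask->run();
        return true;
    }
};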

Interested in any insights you might have from a canonical design/API point of view; I may also have missed something that would help.

dougbinks commented 7 months ago

I have been considering adding dynamic dependencies to enkiTS, which would support data access dependencies, but have not yet done so.

A simpler change I have also been considering is allowing tasks to delay themselves: for task sets this would allow a given range (within the calling range) to be delayed, i.e. put back in the task queue. This would let your tasks perform their checks and delay themselves if the checks are unfulfilled.

BrodyHiggerson commented 7 months ago

Could you elaborate on the "given range" being delayed? As in the entire task set itself?

Also, as a stopgap I'll have to implement something quick and dirty to unblock, even if it lives in the darkness of a fork. Does the point in the flow I suggested make sense? I'm not sure how that would play with the concept of subtask splitting that I've seen around. As mentioned, I'd be calling out to some thread-safe callback, or even adding a virtual function to the task, after we find a task, and pushing it back into the pipe if rejected. Am I fine to push into the pipe while I'm in the task-finding path?

dougbinks commented 7 months ago

Could you elaborate on the "given range" being delayed? As in the entire task set itself?

So ITaskSet is executed (potentially on many threads) with a given TaskSetPartition range_. The idea is that, while in ExecuteRange, a TaskSet could call Delay( TaskSetPartition delayRange_ ) with delayRange_.start >= range_.start and delayRange_.end <= range_.end (and delayRange_.start < delayRange_.end).

This could be used in your approach to put the entire range back on the task queue.

For pinned tasks this is simpler as the whole task can be delayed.
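In rough terms, from a task's point of view this might look like the following. Note that Delay does not exist yet (it is stubbed out here so the sketch compiles), and DependenciesSatisfied / DoWork are placeholders for whatever checks and work your task does:

#include "TaskScheduler.h"

struct MyTaskSet : enki::ITaskSet
{
    bool DependenciesSatisfied( uint32_t index ) const { return true; } // placeholder check
    void DoWork( uint32_t index ) {}                                    // placeholder work

    // Proposed enkiTS API, not implemented; stub so this sketch compiles.
    void Delay( enki::TaskSetPartition delayRange_ ) {}

    void ExecuteRange( enki::TaskSetPartition range_, uint32_t threadnum_ ) override
    {
        for( uint32_t i = range_.start; i < range_.end; ++i )
        {
            if( !DependenciesSatisfied( i ) )
            {
                // Put the rest of this range back on the task queue;
                // the scheduler would retry it later.
                enki::TaskSetPartition delayRange;
                delayRange.start = i;
                delayRange.end   = range_.end;
                Delay( delayRange );
                return;
            }
            DoWork( i );
        }
    }
};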

Also, as a stopgap I'll have to implement something quick and dirty to unblock, even if it lives in the darkness of a fork. Does the point in the flow I suggested make sense?

I'm not sure - enkiTS tries to guarantee that it is lock-free (which helps ensure no single task thread blocks all other task threads). This makes the design harder.

My current idea was to implement this in terms of a type of Dependency which the user could derive from. I've not put enough thought into exactly how this would work, or whether it is possible to make it lock-free.

BrodyHiggerson commented 7 months ago

Thanks for the explanations; I hadn't considered that angle, as I still think of tasks as singular pieces of work, whereas the basic building blocks here are oriented around each task object actually being a parallel set of tasks.

Outside of a parallel for, I'd likely just implement wrappers that have one ITaskSet instance per task, e.g. one per system in an ECS context.
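For example, a wrapper might look something like this (my assumption of how I'd do it; nothing here beyond the existing ITaskSet API):

#include "TaskScheduler.h"
#include <functional>

// One ITaskSet per logical system; set size 1 so the whole thing is a single
// unit of work rather than a splittable range.
struct SystemTask : enki::ITaskSet
{
    std::function<void()> tick;

    explicit SystemTask( std::function<void()> tick_ )
        : enki::ITaskSet( 1 )
        , tick( std::move( tick_ ) )
    {}

    void ExecuteRange( enki::TaskSetPartition, uint32_t ) override
    {
        tick();
    }
};

// Usage: SystemTask physics( []{ /* tick physics system */ } );
//        taskScheduler.AddTaskSetToPipe( &physics );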

My example was misleading in that the lock was implementation-specific due to how my queue worked; my actual dependency checking/reservation algorithm is lock-free (it walks atomics), so in theory it could be called after a task is grabbed from the pipe and then stuck back on. I'll give it a go and see what explodes.
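A minimal sketch of the kind of try-only atomic reservation I mean, simplified to a single resource (illustrative only, not my actual TryPushNode code): readers increment a counter when no writer holds it, a writer CAS-es it from free to exclusive, and every attempt either succeeds or fails immediately so a rejected task can simply be requeued.

#include <atomic>
#include <cstdint>

struct ResourceAccess
{
    std::atomic<int32_t> state{ 0 }; // 0 = free, >0 = reader count, -1 = writer

    bool TryAcquireRead()
    {
        int32_t current = state.load( std::memory_order_acquire );
        while( current >= 0 )
        {
            if( state.compare_exchange_weak( current, current + 1,
                                             std::memory_order_acq_rel ) )
            {
                return true;
            }
            // current was refreshed by compare_exchange_weak on failure
        }
        return false; // a writer holds the resource; requeue and retry later
    }

    void ReleaseRead()  { state.fetch_sub( 1, std::memory_order_release ); }

    bool TryAcquireWrite()
    {
        int32_t expected = 0;
        return state.compare_exchange_strong( expected, -1,
                                              std::memory_order_acq_rel );
    }

    void ReleaseWrite() { state.store( 0, std::memory_order_release ); }
};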

BrodyHiggerson commented 7 months ago

Here is my first naive attempt; I wasn't sure whether it made sense to bother adding the task back to the same thread we stole it from, or whether this approach even makes sense: https://github.com/dougbinks/enkiTS/commit/559a196119fdc50cd1831bcf85c3fe8f9ae2007d

But in my mind I'm only using a task set to represent either a single logical task or a ParallelFor, so in both cases I only care about rejecting the entire set wholesale. In the code above, I had to reject each subtask until the overall set's interface said it could execute. Not sure if that makes sense.

dougbinks commented 7 months ago

My plan was to add this functionality via dynamic dependencies, in which tasks that cannot run are not added to the task queue until they are able to run.

Some comments on your implementation:

ITaskSet implementation

  1. The task pipes m_pPipesPerThread can only be written to by the owning thread, so you cannot write using threadReadFrom; you must always write using threadNum_.
  2. Returning false from TryRunTask should not occur unless no runnable tasks were found, so the CanRun test needs to be after each bHaveTask check, setting bHaveTask to false rather than exiting.
  3. A potential infinite loop or stuck queue could occur if the task was found on the current thread's pipe, as it could be placed back on the task queue and then retrieved again. Instead, we need to grab the next head task before replacing the current one. However, this could also be a task which can't run, in which case we need to go through the tasks in the queue until we find one which can run. This is tricky without creating a new queue for un-runnable tasks (enkiTS does no allocation outside of initialization).
  4. Finding only an un-runnable task could stall enkiTS, as the task threads could sleep and then there would be no new tasks to wake them up. So we need a way to signal to enkiTS that tasks can now run.
  5. When task stealing (reading from another thread's queue), un-runnable tasks are placed on the current thread's queue. This could result in that queue becoming full, which needs to be handled somehow or that task thread will stall.
  6. I would likely add a set of flags to the ITaskSet interface to denote that a given task uses the CanRun interface (a rough sketch follows after this list), though first I would check the performance degradation.
  7. Question: should a task which has already run some of its range use the CanRun check, or should it implicitly be in a runnable state?
  8. How does CanRun lock any resource if it returns true, and how is that resource then unlocked? How does that work with split tasks (ones with m_SetSize > 1)?
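As a rough sketch of the flags idea in point 6 (hypothetical, not existing enkiTS API):

#include "TaskScheduler.h"
#include <cstdint>

// Hypothetical: a flag so the scheduler only pays for the CanRun check on
// tasks that opt in; the performance impact would need measuring first.
enum TaskSetFlags : uint32_t
{
    TASKSETFLAG_NONE        = 0,
    TASKSETFLAG_USES_CANRUN = 1 << 0,
};

struct ICheckedTaskSet : enki::ITaskSet
{
    uint32_t m_Flags = TASKSETFLAG_NONE;

    // Would be called by the scheduler before running a (sub)task when
    // TASKSETFLAG_USES_CANRUN is set. How any resource acquired here gets
    // released, and how this interacts with split ranges, is still open
    // (points 7 and 8 above).
    virtual bool CanRun( uint32_t threadnum_ ) const { return true; }
};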

IPinnedTask implementation

  1. I think the current loop could run infinitely. Fixing this will, I think, require going through the entire list until we hit the first task we couldn't run.
  2. We also need to wake any enkiTS task threads waiting for pinned tasks with WaitForNewPinnedTasks when a pinned task can now run.

dougbinks commented 7 months ago

Thanks for this initial implementation, by the way; it's a good start towards such an API and has helped uncover some of the issues with delaying / not running tasks.

dougbinks commented 5 months ago

After some thought I have an idea for how this could work, and will try to develop a solution on a branch in the near future. I'll ping this thread once I have a working prototype.