afritz1 / OpenTESArena

Open-source re-implementation of The Elder Scrolls: Arena.
MIT License

Job Manager #247

Closed lowercase-g closed 3 days ago

lowercase-g commented 1 year ago

This implements a job system (hopefully closes #245).

Examples:

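// These snippets assume `using namespace std::chrono_literals;` for the
// duration literals (1000ms, 1h).
// If the job manager has at least 3 threads to work with, this
// only takes about 1 second.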
{
    auto jm = JobManager(3);

    Job sleep ([](){ 
       std::this_thread::sleep_for(1000ms);
    });
    jm.submitJobs ( { sleep, sleep, sleep } );
}
// This, too, should only take 1 second.
{
    auto max_threads = std::thread::hardware_concurrency(); 
    auto jm = JobManager(max_threads);

    std::vector< Job > job_list (
       max_threads,
       Job( [](){ std::this_thread::sleep_for(1000ms); } )
    );

    jm.submitJobs ( job_list );
}
// This will wait for all jobs to be done before resuming the calling thread.
{
    auto max_threads = std::thread::hardware_concurrency(); 
    auto jm = JobManager(max_threads);

    Job sleep ([](){ 
       std::this_thread::sleep_for(1000ms);
    });

    jm.submitJobs( { sleep } );

    jm.wait();
    std::cout << "\nDone!";
}

Essentially, when a JobManager is constructed with an n_threads argument, it creates a private ThreadPool. That ThreadPool, in turn, creates n_threads Workers.

JobManager::run() spawns a background thread to do the following:

1) While the job queue is not empty, ask the thread pool for an idle worker. If the thread pool can't immediately provide an idle worker, the background thread will wait until one is available.

2) When an idle worker has been obtained, pop a job from the front of the queue and tell the idle worker to do it.

JobManager, if given a list of jobs in its constructor (optional), immediately starts running the job distributor. Otherwise, JobManager::run() only runs when the following conditions are met:

1) JobManager::submitJobs() has been called.

2) JobManager::isRunning() returns false.

3) The job queue is not empty.

This means the only job-performing method you need to worry about is JobManager::submitJobs(), and that adding jobs to an already-running queue is completely safe and intended behaviour:

{
    auto jm = JobManager(std::thread::hardware_concurrency());

    Job sleep ([](){ 
       std::this_thread::sleep_for(1h);
    });
    // Since the queue was empty before, this starts the job system, 
    // which will run for an hour.
    jm.submitJobs ( { sleep } ); 
    // Completely safe, and a new instance of sleep has been appended to
    // jm's already-existing queue.
    jm.submitJobs ( { sleep } ); 
}

To do:

[ ] Categories (filter which jobs JobManager::wait() waits for)
[ ] Decentralize job distribution a bit more.

afritz1 commented 1 year ago

Thanks. I'll start reviewing this tonight and push revisions. The Readme is not necessary.

afritz1 commented 1 year ago

I need permission to write to your branch.

Enumerating objects: 54, done.
Counting objects: 100% (54/54), done.
Delta compression using up to 32 threads
Compressing objects: 100% (46/46), done.
Writing objects: 100% (47/47), 6.26 KiB | 6.26 MiB/s, done.
Total 47 (delta 34), reused 0 (delta 0), pack-reused 0
remote: Resolving deltas: 100% (34/34), completed with 5 local objects.        
remote: error: GH006: Protected branch update failed for refs/heads/Job-System.        
remote: error: At least 1 approving review is required by reviewers with write access.        
To https://github.com/i-am-the-gabe/OpenTESArena.git
 ! [remote rejected]   pr/247 -> Job-System (protected branch hook declined)
error: failed to push some refs to 'https://github.com/i-am-the-gabe/OpenTESArena.git'

lowercase-g commented 1 year ago

My bad, bungled the branch protection. Should work now.

afritz1 commented 1 year ago

Okay, I made some changes to bring it more in line with what I'm looking for. Questions:

  1. Can we fix the 'here be dragons'?
  2. Can we make a submitJob() that takes one job? I think a common use case would be submitJob(...); submitJob(...); submitJob(...); wait();. It can use submitJobs() internally.
  3. Can we allocate the ThreadPool without std::unique_ptr?
  4. Can we fix the TODO on JobManager?

I'll handle # 2.
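
For question 2, a single-job overload that forwards to submitJobs() might look roughly like the sketch below. `JobManagerSketch` and `queuedCount()` are hypothetical stand-ins invented for illustration, and `Job` is assumed to be a `std::function<void()>`; the real JobManager would also start the distributor, which is omitted here.

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Assumption: a job is any callable taking no arguments.
using Job = std::function<void()>;

// Hypothetical stand-in for JobManager. It only records jobs so that the
// forwarding from submitJob() to submitJobs() is visible.
class JobManagerSketch
{
public:
    // Appends every job in the list to the queue.
    void submitJobs(const std::vector<Job> &jobs)
    {
        queue.insert(queue.end(), jobs.begin(), jobs.end());
    }

    // Convenience wrapper: wraps one job in a list and forwards it.
    void submitJob(const Job &job)
    {
        submitJobs(std::vector<Job>{ job });
    }

    // Illustrative accessor, only here to make the sketch checkable.
    std::size_t queuedCount() const
    {
        return queue.size();
    }

private:
    std::vector<Job> queue;
};
```

With this shape, the `submitJob(...); submitJob(...); submitJob(...); wait();` pattern goes through the same code path as a single submitJobs() call with three jobs.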

lowercase-g commented 1 year ago

Taking down (4) as we speak.

Edit: Looked into it, and it might not be necessary. It just sounds strange to have a job scheduler manage and request worker threads while being a worker thread itself. It'd also make the wait() implementation that much more confusing, and we need to keep it as simple as possible before categories are implemented.

afritz1 commented 1 year ago

I tried this in Game::loop() but I don't understand why the main thread is blocked until the jobs are done. Maybe I'm misunderstanding locks/mutexes.

    JobManager jobManager;
    jobManager.init(8);

    std::mutex mut;
    auto lamb = [&mut](int i)
    {
        return [&mut, i]()
        {
            std::unique_lock<std::mutex> lock(mut);
            std::cout << "lamb " << i << '\n';
            lock.unlock();

            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        };
    };

    std::vector<Job> jobs;
    for (int i = 0; i < 200; i++)
    {
        jobs.emplace_back(lamb(i));
    }

    jobManager.submitJobs(jobs);

    Job job = [&mut]()
    {
        std::lock_guard<std::mutex> lock(mut);
        std::cout << "Last job" << '\n';
    };

    jobManager.submitJob(job);

    // Primary game loop.
    int frame = 0;
    while (this->running)
    {
        std::unique_lock<std::mutex> lock(mut);
        std::cout << "frame " << frame << '\n';
        lock.unlock();
        frame++;

The output is

...
lamb 197
lamb 198
lamb 199
Last job
frame 0
frame 1
frame 2

But I expected something like

lamb 0
lamb 1
frame 0
lamb 3
lamb 2
...

Edit: looks like if I comment out the submitJob() then it works as expected.

afritz1 commented 1 year ago

Also, I'm not sure if this is an easy change, but Worker::invoke() should not create a new thread; it should reuse the std::thread it was initialized with. I don't want std::this_thread::get_id() changing for the same worker.
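
One way to get that behaviour is to start the thread once and have it sleep on a condition variable between jobs. The sketch below is an assumption about how this could be done, not the PR's Worker: `PersistentWorker`, `pending`, `busy`, `wake`, and `done` are hypothetical names, and `Job` is assumed to be a `std::function<void()>`.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Assumption: a job is any callable taking no arguments.
using Job = std::function<void()>;

// Hypothetical worker that runs every job on the one thread it was
// constructed with, so std::this_thread::get_id() stays the same.
class PersistentWorker
{
public:
    PersistentWorker() : thread([this]() { this->loop(); }) {}

    ~PersistentWorker()
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            stopping = true;
        }
        wake.notify_all();
        thread.join(); // Any job already handed over still runs first.
    }

    // Hands a job to the existing thread; blocks while it is still busy.
    void invoke(Job job)
    {
        std::unique_lock<std::mutex> lock(mutex);
        done.wait(lock, [this]() { return !busy; });
        pending = std::move(job);
        busy = true;
        wake.notify_one();
    }

    // Blocks until the worker has no job pending or running.
    void wait()
    {
        std::unique_lock<std::mutex> lock(mutex);
        done.wait(lock, [this]() { return !busy; });
    }

private:
    void loop()
    {
        std::unique_lock<std::mutex> lock(mutex);
        while (true)
        {
            wake.wait(lock, [this]() { return busy || stopping; });
            if (!busy)
            {
                return; // Stopping, and nothing is left to run.
            }

            Job job = std::move(pending);
            lock.unlock();
            job(); // Run outside the lock so wait() callers can block.
            lock.lock();
            busy = false;
            done.notify_all();
        }
    }

    std::mutex mutex;
    std::condition_variable wake; // Signals the worker thread.
    std::condition_variable done; // Signals callers of invoke()/wait().
    Job pending;
    bool busy = false;
    bool stopping = false;
    std::thread thread; // Declared last so it starts after the other members.
};
```

Two consecutive invoke() calls on the same `PersistentWorker` then observe the same thread id inside their jobs, because no new std::thread is created per job.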