Closed: Helveg closed this pull request 1 month ago
@drodarie the approach in #865 didn't entirely work out, because all jobs may get _enqueued at the same time, immediately, so enqueueing was not a good moment to freeze and package the required cache items. On the worker's side, though, the dispatcher turned out to be a very good moment to check the required cache items!
So instead I opted for an MPI Window (a little-documented feature that we can access via mpi4py). A window allows a given buffer (here a NumPy array of 1000 ints) to be accessed asynchronously across processes. So whenever jobs finish on the main node, we update the required cache items, and every time the dispatcher is called on a worker, it requests access to main's buffer and can check which cache items it still needs to keep :)
This means that only 1000 items can be cached, but you can easily increase that number.
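To make the fixed-size buffer idea concrete, here is a minimal sketch of the packing/decoding bookkeeping, using NumPy only (the actual one-sided transfer would go through mpi4py's `Win.Lock`/`Win.Get`/`Win.Unlock`, which needs an MPI runtime; the function and constant names here are illustrative, not the BSB API):

```python
import numpy as np

BUFFER_SIZE = 1000  # fixed-size window buffer, as described above (illustrative name)

def update_cache_buffer(buffer, required_ids):
    """Main-node side: pack the ids of still-required cache items into the
    fixed-size int buffer, using 0 to mark empty slots. In the real code this
    buffer would be the memory exposed through the MPI window."""
    buffer[:] = 0
    ids = sorted(required_ids)[:BUFFER_SIZE]
    buffer[: len(ids)] = ids
    return buffer

def read_required_cache_items(buffer):
    """Worker side: decode a copy of the buffer back into a set of required
    ids. With mpi4py this read would be a one-sided Get on main's window
    rather than a direct array access."""
    return {int(i) for i in buffer if i != 0}

buffer = np.zeros(BUFFER_SIZE, dtype=np.int64)
update_cache_buffer(buffer, {101, 202, 303})
print(sorted(read_required_cache_items(buffer)))  # [101, 202, 303]
```

Using 0 as the empty-slot sentinel assumes cache item ids are nonzero; any out-of-band value works as long as both sides agree on it.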
I did further tests; it seems the cache is freed by each worker when the dispatcher is called. The worker's JobPool queue is empty when it performs _read_required_cache_items. This happens because jobs are only sent when a worker is idle, meaning its queue is empty. Maybe the check should be performed before the execute command?
> The worker's JobPool queue is empty when it performs _read_required_cache_items
The worker's JobPool._job_queue is empty by design; it is an unused variable on workers (the main node does all the scheduling and job management, workers just run the calculating functions), but that does not matter: _read_required_cache_items reads main's JobPool._cache_buffer with _cache_window.Get(bufspec, 0) into its own _cache_buffer. Main's _cache_buffer is updated with _update_cache_window during every loop of main's job event loop in _execute_parallel. So main's _job_queue is the only one that matters, and it gets read out over MPI by the workers in an asymmetric remote memory access procedure via the cache window/buffer.
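The update/read cycle described above can be sketched as follows, using a `threading.Lock` as a single-process stand-in for the MPI window's lock/Get/unlock sequence (class and method names are illustrative analogues of `_update_cache_window` and `_read_required_cache_items`, not the actual BSB implementation):

```python
import threading
import numpy as np

class CacheWindowStandIn:
    """Toy analogue of the cache window: main updates a shared buffer each
    event-loop iteration; workers lock, copy, and decode it. In the real
    code the lock/copy pair would be mpi4py's one-sided Win.Lock / Win.Get
    / Win.Unlock calls against main's rank."""

    def __init__(self, size=1000):
        self._buffer = np.zeros(size, dtype=np.int64)  # "main's" _cache_buffer
        self._lock = threading.Lock()                  # stands in for Win.Lock

    def update(self, required_ids):
        # Main side: repack the still-required cache item ids.
        with self._lock:
            self._buffer[:] = 0
            ids = sorted(required_ids)[: len(self._buffer)]
            self._buffer[: len(ids)] = ids

    def read(self):
        # Worker side: take the lock, copy the buffer out, then decode.
        with self._lock:
            local_copy = self._buffer.copy()
        return {int(i) for i in local_copy if i != 0}

window = CacheWindowStandIn()
window.update({4, 2})
print(sorted(window.read()))  # [2, 4]
```

Copying the buffer while holding the lock keeps the read atomic, mirroring why the real code locks the window around `Get` rather than reading main's memory piecemeal.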
> Was my test badly designed?
Actually, we may benefit from your approach. Is there any way you could write the test in such a way that each worker writes its own file, and at the end of the test we assert the expected content of each file? You could use unittest.mock, maybe patching the class so that you can write the calls and results to file?
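A minimal sketch of that patching idea, with a made-up `PoolCache` class standing in for the real one (the actual test would patch the BSB class instead; all names here are hypothetical):

```python
import json
import tempfile
from pathlib import Path
from unittest import mock

class PoolCache:
    """Hypothetical stand-in for the class under test."""
    def fetch(self, key):
        return f"value-for-{key}"

def log_calls_to_file(obj, method_name, logfile):
    """Patch obj.method_name so every call and its result are appended to
    logfile as JSON lines; returns the patcher so the test can stop it."""
    original = getattr(obj, method_name)

    def wrapper(*args, **kwargs):
        result = original(*args, **kwargs)
        with open(logfile, "a") as fh:
            fh.write(json.dumps({"args": list(args), "result": result}) + "\n")
        return result

    patcher = mock.patch.object(obj, method_name, side_effect=wrapper)
    patcher.start()
    return patcher

logfile = Path(tempfile.mkdtemp()) / "worker_0.log"
cache = PoolCache()
patcher = log_calls_to_file(cache, "fetch", logfile)
cache.fetch("A")
cache.fetch("B")
patcher.stop()
entries = [json.loads(line) for line in logfile.read_text().splitlines()]
print([e["result"] for e in entries])  # ['value-for-A', 'value-for-B']
```

In an MPI test, each rank would write to its own file (e.g. keyed by rank number) so the final assertions can check every worker's call sequence independently.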
There is an expected tiny bit of slippage possible: right when a worker finishes the last dependency, it might lock and read the cache buffer before main has updated it, so it may keep a cache entry one job iteration longer than required. That's unavoidable without synchronizing, and I think any synchronized MPI operation would be harder to implement here.
> Was my test badly designed?
Hard to tell without seeing the code in front of me (preferably in a unittest, so I can also run it :D). As you can see from my test, I only ever tested it with an A-B-A job queue, verifying that after B runs, B is removed from the cache.
Hi @Helveg, we found the bug with @filimarc: it is located in free_stale_pool_cache. Basically, in parallel, since you are using MPI, required_cache_items is a set of checksum keys, while in serial it is a set of str.
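To illustrate why that type mismatch is dangerous (with made-up values, not the actual BSB keys): set operations between integer checksums and string keys silently match nothing, so a stale-cache sweep against the wrong key type frees everything.

```python
# Parallel path: integer checksums; serial path: string keys (illustrative values).
required_parallel = {hash("itemA") & 0xFFFFFFFF, hash("itemB") & 0xFFFFFFFF}
required_serial = {"itemA", "itemB"}

cached = {"itemA": 1, "itemB": 2}

# Computing "stale" entries against the mismatched key type frees everything:
stale_vs_parallel = set(cached) - required_parallel
stale_vs_serial = set(cached) - required_serial
print(stale_vs_parallel)  # {'itemA', 'itemB'}: nothing matched, all wrongly stale
print(stale_vs_serial)    # set(): correct, nothing is stale
```

Normalizing both paths to the same key type before the set difference removes the bug.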
We also extended the tests as you suggested to test how many times the cache was cleared.
Describe the work done
Workers keep track of the pool_cache methods they hit, and read from a window on the main node which items are still required to be kept before starting new jobs.

Tasks
📚 Documentation preview 📚: https://bsb--866.org.readthedocs.build/en/866/