PyWiFeS / pipeline

The Python data reduction pipeline for WiFeS
6 stars 26 forks source link

Fix and enable multithreading #32

Closed timothyfrankdavies closed 7 months ago

timothyfrankdavies commented 7 months ago

The 'mulithread' config caused the program to freeze on OzStar. I've made 1 change to fix the freeze, then 3 other changes to optimize a little.

The Freeze

The freeze was caused by creating a multiprocess.pool, making a list of jobs using imap_unordered, but only consuming the list after the pool was closed.

The jobs returned by imap_unordered are lazily evaluated, so it won't start running the processes until the list is read. It then yields each result in the order they complete. Reading the list only after the pool closes causes it to wait forever.

See (with imap_unordered just below it) for more info.

I've moved the code that consumes the list to its own function (which needs some more renaming), and that solves the freeze.

Other changes

The other changes are:

  1. Use os.sched_getaffinity(0) instead of multiprocessing.cpu_count() to get the number of CPUs available to the job, rather than the number of CPUs on the machine.
  2. Set a chunksize, which sets the number of jobs to assign to & return from each CPU at a time.
  3. Set multithread to true by default in the json configs. In a follow-up, we can change it programmatically instead.

chunksize is a bit of a complicated issue. I'm inclined to do what I've done here, and set an optimistic chunksize with some small tests to back it up, and revisit later if needed. There were comments suggesting a similar approach.

Here's the considerations:

  1. A chunksize of 1 on a large list adds a large number of system calls, as each CPU requests individual jobs.
  2. A chunksize that divides all jobs evenly between CPUs causes fewer system calls, but if jobs take different times to complete it can be a problem. e.g. if one job takes longer than every other job in the list combined, you'd rather assign it its own CPU.
  3. A larger chunksize can also cause larger memory usage, as the CPU needs to store all the args to the jobs as it runs them.


The pipeline already has an overall debug timer in the output file, search for All done in

With multithreading, the run completes thousands of seconds faster compared to running single threaded, and hundreds of seconds faster if running with additional cores.

With chunksize evenly dividing jobs I saw a ~ 40 second improvement. I'm unsure if that's beyond margin of error.

There are small differences in the logs before & after, but I think they're insignificant (e.g. differences due to adding numbers in different orders). It might be worth doing a test & plotting results to be sure.

timothyfrankdavies commented 7 months ago

I've merged the fixes in #33 and repeated tests.

I realised that my old tests hadn't configured single core tests correctly.

On the new tests, running with a single CPU or with multithread disabled takes roughly the same time, and running with extra cores reduced the processing time substantially. How significant it is depends on the data you're processing.

Results are now consistent except for the very last step: image

The last file runs with multithread=false, and is the only one with different values.

The differences here seem very significant, so I'd guess it's something to do with the order that floats are operated on, rather than some wrong calculation being done.

timothyfrankdavies commented 7 months ago

I've undone "1. Use os.sched_getaffinity(0) instead of multiprocessing.cpu_count()", as it caused a crash on macOS. I've created an issue to fix that as a follow-up here, though it may be low priority:

timothyfrankdavies commented 7 months ago

There's a few more changes needed before enabling multithreading, but they're a little too much work for one PR, and we've hit the end of the year.

For this PR, I'm disabling multithreading again. A little more work is needed for each section before enabling:

  1. wave_soln should only run multithreaded if it's faster than single threaded.
    1. Users should be able to set a max number of threads to use, then we'll use num_processes = min(max_threads, multiprocessing.cpu_count()) for the pool.
  2. cosmic_rays multithread should run in memory instead of using temporary files, and generally follow the same process as wave_soln.
  3. cube_gen causes a small difference in results. It should be traced and fixed.
  4. Currently any system using spawn method for subprocesses is very slow. We either need to disable multithreading if start_method = multiprocessing.get_start_method() == 'spawn', or investigate & fix the issue.
timothyfrankdavies commented 7 months ago

Quick update, (3) is now fixed, so the pipeline produces identical results whether running single-threaded or multi-threaded. The issue was the headers loaded by cube_gen.

I'm still inclined to leave multithreading disabled by default until we address the other points.

This PR should be good to go.