Open CielAl opened 6 months ago
@choosehappy Updated and added very simple multiprocessing support for CUDA: one worker process is assigned with a unique device ID - # of processes cannot exceed # of available CUDA devices.
The image handle/array adapter are now fully device-aware. Therefore, it makes future changes such as potential introduction of Dask a possible task (Dask handles both multiprocessing and distributed procedures much better than native python's built-in function and it can support multiple GPU nodes).
@jacksonjacobs1 @nanli-emory @choosehappy
Finally, I finish the dask implementation of multiprocessing/distributed learning for both CPU and GPU tasks. It basically enables the GPU-accelerated modules/image handles to utilize multiprocessing more efficiently.
A few takeways:
(1) Dask will be a requirement. However Dask-GPU is optional (just like cuCIM/cupy). Dask-GPU specifically deal with CUDA clusters and handle multiprocessing (and like Dask, it also handles distributed frameworks), and it basically solves the racing conditions I observe if we manually deploy a one-process-for-one-device multiprocessing task.
(2) It greatly simplify the deployment of multiprocessing procedures as well, hide most of the lower level operations/configurations under the hood. For isntance, the current HistoQC implementation would need a MultiprocessLoggingManager to launch a thread and fetch all logRecords from worker loggers through a synchronized queue handler. Under dask, you only need to forward all worker loggers directly to the main process.
(3) Basically it only slightly alters the main.py. Most of the other scripts are not touched. For instance, we would have a more readable layout to submit tasks to worker follow the below pattern:
if args.nprocesses > 0:
local_cluster = new_cluster('histoqc', is_cuda=is_cuda,
nprocesses=args.nprocesses, gpu_ids=args.gpu_ids)
with Client(local_cluster) as client:
# register the worker side
logging_setup.setup_client(client, forward_name="root")
# noinspection PyTypeChecker
futures_list = [client.submit(worker, idx, file_name, **_shared_state)
for idx, file_name in enumerate(files)]
for future in as_completed(futures_list):
try:
base_img_finished = future.result()
except Exception as exc:
worker_error(exc, failed)
else:
worker_success(base_img_finished, result_file=results)
(4) Finally, with dask and dask-gpu, it really significantly makes HistoQC scalable to large cohorts, as you may utilize CPU/GPUs across multiple machines for the downstream QC modules.
All changes are pushed to #290 if you are interested of reviewing the codes.
@CielAl This is an impressive contribution, thanks for your effort.
The added cuCIM support looks reasonable to me and I'll take a better look at the code soon.
The support for dask may actually conflict with the distributed approach (Ray) we've chosen and implemented in https://github.com/choosehappy/HistoQC/tree/ray_cluster_functionality
In addition to handling distributed processing via the Ray job scheduler, Ray supports infrastructure-as-code for provisioning clusters. Ray clusters also facilitate model serving and training, which we plan to use for QuickAnnotator & PatchSorter.
Based on your work, did you observe any advantages that Dask might have over Ray for multi-machine multiprocessing?
@jacksonjacobs1 Hi, appreciate the compliment.
My previous experiences were mostly about pyspark (which I despise) and dask so my opinion could be biased. Below are my personal opinions.
Now for your question, in one word: simplicity.
First, let's wrap up what both Ray and Dask can do:
Key difference:
Advantage of Dask:
dask.array
to np.array
(or cupy's), dask_ml
to sklearn
etc. In conclusion:
Thanks for the overview. FYI ray ships a scheduler for Dask to enable running dask environments on Ray clusters: https://docs.ray.io/en/latest/ray-more-libs/dask-on-ray.html
Ray would be a much better choice due to its scalability in the scenario when you upscale the individual array operations in QC modules (e.g., chunking/lazy evaluation, etc). As mentioned earlier, this would require an overhaul of most modules and Ray would introduce more changes of codes. If your parallelism is only task-level (e.g. each worker runs a subset of images) then Ray might not have significant added value but could be annoying to develop.
I'm not sure I understand this point. We implemented HistoQC + Ray at the task level, which has substantial added value in addition to being conceptually simple;
$processing time \propto \dfrac{\#images}{\#cpu cores}
$
The user can add CPUs to their Ray cluster to meet their processing time needs, and nodes can even be added in live time.
Thoughts?
Appreciate feedbacks and my apologies due to a lack of clarification.
I'm not sure I understand this point. We implemented HistoQC + Ray at the task level, which has substantial added value in addition to being conceptually simple;
Where distributed frameworks such as Ray, Dask, or Spark shine is that they parallelize the individual array operation within the tasks (not that different to the MapReduce). For instance, you attempt to perform certain rank filter to a huge image which is way too large to be fit into physical memory, these frameworks breakdown the array into small chunks and each cluster will handle a few chunks, before the results from clusters being aggregated. All these operations will be managed by a scheduler for an optimized and balanced execution. (Other techniques such as lazy evaluation might also be involved to achieve this goal). Therefore, they are scalable when dealing with big data which can't be fit into individual machines.
And particularly in this scenario, Ray outperforms Dask due to its decentralized scheduling algorithm and has better scalability (dask's scheduler is mostly centralized).
However, if we only use Ray/Dask/... as a wrapper to deploy task-level parallelized jobs across single/multiple nodes, then in terms of scalability, none of them has much added value compared to each other, so scalability won't be why we choose a particular framework here, and we need to look into other factors for the decision.
FYI ray ships a scheduler for Dask to enable running dask environments on Ray clusters
That's actually a case of the aforementioned much fine-grained parallelization (e.g., dask.array). This might work only if (1) we intend to make HistoQC work at high magnification so that overhead is neglectable. (2) rewrite all individual modules to explicitly implement the parallelization.
If the scalability bonus of Ray is not significant (which I doubt, because after all in most cases individual tasks of HistoQC can be fit into a single node) or not applicapable (e.g., if we only aim for task-level parallelization) then it falls into where-ray-does-not-outperform-others scenario and we can just use dask API directly rather than dealing with a dask wrapper inside ray.
and nodes can even be added in live time Both Ray and Dask can achieve that quite handily.
So in the end I think in terms of implement the distributed framework itself, both Ray and Dask suffice, and the real question is the benefit of using Ray/Dask at development time and what features in future we might deploy.
We could perhaps add a checklist to compare the advantages/disadvantages later. My vote (very much biased) to Dask besides its easiness to use is because of its integration with Nvidia's Rapids ecosystem through the Dask-CUDA extension. Dask-CUDA handles GPU clusters a lot better and easier than Ray with both device and memory management implemented on the Rapids side. Note that Rapids not only speedup the IO of big images but also implement GPU-accelerated APIs of skimage (cuCIM) and sklearn (cuML).
My second concern is the nuisance of code changes for Deep Learning model training/deployment. Both Ray and Dask can achieve this (Ray uses ray.train; dask uses skorch or dask-pytorch-ddp) but in general dask's interface would be what's closer to the single machine/single processing counterpart. In fact, with Dask-CUDA it would be further.
Finally, it's way easier to trace the error/logging/debugging in Dask than Ray.
If your roadmap involves integration of more and more GPU-related tasks (e.g., model training/deployment) then Dask could be an easier choice with less nuissance, while Ray is still an available option.
Hi @choosehappy @jacksonjacobs1 @nanli-emory Finally some relief (and your inputs are appreciated), the CuCIM supports to both image handle and QC modules are implemented.
Unified Interface and Adaptors
An adapter class
histoqc.array_adapter.adapter.ArrayAdapter
is implemented to handle the calls of array operations on CPU/GPU without overwhelmingly changing the existing code. It provides a decorator such that the CPU function call in the original modules like:dilation(img_small, footprint=np.ones((kernel_size, kernel_size)))
can be easily turned to:adapter(dilation)(img_small, footprint=np.ones((kernel_size, kernel_size)))
How does it work: (1) Is an input device type (CPU or CUDA) specified? If so the adapter will push the input array(s) to specified device and invoke the corresponding function. If not the adapter will rely on the device of the input array(s) itself. (2) Is an output device type specified? If so, the adapter will always push the output array (do nothing to non-array outputs) to the specified device. (3) If the GPU-accelerated impl. of a function does not exist, adapter will revert to CPU calls, but the output device is still guaranteed. (4) By default, each image_handle has its own adapter that specifies the device type. For an openslide handle, both input and output device will be CPU, and for cucim handle they will be both CUDA. Therefore, given an image handle type, the associated adapter can guarantee all img/masks generated are on the same device. Methods such as
ArrayAdapter.device_sync_all
/ArrayAdapter.sync
also provides explicit ways to sync the device of one or more arrays if needed. (5) cupy and numpy shares a tons of unified array operations already (e.g., np.sqrt works for cupy arrays).Benchmark I tested with five Aperio WSIs (40x, average HxW
66374 x 65736
) using the default config.ini pipeline/parameters, using cucim and openslide correspondingly. For CPU-only (openslide) mode, 8 workers are specified. The machine has an RTX Titan, 128GB memory, and Ryzen 3800x CPU and the OS is Ubuntu 20.04.CUDA significantly boosts running time efficiency compared to the multiprocessing (8 cores/8 subprocesses) CPU counterpart.