data-apis / array-api

RFC document, tooling and other content related to the array API standard
https://data-apis.github.io/array-api/latest/
MIT License
205 stars 42 forks source link

Parallelism - what do libraries offer, and is there an API aspect to it #4

Closed rgommers closed 3 months ago

rgommers commented 4 years ago

Several people have expressed a strong interest in talking about and working on (auto-)parallelization. Here is an attempt at summarizing this topic.

Current status

Linear algebra libraries

The main accelerated linear algebra libraries that are in use (for CPU based code) are OpenBLAS and MKL. Both of those libraries auto-parallelize function calls.

OpenBLAS can be built with either its own pthreads-based thread pool, or with OpenMP support. The number of threads can be controlled with an environment variable (OPENBLAS_NUM_THREADS or OMP_NUM_THREADS), or from Python via threadpoolctl. The conda-forge OpenBLAS package uses OpenMP; the OpenBLAS builds linked into NumPy and SciPy wheels on PyPI use pthreads.

MKL supports OpenMP and Intel TBB as the threading control mechanisms. The number of threads can be controlled with an environment variable (MKL_NUM_THREADS or OMP_NUM_THREADS), or from Python with threadpoolctl.

NumPy

NumPy does not provide parallelization, with the exception of linear algebra routines which inherit the auto-parallelization of the underlying library (OpenBLAS or MKL typically). NumPy does however release the GIL consistently where it can.

Scikit-learn

Scikit-learn provides a keyword n_jobs=1 in many estimators and other functions to let users enable parallel execution. This is done via the joblib library, which provides both multiprocessing (default) and threading backends that can be selected with a context manager.

Scikit-learn also contains C and Cython code that uses OpenMP. OpenMP is enabled in both wheels on PyPI and in conda-forge packages. The number of threads used can be controlled with the OMP_NUM_THREADS environment variable.

Scikit-learn has good documentation on parallelism and resource management.

SciPy

SciPy provides a workers=1 keyword in a (still limited) number of functions to let users enable parallel execution. It is similar to scikit-learn's n_jobs keyword, except that it also accepts a map-like callable (e.g. multiprocess.Pool.map to allow using a custom pool. C++ code in SciPy uses pthreads; the use of OpenMP was discussed and rejected.

scipy.linalg also provides a Cython API for BLAS and LAPACK. This lets other libraries use linear algebra routines without having to ship or build against an accelerated linear algebra library directly. Scikit-learn, statsmodels and other libraries do this - thereby again inheriting the auto-parallelization behavior from OpenBLAS or MKL.

Deep learning frameworks

TensorFlow, PyTorch, MXNet and JAX all have auto-parallelization behavior. Furthermore they provide support for distributed computing (with the exception of JAX). These frameworks are very performance-focused, and aim to optimally use all available hardware. They typically allow building with different backends like NCCL or GLOO for GPU support, and use OpenMP, MPI, gRPC and more.

The advantage these frameworks have is that users typically only use this one framework for their whole program, so the parallelism used can be optimized without having to play well with other Python packages that also execute code in parallel.

Dask

Dask provides parallel arrays, dataframes and machine learning algorithms with APIs that match NumPy, Pandas and scikit-learn as much as possible. Dask is a pure Python library and uses blocked algorithms; each block contains a single NumPy array or Pandas dataframe. Scaling to hundreds of nodes is possible; Dask is a good solution to obtain distributed arrays. When used as a method to obtain parallelism on a single node however, it is not very efficient.

Auto-parallelization and nested parallelism

Some libraries, like the deep learning frameworks, do auto-parallelization. Most non deep learning libraries do not do this. When a single library or framework is used to execute an end user program, auto-parallelization is usually a good thing to have. It uses all available hardware resources in an optimal fashion.

Problems can occur when multiple libraries are involved. What often happens is oversubscription of resources. For example, if an end user would write code using scikit-learn with n_jobs=-1, and NumPy would auto-parallelize operations, then scikit-learn will use N processes (on an N-core machine) and NumPy will use N threads per process - leading to N^2 threads being used. On machines with a large number of cores, the overhead of this quickly becomes problematic. Given that NumPy uses OpenBLAS or MKL, this problem already occurs today. For a while Anaconda and Intel shipped a modified NumPy version that had auto-parallelization behavior for functions other than linear algebra - and the problem occurred more frequently.

The paper Composable Multi-Threading and Multi-Processing for Numeric Libraries from Malakhov et al. contains a good overview with examples and comparisons between different parallelization methods. It uses NumPy, SciPy, Dask, and Numba, and uses multiprocessing, concurrent.futures, OpenMP, Intel TBB (Threading Building Blocks), and a custom library SMP (symmetric multi-processing).

Limitations due to Python package distribution mechanisms

When one wants to use auto-parallelization, it's important to have control over the complete set of packages that a user gets installed on their machine. That way one can ensure there's a single linear algebra library installed, and a single OpenMP runtime is used.

That control over the full set of packages is common in HPC type situations, where admins need to deal with build and install requirements to make libraries work well together. Both packages managers (e.g. Apt in Debian) and Conda have the ability to do this right as well - both because of dependency resolution and because of a common build infrastructure.

A large fraction of Python users install packages from PyPI with Pip however. The binary installers (wheels) on PyPI are not built on a common infrastructure, and because there's no real support for non-Python dependencies, libraries like OpenMP and OpenBLAS are bundled into the wheels and installed into end user environments multiple times. This makes it very difficult to reliably use, e.g., OpenMP. For this reason SciPy uses custom pthreads thread pools rather than OpenMP.

The need for a better API pattern or library

The default behavior for libraries like NumPy and SciPy given the status of the ecosystem today should be to be single-threaded, otherwise it composes badly with multiprocessing, scikit-learn (joblib), Dask, etc. However, there's room for improvement here. Two things that could help improve the coordination of parallelization behavior in a stack of Python libraries are:

  1. A common API pattern for enabling parallelism
  2. A common library providing a parallelization layer

A common API pattern is the simpler of the two options. It could be a keyword like n_jobs or workers that gets used consistently between libraries, or a context manager to achieve the same level of per-function or per-code-block control.

A common library would be more powerful and enable auto-parallelization rather than giving the user control (which is what the API pattern does). From a performance perspective, having arrays and dataframes auto-parallelize their functions as much as possible over all cores on a single node, and then letting a separate library like Dask deal with multi-node coordination, seems optimal. Introducing a new dependency into multiple libraries at the core of the PyData ecosystem is a nontrivial exercise however.

The above attempts to summarize the state of affairs today. The topic of parallelization is largely an implementation rather than an API question, however there is an API component to it with option (1) above. How to move forward here is worth discussing.

Note: there's also a lot of room left in NumPy also for optimizing single-threaded performance. There's ongoing work on making better use of intrinsics (this is a large effort, ongoing), or using SLEEF for vector math (discussed in the past, no one is working on it).

amueller commented 4 years ago

sklearn now actually uses threadpoolctl internally to make some computations parallel by default, such as in HistGradientBoostingClassifier and makes sure others are not parallel by setting jobs to 1. There is some issues with nesting, and there is issues with finding the right number of threads. Right now we use the number of (virtual) cores which often seems to be a bad idea, and the physical cores might be better. I don't think we have an entirely consistent story about the interactions between n_jobs and our use of OpenMP.

So in conclusion: just in scikit-learn, this is already a mess, 'only' dealing with 4 types of parallelism (n_jobs processes, n_jobs threads, OpenMP and BLAS). We could have our own 'library' solution, but I don't think anyone of us has the expertise to do this; it's probably pretty hard to actually know how to allocate cores across different ML algorithms. I'm not sure where to even start on that.

I'm not sure I understand proposal 2: is that a python library? How would that integrate with the C & Fortran code? If it's a C library: how does it integrate with numba?

rgommers commented 4 years ago

I'm not sure I understand proposal 2: is that a python library? How would that integrate with the C & Fortran code? If it's a C library: how does it integrate with numba?

tdimitri commented 4 years ago

Some parallelism issues that come to mind. My comments assume built in threading is always available (as opposed to dependency to external threading packages). Even if a package does not want to support threading.. they can just return a warning or error.

  1. Thread control, config, and startup. How the threads are started, how many threads, process affinity, pinning threads, numa, thread control and configuration. OpenMP vs TBB vs built in. What happens when a user tries to control a thread in a way that an external package does have the feature for? Just issue a warning?

  2. Capturing and using other threads - is this allowed? Perhaps the user tells us which threads to use. We capture them and wake them up for work until the users tells us we cannot anymore.

  3. Determinism problems: for example sum of a float64. The order in which the sum executes may affect the result for certain values (bit wobbling in the last bits of mantissa). Perhaps a determinism flag can be set -- if set then sum must be deterministic to last bit -- otherwise bit wobbling allowed.

  4. Thread tuning per operation. For instance calculating the absolute value may cap out at 3 threads. More threads may just waste resources. log() may want to use more threads. Is the user allowed to tune that?

At least three types of thread work division (do we want to expose this at a lower level API)?

  1. Symmetric computations: Work is divided equally across the array. For instance an array of 1 million may be divided into 10 smaller chunks of 100,000 each resulting in 10 smaller work assignments. Then can user configure chunk size?

  2. Asymmetric computations: Work is based on a number. You have job no. 1 vs job no. 2. The thread is simply told it has a job. Job 1 might be sum everything for group 1 which might have more values than Job 2.

  3. Scatter gather approaches. Often for reduction, the min of mins is the min of them all.

Is the user allowed to call into the symmetric divider for 1. ? What about 2. - this is similar to nb.prange(). And 3. is used for all reductions.

I believe requiring an external threading package creates an unnec. dependency and thus recommend against it. If the user tries to configure a thread in a manner not supported - API can be quiet, warning, or error based on some setting.

I think the parallelism API is in another package that we control the API for. We also control all the configuration for it. If our threading API package uses another threading package like TBB or OpenMP --the dependencies are abstracted away. We could just wave our hands and say outside package, outside scope -- problem gone -- but i encourage the team to solve the problem.

rgommers commented 4 years ago

Thanks for the detailed feedback @tdimitri.

Thread control, config, and startup. How the threads are started, how many threads, process affinity, pinning threads, numa, thread control and configuration. OpenMP vs TBB vs built in. What happens when a user tries to control a thread in a way that an external package does have the feature for? Just issue a warning?

This to me is the most important question. I don't have an answer (there's multiple possible answers though); maybe step 1 would be to figure out if an API for this should be included in this RFC. Or if not, how do we move forward in the libraries of interest.

Capturing and using other threads - is this allowed?

My vote would be no - that's giving too much control to the user, and I can't think of any library of interest that allows this.

Determinism problems: for example sum of a float64. The order in which the sum executes may affect the result for certain values (bit wobbling in the last bits of mantissa). Perhaps a determinism flag can be set -- if set then sum must be deterministic to last bit -- otherwise bit wobbling allowed.

A global flag is what PyTorch is introducing right now (still experimental).

Thread tuning per operation. For instance calculating the absolute value may cap out at 3 threads. More threads may just waste resources. log() may want to use more threads. Is the user allowed to tune that?

Not sure, it combines with the first question - how granular should an API be? Most libraries don't give this level of control. Instead, max number of threads is set, and the rest is up to the library.

I believe requiring an external threading package creates an unnec. dependency and thus recommend against it. If the user tries to configure a thread in a manner not supported - API can be quiet, warning, or error based on some setting.

I think the parallelism API is in another package that we control the API for.

Aren't the first and last sentence here contradictory?

rgommers commented 4 years ago

We could have our own 'library' solution, but I don't think anyone of us has the expertise to do this; it's probably pretty hard to actually know how to allocate cores across different ML algorithms.

I think doing it optimally is quite hard indeed. However, doing better than we're doing today isn't too difficult.

I do think figuring out an API pattern is a lot easier than a separate library, and is already quite valuable. Attempting the library (solution 2) would be ambitious - at some point it needs doing, but I think it's mostly beyond the scope of the current API standard goals. We could describe the approach though, and make sure any developments in libraries converges rather than diverges.

tdimitri commented 4 years ago

Thanks for the detailed feedback @tdimitri.

Thread control, config, and startup. How the threads are started, how many threads, process affinity, pinning threads, numa, thread control and configuration. OpenMP vs TBB vs built in. What happens when a user tries to control a thread in a way that an external package does have the feature for? Just issue a warning?

This to me is the most important question. I don't have an answer (there's multiple possible answers though); maybe step 1 would be to figure out if an API for this should be included in this RFC. Or if not, how do we move forward in the libraries of interest.

Capturing and using other threads - is this allowed?

My vote would be no - that's giving too much control to the user, and I can't think of any library of interest that allows this.

Determinism problems: for example sum of a float64. The order in which the sum executes may affect the result for certain values (bit wobbling in the last bits of mantissa). Perhaps a determinism flag can be set -- if set then sum must be deterministic to last bit -- otherwise bit wobbling allowed.

A global flag is what PyTorch is introducing right now (still experimental).

sounds good

Thread tuning per operation. For instance calculating the absolute value may cap out at 3 threads. More threads may just waste resources. log() may want to use more threads. Is the user allowed to tune that?

Not sure, it combines with the first question - how granular should an API be? Most libraries don't give this level of control. Instead, max number of threads is set, and the rest is up to the library.

This is for the threading ecosystem. Extra threads just slows down the calculation and consumes a threading resource another process could be using. Too few threads (when allowed) also slows down calculations. Different calculations have different compute times. For instance reduce operations often only read from memory returning a single scalar result (and thus are less memory bound and can use more threads). If might take 3 threads to cap out on add float32, while sqrt caps out at 8 threads. The code is a lookup table (not difficult). I know it helps because it has been tested in riptide. Readers of this issue can play with threads today to see the asymmetry in calculations. To me, it is a small amount of code (LUT) to handle a known issue. The risk is low and the reward is high.

Two concepts: how many threads in the thread pool and how many threads needed per given calculation.

On a similar note there is also a threading threshold -- for instance if we are adding 30,000 float32 numbers, the cost to release an additional worker thread is often not worth it. However at 50,000 numbers one extra thread may be helpful, but releasing all 8 would be wasteful. The linux futex can dynamically release a thread count in one OS API.

I believe requiring an external threading package creates an unnec. dependency and thus recommend against it. If the user tries to configure a thread in a manner not supported - API can be quiet, warning, or error based on some setting. I think the parallelism API is in another package that we control the API for.

Aren't the first and last sentence here contradictory?

Apologize for the confusion. Requiring an external threading package (not controlled by this API) creates an unnec dependency to a foreign package. Rather if we control the threading package and API, then it is a local dependency (with versions changing in sync) that can be bundled together.

jack-pappas commented 4 years ago

@aregm posted a nice writeup / summary on threading APIs in another thread, and it feels very relevant to this discussion of parallelism as well:

Omni Parallel Runtime_New.pdf

jack-pappas commented 4 years ago

@aregm Thank you for sharing that writeup, it contains some great insights into the various trade-offs in these threading layers; I thought the comparison of compute vs. I/O handling is especially interesting. You may be interested to read the Why HPX? writeup for the HPX library, as it covers some of the same issues and how HPX is trying to address them through their programming model.

One general approach not included in your report but could be interesting is to implement a lightweight portability layer on top of an OS-integrated threadpool like libdispatch (a.k.a Grand Central Dispatch) on macOS or the Thread Pool API on Windows. These APIs are already meant to provide support for both computational and IO tasks, and their integration with the OS (and scheduler) could help avoid oversubscription issues. There would be some downsides to this approach -- for starters, explicit control over thread/core affinity is more difficult or impossible, and we'd likely want to incorporate task-splitting logic ("auto-balancing") in the portability layer since the queues won't support that natively. It feels like it's worth some experimentation though, if only to provide some data for comparison against the other threading libraries.

aregm commented 4 years ago

Yes, it's been around a long time. I didn't follow the latest developments there, but I think it is conceptually the same.

Basically, there was a thrust in the HPC community that we needed to completely rethink how we wrote software in HPC for the upcoming exascale era. The idea was that we needed to create a vast swarm of tasks which would then migrate to the data to do the computation. In practice, however, it was a pain in the butt to work with. It required massive change to your software that made the code more complex than legacy code (message passing between a fixed group of processes).

Intel in collaboration with Rice University contributed Open Compute Runtime to this movement, which then evolved to the Habanero-C https://github.com/habanero-rice/hclib. Such approaches require massive changes, and when you look at the code, it is IMHO ugly and it didn't perform well. MPI was superior in every way, though very low-level.

GCD is old and works only on MacOS, and was never intended to work well for NUMA, custom allocators, etc.

Windows Thread Pool is a good pool API, but for Windows only. With all of the problems specific to Thread Pool described in the doc.

TomNicholas commented 1 year ago

In https://github.com/pydata/xarray/pull/7019 we're working on generalising xarray to wrap parallel array types other than dask.

It's a WIP, and a lot of the discussion is also happening the pangeo distributed array computing working group meetings, but the general idea is that any array type that conforms to the numpy API standard and offers certain additional methods (e.g. .chunk(), .compute(), .from_array()) can be treated the same way dask is currently treated in xarray.

We also need to apply versions of key functions that dask defines (particularly blockwise and apply_gufunc) to sets of parallel arrays from within xarray. To get access to library-specific implementations of these functions I've currently proposed the wrapped implementations subclass a "ChunkManager" ABC xarray will provide, and register it with an entrypoint, but we're open to better suggestions of how to implement this.

What would be amazing would be to eventually have a standard for "parallel numpy-like arrays" that adds chunks etc. to the current array API standard. Then xarray users could easily swap out for different parallel compute backends and see which works best on their workloads.

There is interest from the ramba, Arkouda, and cubed libraries so far too.

cc @drtodd13, @tomwhite

rgommers commented 1 year ago

@TomNicholas thanks for bringing this up! I think there definitely is interest to figuring out a holistic story here. From the last comments on https://github.com/pydata/xarray/pull/7019 it seems like explicit chunking isn't needed for Ramba and Arkouda. A similar discussion came up in https://github.com/data-apis/consortium-feedback/issues/7, where the Dask model appears to be too restrictive for cuNumeric.

It's also worth considering whether a design (whatever that ends up looking like) should distinguish between single-node parallelism and distributed execution. This issue is about the former. I believe the distinction is important. Most array libraries (including NumPy) already deal with multi-threaded execution on a single node. And if that applies, it's inherently nicer than using a Dask-like model. Distributed execution is very different, and is completely out of scope for some libraries (NumPy, CuPy) and done as an add-on or again with a separate library for others (e.g., PyTorch offers torch.distributed, but there's also fancier separate options like Ray and Horovod).

leofang commented 1 year ago

@emcastillo had built basic support for dense/sparse CuPy arrays under cupyx.distributed, but IIRC it's targeting single-nodes only (to utilize the intra-node high-speed interconnect, i.e. nvlinks).

TomNicholas commented 1 year ago

It's also worth considering whether a design (whatever that ends up looking like) should distinguish between single-node parallelism and distributed execution. This issue is about the former. I believe the distinction is important.

Ah okay! Makes sense.

From xarray's perspective if all the parallelism happens behind a fully numpy-like API then we don't have to do anything else in order to benefit (e.g. we can already wrap CuPy arrays). Standardising dask and other libraries like cubed which copy dask's API is more complicated.

Anyway I just wanted to make everyone aware of the use-case of xarray and the work that's going on there.