dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.58k stars 718 forks source link

RFC: explicit shared memory #4497

Open martindurant opened 3 years ago

martindurant commented 3 years ago

With the increasing availability of large machines, it seems to be the case that more workloads are being run as many processes on a single node. In a workflow where a single large array would be passed to workers, currently this might be done by passing the array from the client (bad), using scatter (OK) or loading data in the workers (good, but not efficient if we want one big array).

A large memory and transfer cost might be saved by putting the array into posix shared memory and referencing it from the workers. If we host the array is in shm, there is no copy or de/ser cost (but there is an OS call cost to attach to the shm). It could be appropriate for ML workflows where every task wants to make use of the whole of a large dataset (as opposed to chunking the dataset as dask.array operations do). sklearn with joblib is an example where we explicitly recommend scattering large data.

As a really simple example, see my gist, in which the user has to explicitly wrap a numpy array in the client, and then dask workers no longer need to have their own copies. Note that SharedArray is just a simple way to pass the array metadata as well as its buffer; it works for py37 and probably earlier.

To be clear: there is no suggestion of adding anything to the existing distributed serialisation code, because it's really hard to try to guess when a user might want to use such a thing. It should be explicitly opt-in.

Further,

cc @crusaderky @quasiben @jsignell

quasiben commented 3 years ago

It's also worth noting that UCX / UCX-Py can communicate with Shared Memory including: CMA, knem, xpmem, SysV, mmap. @pentschev and I did a bit of experimenting last summer: https://github.com/openucx/ucx/issues/5322

jakirkham commented 3 years ago

Yeah there are a few of these shared array libraries that have floated around. For example, Sturla Molden built one, SharedArray (as used here), and a few others that are escaping me atm

joblib itself uses a ramdisk for shared memory. Though this is only on Linux. We discussed how this could be extended to other OSes in issue ( https://github.com/joblib/joblib/issues/705 ). There's an old gist with some of this work.

More generally memory mapping may make sense as the mapped memory actually lives in shared memory. So would be accessible from other processes. Ofc would want to go back through and check how OSes support this.

As of Python 3.8, Multiprocessing has its own shared_memory module. IIRC this evolved out of posix_ipc and sysv_ipc libraries (maybe from here?).

On older Python versions, one might use RawArray. Have played with that strategy myself here.

applio commented 3 years ago

I think this already works with multiprocessing.shared_memory.SharedMemory -- see the 2nd code example in the docs where a NumPy array is backed by shared memory: https://docs.python.org/3/library/multiprocessing.shared_memory.html

The implementation behind multiprocessing.shared_memory is POSIX shared memory on all systems where that's available and Named Shared Memory on Windows. This makes for a cross-platform API for shared memory that's tested and viable on quite a variety of different OSes and platform types.

Similar techniques could be used to wrap arrow or pandas data, although no one probably wants to delve through existing in-memory objects to find the underlying buffers.

A more general tool to wrap pandas and pandas-like objects was developed prior to the release of Python 3.8 (and the shared memory constructs in the Python Standard Library) but was not suitable for inclusion in the core because it was not general purpose enough even if it was useful.

jakirkham commented 3 years ago

cc @maartenbreddels (who may be interested in this or have thoughts here)

martindurant commented 3 years ago

To be clear: this isn't really about the implementation (there are lots of ways to do it), but the concept. Is this something that would really be useful, and should someone/one of us spend some time making it happen?

(I think all of the tools, including py>3.8's, end up reserving shm with essentially the same OS call)

applio commented 3 years ago

appropriate for ML workflows where every task wants to make use of the whole of a large dataset

This has been a recurring need in my projects where the entirety of the data needs to be accessible to all workers yet duplicating the data for each worker would exceed available system memory. I have primarily used NumPy arrays and pandas DataFrames backed by shared memory thus far -- it would be cool to use dask as part of this. My situation might only qualify as a single data point but I am not the only weirdo with this sort of need.

It should be explicitly opt-in.

+1 on this as well.

crusaderky commented 3 years ago

Do I understand correctly that multiprocessing.shared_memory survives indefinitely after the death of all processes that reference it? e.g. it's like writing to /dev/shm. Any idea on how to prevent memory leaks when a worker crashes or is SIGKILL'ed for any reason?

martindurant commented 3 years ago

Yes, I believe that can be a problem @crusaderky . If it's explicitly opt-in, then we can require the user to manually delete blocks should this happen. Note that on macos, shm is still a thing, but I don't think you can access it via the file system.

jakirkham commented 3 years ago

Or you use SharedMemoryManager. IIUC that exists to enable management of these blocks. Though Davin is here and likely can provide more guidance

All OSes have some form of shared memory. The multiprocessing module is derived from POSIX shared memory primitives (though there is an older SYS V set as well). These also exist in FreeBSD, which macOS derives from. Ofc this also exists in Linux as well. Though Windows does not have these POSIX primitives, it does have other primitives that this module uses. So it should be possible to use it on all major platforms.

applio commented 3 years ago

@jakirkham's suggestion to use a SharedMemoryManager is a great one because its primary purpose is to help ensure that shared memory gets free'd and released.

By using a manager to create shared memory (via SharedMemoryManager.SharedMemory(size)) the shared memory segment will be destroyed when the SharedMemoryManager object passes out of scope or its process exits. It is arguably a best practice to use a SharedMemoryManager whenever possible but there are also plenty of valid use cases where a single process can not be expected to be solely responsible for the lifecycle of the shared memory, hence the use of the manager is optional.

To @crusaderky's original question: independent of the use of a manager, shared memory created through multiprocessing.shared_memory is not guaranteed to survive indefinitely. On Windows, it can only live as long as the last process that ever had a handle on it before it is involuntarily destroyed by the Windows kernel. On other systems with POSIX shared memory implementations, not all OSes implement the full POSIX specification and so there can be no guarantees. On Linux, the POSIX implementation is sufficiently complete that it can outlive the last process -- the often-claimed generalization that "everything's a file on Linux" demonstrates itself in a neat way: each new shared memory allocation from multiprocessing.shared_memory can be seen as a new file on /dev/shm (on Linux systems, not all Un*x systems).

maartenbreddels commented 3 years ago

As of Python 3.8, Multiprocessing has its own shared_memory module. IIRC this evolved out of posix_ipc and sysv_ipc libraries (maybe from here?).

On older Python versions, one might use RawArray. Have played with that strategy myself here.

Maybe this changed in 3.8, but IIRC the child processes would only see shared memory created in the parent process, and only the memory created before they were forked/cloned. Maybe this changed, because there are posix API's that should allow for sharing new memory.

In vaex, I mainly use memory mapping, and if I pickle a 'dataset' (the data backing the dataframe), I just pickle the path and offsets (and other metadata), and the child processes will thus share the same physical memory. However, for data created on the fly, it would be great if this can go over shared memory.

jakirkham commented 3 years ago

For RawArray (and other related APIs) this is true. More specifically this uses POSIX anonymous shared memory on OSes supporting POSIX, which has these characteristics.

The new shared_memory module uses named shared memory, which can be accessed by other processes as long as they have the name.

Right memory mapped files was the other idea mentioned above. This also uses shared memory for where data from the file is read & written to. So effectively has similar behavior as named shared memory except there is also a file involved.

Zaharid commented 3 years ago

Implementing something like this on top of e.g. the nanny process, so as to make it responsible for holding results would not only provide have the efficiency gains discussed here, but also make the cluster much more resistant to malfunctioning of the worker processes (such as for example #4345). Workers could then easily restart without the need of destroying (or even transporting) the data they hold.

martindurant commented 3 years ago

@Zaharid , that's an interesting idea that I hadn't considered. You could ensure that only real results are kept, but the accumulated memory leaks are cleared. From the point of view of my original suggestion, though, it would involve having to communicate with the nanny, instead of the simple deserialisation in the worker that I have proposed.

To everyone: this is clearly an interesting topic and we have many ideas. How do we make progress here?

jakirkham commented 3 years ago

Yeah I guess we need to determine what the use cases are. Thus far we have brought up:

  1. Working with large data shared across workers
  2. Intranode communication
  3. Data resiliency to worker crashes

Anything else?


At least for 1 + 3 mmaped files sound attractive. For 1 this is useful because the data we are working with needs to be loaded somehow and mmaped files takes advantage of shared memory already. For 3 mmaped files provide the option to not only share data between a nanny and a worker to protect against worker failures, but with a file they can potentially protect against nanny failures as well. Technically a mmaped file should work for 2, but having a file involved in communication is probably unnecessary. Maybe it could make sense if we consider mmaping in the context of spilling, in which case having that data in a mmaped file makes it easy to share with workers on the same node.

martindurant commented 3 years ago

@jakirkham , would you mind sketching out how you think a workflow would go with memmapped files? For data that doesn't happen to come from an npy file, it would mean first writing the data to disk; or do you mean mmap.mmap and copying bytes from the buffer?

jakirkham commented 3 years ago

mmap.mmap would be fine. Sorry I don’t follow what is meant by copying bytes from a buffer

martindurant commented 3 years ago

I mean, you could adapt my little bit of code, but instead of copying the original array to shm, you copy it to the mmap, right? That is, unless it happens to be in a file already. So the mmap would not normally be file backed, and any process with the handle could read it without further copies.

jakirkham commented 3 years ago

Ah sorry yeah that makes sense. Agreed

Zaharid commented 3 years ago

It might be worth mentioning that there is also the Arrow Plasma store http://arrow.apache.org/docs/python/plasma.html, which is a somewhat higher level interface, in that it for example reference counts the pointers to allocated memory regions.

martindurant commented 3 years ago

I updated the gist at https://gist.github.com/martindurant/5f517ec55a5bff9c32637e8ebc57ef7c with a naive mmap implementation along the same very opt-in model. It performs the same as shm for reading, but takes longer to write, because there is a real file somewhere. Presumably that write could be to a file in RAM somehow.

jakirkham commented 3 years ago

Yeah on Linux one can use /dev/shm and then that's just in-memory storage. On macOS one would need to create a RAM disk, but then this could be used. Windows has something similar.

That said, I wonder whether just targeting a solid state drive is sufficient for most use cases.

fjetter commented 3 years ago

Just a note about a few issues I found which directly or loosely connect to this

martindurant commented 3 years ago

@florian-jetter-by : I think the latter only, and indeed it's that kind of conversation that I was thinking about. My example just does a little, but potentially useful, part of that, without a need to change any comm/protocol.