dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Spill to disk is inefficient for small byte-values #525

Open mrocklin opened 8 years ago

mrocklin commented 8 years ago

As of #485 workers can spill excess data to disk. They currently evict tasks from a fixed pool of memory to disk with a simple LRU policy. Task results are stored on disk as single files. This is currently done using the composable MutableMappings in a tiny project, zict.

if memory_limit:
    from zict import Func, Buffer, File
    path = os.path.join(self.local_dir, 'storage')
    memory = dict()
    disk = File(path)
    disk = Func(dumps_to_disk, loads_from_disk, disk)
    self.data = Buffer(memory, disk, int(float(memory_limit)), weight)  # LRU from memory to disk
else:
    self.data = {}
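
For readers unfamiliar with zict, here is a standalone sketch of the same composition (this is not distributed's code: pickle stands in for the worker's dumps_to_disk/loads_from_disk helpers, and the weight function is simplified to the serialized byte length):

import pickle
from zict import Buffer, File, Func

memory = dict()
disk = File('storage')                         # one file per key, bytes on disk
disk = Func(pickle.dumps, pickle.loads, disk)  # serialize values on the way to disk

# Keep roughly 100 MB of values in memory; evict the least recently used
# values to disk once that limit is exceeded.
data = Buffer(memory, disk, 100e6, lambda k, v: len(pickle.dumps(v)))

data['x'] = list(range(100000))  # stays in memory until the limit is reached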

This policy is decent for large results (larger than, say, a megabyte) but slows down when saving many small results due to file system overhead. This case occurs specifically during shuffles, such as dask.array rechunkings or dask.bag/dataframe joins, where we may have millions of very small results. Saving thousands of tiny files is very slow on most file systems. See blogpost.

Single-machine key-value store databases provide decently efficient random write and read access to small values on disk. There are many solutions here. I'm somewhat partial to embedded databases like LevelDB (which now has many more modern competitors) that run inside the same process because they don't require significant setup, run in a well contained way, and are easy to use.

So one solution here would be to optionally replace the zict.File MutableMapping, which uses the filesystem, with a MutableMapping backed by some more efficient database. This would require learning a bit about different key-value stores and choosing a good one for the many-small-writes/reads workloads that dask.distributed might push onto it, finding a way to combine it with the LRU scheme (or some other scheme) used currently (this is easy if the store follows the MutableMapping interface), and then hooking up the necessary keywords so that users can access this option easily.

Key-value stores may be bad for large byte values. We might want a three-MutableMapping solution: one in memory, one for large values on disk (zict.File), and one for small values on disk (some on-disk database).
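
A rough sketch of the routing piece of that idea follows; the class and the 1 MB cutoff are purely illustrative, not something that exists in zict today:

from collections.abc import MutableMapping

class SizeRouter(MutableMapping):
    # Route small byte values to one backing store and large ones to another.
    def __init__(self, small, large, cutoff=1000000):
        self.small = small    # e.g. a database-backed mapping
        self.large = large    # e.g. zict.File
        self.cutoff = cutoff  # bytes

    def __setitem__(self, key, value):
        if key in self:                 # avoid stale copies in the other store
            del self[key]
        target = self.small if len(value) < self.cutoff else self.large
        target[key] = value

    def __getitem__(self, key):
        try:
            return self.small[key]
        except KeyError:
            return self.large[key]

    def __delitem__(self, key):
        try:
            del self.small[key]
        except KeyError:
            del self.large[key]

    def __iter__(self):
        yield from self.small
        yield from self.large

    def __len__(self):
        return len(self.small) + len(self.large)

Combined with the existing pieces, the slow side of the Buffer above would then be SizeRouter(small_store, File(path)) instead of File(path) alone.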

Some considerations:

  1. We need to be able to specify/restrict the amount of memory that the database uses internally (see the sketch after this list)
  2. I would love to see more MutableMappings added to Zict, which I should move to the dask org if anything like this happens.
  3. ...
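
On point 1, as an illustration of the kind of knob embedded stores expose, here is a hedged example using the plyvel bindings for LevelDB (just one candidate; nothing in this issue settles on LevelDB, and the sizes are arbitrary):

import plyvel

db = plyvel.DB(
    'storage-small',
    create_if_missing=True,
    lru_cache_size=8 * 2**20,     # LevelDB's internal block cache: ~8 MB
    write_buffer_size=4 * 2**20,  # memtable size before flushing to disk: ~4 MB
)
db.put(b'key', b'value')
assert db.get(b'key') == b'value'
db.close()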
mrocklin commented 8 years ago

An alternative to this would be to build a specialized shuffle into dask.distributed. If we want to make dask.dataframe joins as fast as what you find in a database then something like this is probably necessary. The drawback is that the specialized shuffle would be entirely separate from the task scheduling system. This is how databases/Spark do it. Historically Dask has always been "just a task scheduler", which eases maintenance and keeps all of our optimizations (like the optimization above) useful in situations other than just database cases.

However there is always a choice. Dask could pivot and jump into the database game. I'm personally not excited about this (there are many excellent databases and few excellent task schedulers) but other people might feel differently. Building a shuffle is still an option. It might even be doable without much pain.

mrocklin commented 8 years ago

I've heard good things about rocksdb from a couple of sources now.

There appears to be an Ubuntu package and a decent Python library.

kszucs commented 8 years ago

I think dask would gain a lot of popularity as a "drop-in replacement" for pyspark dataframes. A lot of people use pyspark for biggish problems just because there are no alternatives in the Python ecosystem. They have to deal with a lot of overhead learning/experimenting/digging/configuring pyspark, the JVM, OOMs, etc. (it's a nightmare, especially after using pandas). I want no more jars, class names that take a mile to write, or hundreds of configuration options. I just want to use the pandas interface on distributed data, efficiently. Dask distributed is really close; in fact I'd throw out all of our problematic pyspark jobs if there were a fast enough shuffle mechanism in distributed.

Historically Dask has always been "just a task scheduler", which eases maintenance and keeps all of our optimizations (like the optimization above) useful in situations other than just database cases.

Historically Spark was a proof-of-concept framework for mesos :)

I can see dask becoming the de facto standard distributed solution for Python, since it already has bags, arrays, and dataframes. I think these existing containers imply that dask is not just a task scheduler.

On the other hand, doesn't the on-disk shuffle (via partd) violate this policy?

I'd really love to see both, a more efficient spill-to-disk and a more efficient distributed shuffle in the near future.

mrocklin commented 8 years ago

Yeah, to be clear, I'd like to restrict this issue to the specific case of spilling to disk on workers. I'd like to focus the Spark conversation elsewhere. I apologize for adding the shuffle note above.

If you want to start the shuffle work please be my guest. I agree that it would have high value. It also has high development and maintenance cost. It is nowhere on my personal roadmap for the project. Would love to have active contributors work on this independently.

pitrou commented 8 years ago

There's a simple benchmark of key/value stores written in Python here: http://charlesleifer.com/blog/completely-un-scientific-benchmarks-of-some-embedded-databases-with-python/

I'll try to reproduce, and see whether the benchmark makes sense. It's interesting that the venerable gdbm appears the fastest of the bunch, at least in this specific test.

mrocklin commented 8 years ago

I spent some plane ride a while ago starting on a RocksDB MutableMapping here: https://github.com/mrocklin/zict/pull/2

Not particularly mature, but it might be useful as a start.

pitrou commented 8 years ago

Another contestant: https://lsm-db.readthedocs.io/en/latest/index.html Another source of information about embedded databases: http://engine.so/

jcrist commented 8 years ago

I've also heard good things about lmdb. Python bindings: https://lmdb.readthedocs.io/en/release/. From reading the docs it may not be appropriate for this use case though. It seems to be optimized for reading over writing, and small values over large. Benchmark: http://104.237.133.194/bench/ondisk/.

pitrou commented 8 years ago

Ok, here is a benchmark script of various key-value stores: https://gist.github.com/pitrou/9c9734dc266e3dcb2c6383102245045c

It uses fixed-length keys and variable-length values. The length of values is distributed according to a lognormal distribution. You can of course tune the parameters in the script. Both sequential (sorted) and random access is benchmarked. The write workload only writes new values, not existing ones.
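
For reference, here is a minimal sketch of that kind of workload (this is not the gist above; the distribution parameters are arbitrary, and any MutableMapping-style store can be dropped in for the plain dict):

import os
import random
import time

import numpy as np

N = 100000
keys = [('k%08d' % i).encode() for i in range(N)]                   # fixed-length keys
sizes = np.random.lognormal(mean=6, sigma=1.5, size=N).astype(int)  # lognormal value lengths
values = [os.urandom(int(s) + 1) for s in sizes]

def bench_writes(store):
    t0 = time.perf_counter()
    for k, v in zip(keys, values):   # only new keys are written
        store[k] = v
    return (time.perf_counter() - t0) / N

def bench_random_reads(store):
    order = random.sample(range(N), N)
    t0 = time.perf_counter()
    for i in order:
        store[keys[i]]
    return (time.perf_counter() - t0) / N

store = {}  # swap in an LMDB/LevelDB/RocksDB-backed mapping here
print('write: %.2f us/op' % (bench_writes(store) * 1e6))
print('read:  %.2f us/op' % (bench_random_reads(store) * 1e6))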

Here are the results on my machine (64-bit Linux, SSD): https://gist.github.com/pitrou/beb124b8626fa390d5d26a5b0f6d8d5d

All tested stores look convincing: a couple µs per operation means the overhead is small compared to regular Python bytecode execution. Kyoto Cabinet is practically unusable for us as it's GPL-licensed (a pity since it performs rather well). LMDB appears the best of the bunch on this particular benchmark. RocksDB is surprisingly a bit slower than LevelDB.

Some stores I haven't been able to exercise: gdbm, ndbm and bsddb are not packaged with Anaconda Pythons. The Python wrappers for unqlite and vedis seem buggy when keys or values contain null bytes, which probably makes them a no-go.

It would be nice to know if others get different results. You may have to pip install some libraries...

pitrou commented 8 years ago

I got similar results for LMDB on a 64-bit Windows VM. The other libraries I wasn't able to install on Windows...

pitrou commented 8 years ago

A quick test shows that LMDB is also very fast with large (multi-MB) values, actually as fast as directly writing files, achieving more than 1 GB/s. LevelDB and RocksDB are a lot slower at writes.

pitrou commented 8 years ago

Note the LMDB figures were obtained with writemap=True, which is apparently inadvisable on some systems ("This option may cause filesystems that don’t support sparse files, such as OSX, to immediately preallocate map_size= bytes of underlying storage").
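
For concreteness, here is a rough sketch of what an LMDB-backed MutableMapping for zict could look like, using the lmdb bindings mentioned above (the class does not exist in zict; map_size and writemap are illustrative and carry the caveat quoted above):

from collections.abc import MutableMapping

import lmdb

class LMDBStore(MutableMapping):
    def __init__(self, path, map_size=10 * 2**30, writemap=True):
        self.env = lmdb.open(path, map_size=map_size, writemap=writemap)

    def __setitem__(self, key, value):
        with self.env.begin(write=True) as txn:
            txn.put(key.encode(), value)   # value is already bytes at this layer

    def __getitem__(self, key):
        with self.env.begin() as txn:
            value = txn.get(key.encode())
        if value is None:
            raise KeyError(key)
        return value

    def __delitem__(self, key):
        with self.env.begin(write=True) as txn:
            if not txn.delete(key.encode()):
                raise KeyError(key)

    def __iter__(self):
        with self.env.begin() as txn:
            for k, _ in txn.cursor():
                yield k.decode()

    def __len__(self):
        return self.env.stat()['entries']

This would slot in wherever zict.File does today, e.g. as the slow side of the Buffer shown at the top of the issue.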

mrocklin commented 8 years ago

Looks like LMDB is already on conda-forge: https://github.com/conda-forge/lmdb-feedstock

pitrou commented 8 years ago

This is the LMDB shared library, not the Python package.

mrocklin commented 8 years ago

Hrm, that comment was actually written and sent several hours ago, before you authored the python-lmdb staged recipe. It wasn't intended to dissuade you from https://github.com/conda-forge/staged-recipes/pull/1809

mrocklin commented 8 years ago

Do these change significantly when the byte values are in the 100MB range? This range is as important as the small-byte range.

mrocklin commented 8 years ago

For the GB/s write speeds, does your computer have NVMe? If not, then we're probably seeing write-to-RAM speeds rather than write-to-disk speeds. Are you flushing at the end of the test?

pitrou commented 8 years ago

No, this is on a SATA SSD. I am not flushing at the end of the test: is it required for this workload?

mrocklin commented 8 years ago

It's not required, but it may change the results. Writing to memory can have different performance characteristics than writing to disk; flushing would give some indication of whether there is a significant difference.
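
A small self-contained example of what that could look like with the lmdb bindings (the path and sizes are arbitrary):

import os
import time

import lmdb

env = lmdb.open('bench-lmdb', map_size=2**30, writemap=True)
payload = os.urandom(10 * 2**20)   # one 10 MB value

t0 = time.perf_counter()
with env.begin(write=True) as txn:
    txn.put(b'x', payload)
env.sync(True)   # force dirty pages out to disk before stopping the clock
print('write + flush: %.3f s' % (time.perf_counter() - t0))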

pitrou commented 8 years ago

Ideally we would test under limited memory conditions, to let the database (or the OS) use its own flushing heuristics. Unfortunately I don't know how to limit the amount of physical RAM available to the benchmark process (ulimit -m doesn't seem to work), and a multi-GB test would be quite slow...

pitrou commented 8 years ago

That said, we can simply err on the side of safety and use the file backend for large data values (> 1MB perhaps).

mrocklin commented 8 years ago

There are times when we need to write many small values. This happens during operations like row-major to column-major rechunkings in dask.array, or joins and groupbys in dask.dataframe.

I don't think we need to spend too much more time benchmarking though. Anything will be a large improvement and it seems to be fairly easy to change this in the future.