fsspec / filesystem_spec

A specification that python filesystems should adhere to.
BSD 3-Clause "New" or "Revised" License

Poor performance of async IO in a multithreaded env #488

Open ravwojdyla opened 3 years ago

ravwojdyla commented 3 years ago

Thanks for the great work! I experience poor performance of IO bound tasks in a multithreaded environment.

Dask based example:

import fsspec

import dask.array as da
import xarray as xr
from dask.distributed import Client

def test(path):
    fs_map = fsspec.get_mapper(path)

    ar_dsk = da.random.random(size=(200_000, 200_000), chunks=(1_000, 1_000))
    ar_xr = xr.Dataset(data_vars=(dict(foo=(("x", "y"), ar_dsk))))
    ar_xr.to_zarr(fs_map, mode="w")

if __name__ == "__main__":
    # this saturates just one core ("IO loop")
    c = Client(processes=False)
    test("gs://foo/bar")

afaiu the current async design, some methods are coroutines. Let's take cat for example: _cat is the underlying coroutine, there might be multiple paths to cat, and those will be fetched concurrently. There is a single thread/loop per AsyncFileSystem (let's call it the "IO loop": loop thread + default executor) that runs all coroutines for an instance of AsyncFileSystem (since GCS instances are cached that's usually more than a single instance). If a regular thread calls fs.cat, that runs maybe_sync, which effectively hands over the _cat coroutine to the IO loop, and the thread waits for the result in the sync method. This works okish in a single threaded environment, not so much in a multithreaded environment like Dask, where it could oversaturate the "IO loop", effectively single threading the IO (with slight improvement given the async logic).
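
The hand-off described above can be sketched with the standard library alone. This is an illustration of the pattern, not fsspec's implementation: `fake_cat` and `sync_cat` are hypothetical names, and `asyncio.run_coroutine_threadsafe` stands in for whatever the `sync` helper does internally. The point it shows is that every coroutine body runs on the one loop thread, no matter how many worker threads call in.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

# One event loop running forever in a dedicated daemon thread (the "IO loop").
io_loop = asyncio.new_event_loop()
io_thread = threading.Thread(target=io_loop.run_forever, name="io-loop", daemon=True)
io_thread.start()


async def fake_cat(path):
    """Hypothetical stand-in for a coroutine such as _cat; not fsspec code."""
    await asyncio.sleep(0.01)  # pretend to wait on the network
    # Report which thread actually executed the coroutine body.
    return path, threading.current_thread().name


def sync_cat(path):
    """Blocking wrapper: submit the coroutine to the IO loop and wait for it."""
    future = asyncio.run_coroutine_threadsafe(fake_cat(path), io_loop)
    return future.result(timeout=5)


def run_workers(n_workers=4, n_paths=8):
    """Call the blocking wrapper from several worker threads at once."""
    paths = [f"bucket/key-{i}" for i in range(n_paths)]
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        return list(pool.map(sync_cat, paths))


results = run_workers()
executing_threads = {thread_name for _, thread_name in results}
print(executing_threads)
```

Any CPU work inside the coroutine (decoding, JSON parsing) would therefore also run on that single thread while the callers block.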

The issue seems to be that in the code above dask is using a thread pool for tasks (in my case 16 threads), and each task is mostly IO bound, so most of the work is actually being done by fsspec's single "IO loop", whilst most of the worker threads just wait. The CPU saturation is very poor.

To validate this theory, if I use a process pool for dask cluster (each process with 1 worker thread) (via say: Client(processes=True, threads_per_worker=1)), the CPU saturation is much better since each worker process/thread has its own "IO loop" (separate processes and separate instances of FS).

This example issue should be reproducible with multiprocessing as well.

Thinking about solutions: the loop thread could hand over the IO work to an executor (via run_in_executor), or worker threads should handle their own loop (if possible). Please let me know what you think.
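
A minimal sketch of the first idea, assuming the expensive step is parsing a response body: the coroutine awaits the (simulated) network read on the loop, then pushes the parse to an executor with `loop.run_in_executor`. `fetch_bytes` and `fetch_and_parse` are hypothetical names for illustration and are not part of fsspec or gcsfs.

```python
import asyncio
import json
from concurrent.futures import ThreadPoolExecutor


async def fetch_bytes(key):
    """Hypothetical stand-in for a network read; returns a JSON payload."""
    await asyncio.sleep(0.01)
    return json.dumps({"key": key, "values": list(range(1000))}).encode()


async def fetch_and_parse(key, executor):
    raw = await fetch_bytes(key)
    loop = asyncio.get_running_loop()
    # Hand the CPU-bound parse to the executor so the loop thread stays free
    # to service other coroutines while parsing runs.
    return await loop.run_in_executor(executor, json.loads, raw)


async def main(keys):
    with ThreadPoolExecutor(max_workers=4) as executor:
        return await asyncio.gather(*(fetch_and_parse(k, executor) for k in keys))


parsed = asyncio.run(main([f"key-{i}" for i in range(8)]))
print(len(parsed), parsed[0]["key"])
```

With a thread-based executor, pure-Python parsing still competes for the GIL, so this mainly keeps the loop responsive rather than adding CPU parallelism. A process-based executor would avoid the GIL, at the cost of serialising the data.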

See an example of partial stacktrace dump:

Thread 6672 (active): "Thread-1"
    loads (json/__init__.py:343)
    _call (gcsfs/core.py:503)
    _cat_file (gcsfs/core.py:826)
    _run (asyncio/events.py:81)
    _run_once (asyncio/base_events.py:1859)
    run_forever (asyncio/base_events.py:570)
    run (threading.py:870)
    _bootstrap_inner (threading.py:932)
    _bootstrap (threading.py:890)
Thread 6343 (idle): "ThreadPoolExecutor-0_0"
    _worker (concurrent/futures/thread.py:78)
    run (threading.py:870)
    _bootstrap_inner (threading.py:932)
    _bootstrap (threading.py:890)
Thread 6350 (idle): "Dask-Worker-Threads'-6291-0"
    wait (threading.py:306)
    wait (threading.py:558)
    sync (fsspec/asyn.py:68)
    maybe_sync (fsspec/asyn.py:100)
    cat (fsspec/asyn.py:226)
    getitems (fsspec/mapping.py:89)
    _chunk_getitems (zarr/core.py:1666)
    _get_selection (zarr/core.py:1033)
    _get_basic_selection_nd (zarr/core.py:739)
    get_basic_selection (zarr/core.py:696)
    __getitem__ (zarr/core.py:571)
    __getitem__ (xarray/backends/zarr.py:56)
    __array__ (xarray/core/indexing.py:560)
    asarray (numpy/core/_asarray.py:83)
    __array__ (xarray/core/indexing.py:495)
    asarray (numpy/core/_asarray.py:83)
    getter (dask/array/core.py:102)
    apply_function (distributed/worker.py:3411)
    run (distributed/_concurrent_futures_thread.py:65)
    _worker (distributed/threadpoolexecutor.py:55)
    run (threading.py:870)
    _bootstrap_inner (threading.py:932)
    _bootstrap (threading.py:890)

martindurant commented 3 years ago

I can think of a few ways to handle this, but I am surprised that this doesn't seem to be a problem elsewhere. To be sure: you are right, and threads will be calling the same instance of the file-system class, sharing any connection pool and ioloop.

However, the point is that IO operations involve waiting on or reading from a socket, and the overhead for running tasks on an ioloop is very small. Only one thing can be reading from the network at a time, after all.

For example, the dask worker allocates threads for CPU-bound work, but network operations (talking to the scheduler, etc) happen on a single ioloop.
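
A small standard-library sketch of that argument, assuming the tasks do nothing but wait: many concurrent waits on one loop finish in roughly the time of a single wait. `fake_read` is a hypothetical stand-in for a socket read, and the request count and delay are arbitrary. It does not model CPU work done between waits, which is the case under discussion in this issue.

```python
import asyncio
import time


async def fake_read(delay):
    """Hypothetical stand-in for waiting on a socket."""
    await asyncio.sleep(delay)
    return delay


async def read_many(n, delay):
    # All waits are scheduled on the same loop and overlap in time.
    return await asyncio.gather(*(fake_read(delay) for _ in range(n)))


n_requests, delay = 200, 0.05
start = time.perf_counter()
responses = asyncio.run(read_many(n_requests, delay))
elapsed = time.perf_counter() - start

# Sequential waits would take n_requests * delay seconds in total.
print(f"{len(responses)} waits in {elapsed:.2f}s (sequential: {n_requests * delay:.1f}s)")
```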

process pool for dask cluster, the CPU saturation is much better

so what are the processes doing? Are you sure you didn't just increase the amount of CPU work to do per task for copies and serialisation, without actually improving the total data throughput?

@mrocklin and maybe @quasiben would have intuition here

ravwojdyla commented 3 years ago

@martindurant

and the overhead for running tasks on an ioloop is very small.

Looking at the stacktrace of the "loop thread":

Thread 6672 (active): "Thread-1"
    loads (json/__init__.py:343)
    _call (gcsfs/core.py:503)
    _cat_file (gcsfs/core.py:826)
    _run (asyncio/events.py:81)
    _run_once (asyncio/base_events.py:1859)
    run_forever (asyncio/base_events.py:570)
    run (threading.py:870)
    _bootstrap_inner (threading.py:932)
    _bootstrap (threading.py:890)

You can see a random snapshot of the "loop thread", occupied with reading, parsing, etc. the data for all 16 dask threads, which just wait for cat to return. If all 16 dask threads are IO bound, they will just hammer the single "loop thread" with work.

so what are the processes doing? Are you sure you didn't just increase the amount of CPU work to do per task for copies and serialisation, without actually improving the total data throughput?

It's the same code, just different pool thread vs process. In the thread pool, I believe dask threads oversaturate the single "loop thread" with work, and mostly just wait. In process pool, each worker has its own "loop thread", thus can't easily oversaturate it. Does this make sense?

Edit: GIL isn't an issue in process pool, that might have an impact too.

martindurant commented 3 years ago

It's the same code, just different pool thread vs process. In the thread pool, I believe dask threads oversaturate the single "loop thread" with work, and mostly just wait. In process pool, each worker has its own "loop thread", thus can't easily oversaturate it. Does this make sense?

It does not make sense in my mind for io-bound work, which is what the loop thread is for. It is OK that the other threads are waiting, they would be waiting regardless.

So, is your process-based workflow actually getting through work faster? The dask dashboard would be able to tell you what things the (apparently extra) CPU time is being spent on.

ravwojdyla commented 3 years ago

See reports for the same logic to see the difference:

I have also separately measured the GIL on the thread based one, and there is a somewhat significant GIL hold on the IO loop thread (which makes sense). Closing this issue since I can use a process based cluster, and some part of it can definitely be due to the GIL. Thanks.

martindurant commented 3 years ago

Let's keep this open, I may have time to look into it - and thank you for the profile report. What versions of zarr, xarray, fsspec, gcsfs and dask/distributed are you using? Is the data public, so that I could run the exact same tests?

ravwojdyla commented 3 years ago

@martindurant versions:

xarray==0.16.1
zarr==2.5.0
fsspec==0.8.4
gcsfs==0.7.1
dask==2.30.0
distributed==2.30.0

The test data used was a randomly generated float array via da.random.random.

martindurant commented 3 years ago

Oh, in that case I would appreciate it if you just posted the whole code.

ravwojdyla commented 3 years ago

@martindurant

import fsspec

import dask.array as da
import xarray as xr
from dask.distributed import performance_report, progress

def write(path):
    fs_map = fsspec.get_mapper(path)

    ar_dsk = da.random.random(size=(100_000, 100_000), chunks=(1_000, 1_000))
    ar_xr = xr.Dataset(data_vars=(dict(foo=(("x", "y"), ar_dsk))))
    ar_xr.to_zarr(fs_map, mode="w")

def read(path):
    fs_map = fsspec.get_mapper(path)

    ar_xr = xr.open_zarr(fs_map)
    # record scheduler/worker activity for this block into an HTML report
    with performance_report(filename="dask-report.html"):
        r = (ar_xr.foo * 2.0).persist()
        progress(r)

martindurant commented 3 years ago

Would you like to run your benchmarks with the latest versions?

tasansal commented 10 months ago

We always run into this and try to circumvent it by using processes (even with the latest gcsfs).

However, it would be nice if it got fixed. Processes are too heavy sometimes, and with PyTorch's DataLoaders we try to use multiple threads (we don't want to create processes from processes) to read concurrently, and we hit this bottleneck.

@martindurant I can also generate some simpler benchmarks if it helps, without xarray and using zarr directly etc.