adamkglaser / io_benchmarks

I/O benchmarks for sCMOS camera data saving

N5 multiple threads bug #1

Open adamkglaser opened 3 years ago

adamkglaser commented 3 years ago

@joshmoore

Using Zarr + Dask, when saving to N5, multi-threading only works when a compressor is used. When saving as raw with compressor = None, the operation runs single threaded. When using Zarr + Dask and saving to raw zarr format with compressor = None, the operation runs multi-threaded. Is there a potential bug when saving to raw N5 that disables multi-threading?

Python code

import zarr
import numpy as np
import dask.array as da
from numcodecs import GZip  # needed for the compressed case below

data = np.random.randint(0, 2000, size = [512,2048,2048]).astype('uint16')
darray = da.from_array(data, chunks = (16,256,256))

# no compression to n5 - no multi-threading bug (?)
compressor = None
store = zarr.N5Store('test1.n5')
darray.to_zarr(store, compressor = compressor)

# with compression to n5 - multi-threading works
compressor = GZip(level = 2)
store = zarr.N5Store('test2.n5')
darray.to_zarr(store, compressor = compressor)

# no compression to zarr - multi-threading works
compressor = None
store = zarr.DirectoryStore('test3')
darray.to_zarr(store, compressor = compressor)

Version and installation information

Zarr 2.6.1
Dask 2020.12.0
Python 3.9.1
Windows 10
Zarr installed via Conda

joshmoore commented 3 years ago

Thanks, @adamkglaser.

Had some time during a call to try some runs. Pasting without comment in case anyone wants to play along:

--------------------------------------------------------------------------------------- benchmark: 9 tests --------------------------------------------------------------------------------------
Name (time in s)                       Min                Max               Mean            StdDev             Median               IQR            Outliers     OPS            Rounds  Iterations
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test[single-threaded-zarr_raw]      1.0733 (1.0)       1.1521 (1.0)       1.1045 (1.0)      0.0303 (1.0)       1.0926 (1.0)      0.0370 (1.0)           2;0  0.9054 (1.0)           5           1
test[threads-zarr_raw]              1.5753 (1.47)      1.7924 (1.56)      1.6716 (1.51)     0.0961 (3.18)      1.6799 (1.54)     0.1730 (4.67)          2;0  0.5982 (0.66)          5           1
test[threads-n5_raw]                2.2400 (2.09)      2.4299 (2.11)      2.3304 (2.11)     0.0780 (2.58)      2.3507 (2.15)     0.1255 (3.39)          2;0  0.4291 (0.47)          5           1
test[single-threaded-n5_raw]        2.2436 (2.09)      2.3941 (2.08)      2.3460 (2.12)     0.0648 (2.14)      2.3811 (2.18)     0.0917 (2.48)          1;0  0.4263 (0.47)          5           1
test[threads-n5_codec]              9.0364 (8.42)     20.0670 (17.42)    14.6527 (13.27)    4.7165 (155.89)   14.4705 (13.24)    8.3718 (226.24)        2;0  0.0682 (0.08)          5           1
test[processes-n5_raw]             13.1989 (12.30)    16.6437 (14.45)    14.7737 (13.38)    1.3594 (44.93)    14.7128 (13.47)    2.1090 (57.00)         2;0  0.0677 (0.07)          5           1
test[processes-zarr_raw]           15.4315 (14.38)    17.7849 (15.44)    16.2179 (14.68)    1.0821 (35.77)    15.5072 (14.19)    1.7018 (45.99)         1;0  0.0617 (0.07)          5           1
test[processes-n5_codec]           23.8300 (22.20)    37.1464 (32.24)    31.4479 (28.47)    5.0386 (166.54)   32.6610 (29.89)    6.5680 (177.50)        2;0  0.0318 (0.04)          5           1
test[single-threaded-n5_codec]     39.5262 (36.83)    47.2780 (41.03)    43.0636 (38.99)    3.3474 (110.64)   41.5285 (38.01)    5.5847 (150.92)        2;0  0.0232 (0.03)          5           1
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Parameterized test:

import zarr
import pytest
import numpy as np
import dask.array as da
from numcodecs import GZip
from shutil import rmtree

z = 128
xy = 2048
data = np.random.randint(0, 2000, size = [z,xy,xy]).astype('uint16')
darray = da.from_array(data, chunks = (16,256,256))

def run(store, compressor, scheduler):
    x = darray.to_zarr(store, compressor = compressor, compute=False)
    x.compute(scheduler=scheduler)
    rmtree("t")

# no compression to n5 - no multi-threading bug (?)
def n5_raw(scheduler):
    compressor = None
    store = zarr.N5Store('t/test1.n5')
    run(store, compressor, scheduler)

# with compression to n5 - multi-threading works
def n5_codec(scheduler):
    compressor = GZip(level = 2)
    store = zarr.N5Store('t/test2.n5')
    run(store, compressor, scheduler)

# no compression to zarr - multi-threading works
def zarr_raw(scheduler):
    compressor = None
    store = zarr.DirectoryStore('t/test3.zarr')
    run(store, compressor, scheduler)

@pytest.mark.parametrize("method", (n5_raw, n5_codec, zarr_raw))
@pytest.mark.parametrize("scheduler", ("single-threaded", "threads", "processes"))
def test(benchmark, method, scheduler):
    benchmark(method, scheduler)

This uses pytest-benchmark.
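
One way to read the table above is to divide each method's single-threaded mean by its mean under another scheduler: a ratio above 1 means the scheduler helped. The snippet below is a small sketch that does this arithmetic on the Mean column only. The dictionary layout and the `speedup` helper are illustrative names, not part of pytest-benchmark.

```python
# Mean times in seconds, copied from the Mean column of the table above.
mean_seconds = {
    ("zarr_raw", "single-threaded"): 1.1045,
    ("zarr_raw", "threads"): 1.6716,
    ("zarr_raw", "processes"): 16.2179,
    ("n5_raw", "single-threaded"): 2.3460,
    ("n5_raw", "threads"): 2.3304,
    ("n5_raw", "processes"): 14.7737,
    ("n5_codec", "single-threaded"): 43.0636,
    ("n5_codec", "threads"): 14.6527,
    ("n5_codec", "processes"): 31.4479,
}


def speedup(method, scheduler):
    """Single-threaded mean divided by the mean under `scheduler`."""
    baseline = mean_seconds[(method, "single-threaded")]
    return baseline / mean_seconds[(method, scheduler)]


for method in ("zarr_raw", "n5_raw", "n5_codec"):
    for scheduler in ("threads", "processes"):
        print(f"{method:9s} {scheduler:10s} {speedup(method, scheduler):5.2f}x")
```

On these numbers, only n5_codec gains from threads (about 2.9x); both raw cases stay near or below 1x.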

adamkglaser commented 3 years ago

Thanks @joshmoore! Will try running this locally on my machine. I have not yet tried using the Dask scheduler. I can't tell from your results, did you observe the same behavior as me? That is, N5 does not write in parallel when raw is selected but does once you add a compression algorithm, whereas Zarr writes in parallel regardless? Also in run() it looks like it is hardcoded with 'single-threaded'. Perhaps that is a typo and explains some of the run times you shared? Thanks!
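
A direct way to check whether chunk writes overlap, independent of wall-clock timings, is to count concurrent `__setitem__` calls on the store. The sketch below assumes the store follows the `MutableMapping` interface, as Zarr 2.x stores do. `ConcurrencyProbe` and `SlowDict` are hypothetical helpers written for this illustration, and a plain dict stands in for the real store. Whether the wrapper can be passed unchanged around an `N5Store` has not been verified here.

```python
import threading
import time
from collections.abc import MutableMapping
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyProbe(MutableMapping):
    """Wrap a dict-like store and record how many writes overlap in time."""

    def __init__(self, inner):
        self.inner = inner
        self._lock = threading.Lock()
        self._active = 0
        self.peak = 0                  # highest number of simultaneous writes
        self.writer_threads = set()    # ids of threads that performed writes

    def __setitem__(self, key, value):
        with self._lock:
            self._active += 1
            self.peak = max(self.peak, self._active)
            self.writer_threads.add(threading.get_ident())
        try:
            self.inner[key] = value
        finally:
            with self._lock:
                self._active -= 1

    def __getitem__(self, key):
        return self.inner[key]

    def __delitem__(self, key):
        del self.inner[key]

    def __iter__(self):
        return iter(self.inner)

    def __len__(self):
        return len(self.inner)


class SlowDict(dict):
    """Stand-in for a disk-backed store: each write takes a little time."""

    def __setitem__(self, key, value):
        time.sleep(0.05)
        super().__setitem__(key, value)


probe = ConcurrencyProbe(SlowDict())
with ThreadPoolExecutor(max_workers=4) as pool:
    # Eight fake chunk writes issued from up to four threads.
    list(pool.map(lambda i: probe.__setitem__(f"0/{i}", b"\x00" * 1024), range(8)))

print("peak concurrent writes:", probe.peak)
print("distinct writer threads:", len(probe.writer_threads))
```

A peak of 1 would indicate that writes are being serialized somewhere above the store; a peak above 1 shows they reach the store concurrently.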

adamkglaser commented 3 years ago

Also instead of going through dask, do you have any thoughts / have you tried just parallelizing Zarr itself? See code below. This is advantageous as it allows a simple way to do on-the-fly writing, whereas it seems much more complicated and roundabout to achieve that with Dask. The one caveat is that I believe each on-the-fly write should use a block size that is an integer multiple of the file chunk size.

import threading
import multiprocessing
import zarr
import numpy as np

def write_data(idx, zdata, data, blockSize):    
    zdata[idx:idx+blockSize] = data[idx:idx+blockSize]

def write_zarr(data, chunks, compressor, parallel, mode):
    if mode == 'n5':
        store = zarr.N5Store('C:\\benchmark\\zarr.n5')
    else:
        store = zarr.DirectoryStore('C:\\benchmark\\zarr.zr')
    blockSize = 2*chunks[0]
    zdata = zarr.zeros(store = store, overwrite = True, shape = data.shape, chunks = chunks, dtype = data.dtype, compressor = compressor)
    if parallel:
        threads = []
        for idx in np.arange(0, data.shape[0], blockSize):
            thread = threading.Thread(target = write_data, args=(idx, zdata, data, blockSize))
            thread.start()
            threads.append(thread)
        for t in threads:
            t.join()                                   
    else:                                    
        zdata[:] = data[:]

if __name__ == '__main__':

    cores = multiprocessing.cpu_count()

    chunks = (32, 256, 256)
    data = np.random.randint(0, 2000, size = [cores*chunks[0],2048,2048]).astype('uint16')

    write_zarr(data, chunks = chunks, compressor = None, parallel = True, mode = 'zarr')
    write_zarr(data, chunks = chunks, compressor = None, parallel = True, mode = 'n5')
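
The caveat about block size can be made concrete with a small sketch. If every block starts on a chunk boundary and spans a whole number of chunks, no two writer threads ever touch the same chunk file; with a misaligned block size, neighbouring blocks share a chunk and their writes can collide. `aligned_blocks` and `shared_chunks` are hypothetical helper names for this illustration and only deal with the first (frame) axis.

```python
def aligned_blocks(n_frames, chunk_depth, chunks_per_block=2):
    """Yield (start, stop) frame ranges that begin on chunk boundaries."""
    if chunk_depth <= 0 or chunks_per_block <= 0:
        raise ValueError("chunk_depth and chunks_per_block must be positive")
    block = chunk_depth * chunks_per_block
    for start in range(0, n_frames, block):
        yield start, min(start + block, n_frames)


def shared_chunks(blocks, chunk_depth):
    """Return the chunk indices touched by more than one block."""
    seen, shared = set(), set()
    for start, stop in blocks:
        touched = set(range(start // chunk_depth, (stop - 1) // chunk_depth + 1))
        shared |= seen & touched
        seen |= touched
    return shared


# Aligned: chunk depth 32, two chunks per block (as with blockSize = 2*chunks[0]).
blocks = list(aligned_blocks(512, chunk_depth=32, chunks_per_block=2))
print(blocks[:3], "shared:", shared_chunks(blocks, 32))

# Misaligned: 48-frame blocks over 32-frame chunks overlap on a chunk.
print("shared:", shared_chunks([(0, 48), (48, 96)], 32))
```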

joshmoore commented 3 years ago

> Also in run() it looks like it is hardcoded with 'single-threaded'. Perhaps that is a typo and explains some of the run times you shared?

Definitely. That's what I get for coding while on a call. I've fixed the bug, re-increased the tile size, and updated the results.

> I can't tell from your results, did you observe the same behavior as me?

Looking at the new results, if we ignore processes which are slower across the board (except the odd single-thread n5 codec outlier), it looks like I'm still getting raw faster than codecs, but it also doesn't look like I'm getting a lot of parallelism in general.

> have you tried just parallelizing Zarr itself?

I have not.

joshmoore commented 3 years ago

At least in combination with the dask-dashboard,

    x = darray.to_zarr(store, compressor = compressor, compute=False)
    x.compute(scheduler=scheduler)

is not doing what I would expect. If I move to

x = darray.to_zarr(store, compressor = compressor, scheduler=scheduler)

and use a manually created 4-worker Dask cluster, I'm seeing parallel writes for all three of the test cases here.

adamkglaser commented 3 years ago

Interesting, this is helpful information! Could you share the code for the last manual case?

joshmoore commented 3 years ago

Sure. Sorry, should have done that before heading offline yesterday. It's not significantly different from the above:

import zarr
import pytest
import numpy as np
import dask.array as da
from numcodecs import GZip
from shutil import rmtree
from dask.distributed import Client, progress

z = 1
xy = 1024
data = np.random.randint(0, 2000, size = [z,xy,xy]).astype('uint16')
darray = da.from_array(data, chunks = (1,256, 256))

def run(store, compressor, scheduler):
    from dask.distributed import performance_report
    with performance_report(filename="dask-report.html"):
        x = darray.to_zarr(store, compressor = compressor, scheduler=scheduler)

# no compression to n5 - no multi-threading bug (?)
def n5_raw(scheduler):
    compressor = None
    store = zarr.N5Store('t/test1.n5')
    run(store, compressor, scheduler)
    rmtree("t")

# with compression to n5 - multi-threading works
def n5_codec(scheduler):
    compressor = GZip(level = 2)
    store = zarr.N5Store('t/test2.n5')
    run(store, compressor, scheduler)

# no compression to zarr - multi-threading works
def zarr_raw(scheduler):
    compressor = None
    store = zarr.DirectoryStore('t/test3.zarr')
    run(store, compressor, scheduler)

client = Client("127.0.0.1:8786")
zarr_raw("threads")

i.e. basically using it as a library. I started dask-scheduler in one terminal and then dask-worker 127.0.0.1:8786 in four others, and seemed to get sensible parallelism.

adamkglaser commented 3 years ago

Thanks, I will try this to see if I can replicate it on my end. But to confirm: using vanilla Zarr without the scheduler etc., are you also seeing single-threaded, non-parallel writes to N5?

joshmoore commented 3 years ago

My understanding is:

adamkglaser commented 3 years ago

Gotcha. OK, yes, this matches my observations. I will try your distributed Dask example over the weekend. This sheds some light on where the issue might be (and it sounds like it provides a solution), but the one issue for me is that it is not straightforward to do partial writing to a store with Zarr + Dask (as far as I can tell?). Ultimately, my goal is to save data off of our camera on the fly, for example every 512 frames, when the total array size may be tens of thousands of frames. Because of this, I put together the code to manually implement multi-threading here https://github.com/adamkglaser/io_benchmarks/issues/1#issuecomment-770262888.

However it is strange because I see the same issue as with Dask, things for N5 surprisingly don't look parallel (but they do for saving to Zarr). Sorry for so many follow-up questions, but...

Thanks and have a nice weekend! Adam
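
The on-the-fly goal described above (flushing a block every 512 frames while acquisition continues) can be sketched roughly as follows. This is a minimal illustration using only the standard library: `acquire_blocks` is a hypothetical stand-in for the camera, a preallocated list stands in for the Zarr array, and the `max_in_flight` limit is an assumed way to keep memory bounded when the writers fall behind. With a real array, `write_block` would assign into `zdata[start:stop]` instead.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def acquire_blocks(n_frames, frames_per_block):
    """Hypothetical camera: yield (start, block) as frames become available."""
    for start in range(0, n_frames, frames_per_block):
        stop = min(start + frames_per_block, n_frames)
        yield start, list(range(start, stop))  # stand-in for image frames


def write_block(target, start, block):
    """Write one block into its own disjoint region of the target."""
    target[start:start + len(block)] = block


def stream_to_store(target, n_frames, frames_per_block=512,
                    n_writers=4, max_in_flight=8):
    slots = threading.BoundedSemaphore(max_in_flight)
    futures = []
    with ThreadPoolExecutor(max_workers=n_writers) as pool:
        for start, block in acquire_blocks(n_frames, frames_per_block):
            slots.acquire()  # pause acquisition if too many blocks are pending
            fut = pool.submit(write_block, target, start, block)
            fut.add_done_callback(lambda _f: slots.release())
            futures.append(fut)
        for fut in futures:
            fut.result()     # re-raise any error from a writer thread
    return target


n_frames = 2000
store = stream_to_store([None] * n_frames, n_frames)
print("frames written:", sum(frame is not None for frame in store))
```

Choosing `frames_per_block` as an integer multiple of the chunk depth (512 is a multiple of both 16 and 32) keeps each writer on its own set of chunks.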

joshmoore commented 3 years ago

Hi @adamkglaser. My apologies but this fell off my radar. Have you had any luck or insights on your side? ~Josh

adamkglaser commented 3 years ago

No worries, I have actually been working out the logistics of my next position and this fell off my radar as well. But the ability to write directly to N5 via Python (at high speed) is actually going to be critical to my future projects. I'll be joining the Allen Institute and establishing a few new large-scale imaging pipelines. Streaming data from the cameras to N5 instead of TIFF or H5 will be very important.

To pick this back up, did you arrive at a method for using Zarr (on Windows) to do fast parallel writing to N5 with no compression?

Thanks! Adam
