scverse / anndata-tutorials

Use cases for anndata.
BSD 3-Clause "New" or "Revised" License
13 stars 12 forks source link

Concat on Disk Tutorial #18

Open selmanozleyen opened 1 year ago

selmanozleyen commented 1 year ago

Hi,

This is how I started the notebook. @ivirshup @ilan-gold

review-notebook-app[bot] commented 1 year ago

Check out this pull request on  ReviewNB

See visual diffs & provide feedback on Jupyter Notebooks.


Powered by ReviewNB

selmanozleyen commented 1 year ago

hi @ivirshup , I would say the notebook is ready however, I am planing to ask you if we should change the parameter to take a string for memory_limit as dask does. For example

concat_on_disk(infiles,outfile,...,max_loaded_elem=1_000_000)

instead of this we would have

concat_on_disk(infiles,outfile,...,sparse_mem_limit='600mb')

Another motivation for this is I wasn't comfortable with memory measurements at that time and the parameter was only for the theoretical number of elements (counts them to the limit even if they are zero) now I can measure the actual loaded elements and their size etc and thus can take a size like dask. I am writing this since this enhancement would also change the notebook.

selmanozleyen commented 1 year ago

Hi @flying-sheep , sorry for the delay, I was moving my development environment and MacOS feels strange atm.

I fixed the points you mentioned. However I think the numbers aren't correct. Specifically the Dask ones. I will have a look at them then ping you again.

flying-sheep commented 1 year ago

@selmanozleyen did you get a chance to look at them?

Regarding dask, you should probably use the dedicated memray integration it offers:

I asume for the most accurate results, we’d need to do

tracer_kwargs = dict(trace_python_allocators=True, native_traces=True, follow_fork=True)

if not is_sparse:
    with (
        memray_workers(OUTDIR, report_args=False, **tracer_kwargs),
        memray_scheduler(OUTDIR, report_args=False, **tracer_kwargs),
    ):
        concat_on_disk(**concat_kwargs)
else:
    with memray.Tracker(OUTDIR / "test-profile.memray", **tracer_kwargs):
        concat_on_disk(**concat_kwargs)

max_mem = 0
for stat_file in OUTDIR.glob("*.memray"):
    with memray.FileReader(stat_file) as reader:
        max_mem += reader.metadata.peak_memory

/edit: I run into two problems when I run the notebook on my macBook:

  1. datasets_aligned is empty. Any idea why?
  2. dataset_max_mem(max_arg=1_000_000, datasets=datasets_unaligned, array_type="sparse") runs for minutes (or forever), the other calls in <20s. Any idea why?
selmanozleyen commented 1 year ago

@flying-sheep , sorry when refactoring the code I saw the shape argument and assumed I was using the tuple I could've accessed through X.shape. Somehow this lead to very strange but still working behavior :D. It should be fixed by now.

Last time I came to the conclusion the numbers were somehow accurate but I will also apply your suggestion.

flying-sheep commented 1 year ago

Well, on my macBook, the code doesn’t seem to work quite as intended (see last paragraph of previous comment), otherwise I could finish it up.

Regarding the numbers, maybe we should explain them.

selmanozleyen commented 1 year ago

I checked the numbers and they are higher in reality. Thanks for the suggestion. I will update the notebook and change the numbers.

Well, on my macBook, the code doesn’t seem to work quite as intended (see last paragraph of previous comment), otherwise I could finish it up.

I see, but because of the shape problem all the datasets were treated as if they are aligned, I thought this lead to a performance degradation.

Update: When I lower the limit for dask my system crashes. It is very slow to debug with big files right now so I just commited the update.

selmanozleyen commented 1 year ago

Hi @flying-sheep,

Thanks a lot for the input. I see that I made a mistake by only changing max_loaded_elems on aligned dataset. On aligned dataset it isn't even used. I think it is very slow because

I updated the notebook with additional information regarding this.

To benchmark the performance of this case properly we would need to create a special dataset which has unaligned and dissimilar list of elements. Or something with large elements also.

selmanozleyen commented 1 year ago

You mean for Dask right? I am not really sure if the numbers show the overhead of creating a worker or something so I wasn't surprised with high numbers. What you are saying is conceptually correct, but I don't think the chunk sizes align as I didn't consider that when creating the datasets.

When the chunks don't align there is rechunking which loads the whole array into the memory. I will just make a small update to see if this is the case. If it is it would be an even better demonstration!

Update: I used this but the results didn't change. @ivirshup do you have any idea why? Should we expect no load to memory for dense arrays as well when it is aligned?

def write_chunked(func, store, k, elem, dataset_kwargs, iospec):
    """Write callback that chunks X and layers"""

    def set_chunks(d, chunks=None):
        """Helper function for setting dataset_kwargs. Makes a copy of d."""
        d = dict(d)
        if chunks is not None:
            d["chunks"] = chunks
        else:
            d.pop("chunks", None)
        return d

    if iospec.encoding_type == "array":
        if 'layers' in k or k.endswith('X'):
            dataset_kwargs = set_chunks(dataset_kwargs, (25, elem.shape[1])) # also tried (1000,1000)
        else:
            dataset_kwargs = set_chunks(dataset_kwargs, None)

    func(store, k, elem, dataset_kwargs=dataset_kwargs)

def write_data_to_zarr(X, shape_type, array_name, outdir, file_id):
    outfile = outdir / f"{file_id:02d}_{shape_type}_{array_name}.zarr"
    adata = create_adata(X)
    z = zarr.open_group(outfile, mode="w")

    write_dispatched(z, "/", adata, callback=write_chunked)
    zarr.consolidate_metadata(z.store)
    return f"wrote {X.shape[0]}x{X.shape[1]}_{array_name} -> {str(outfile)}\n"
flying-sheep commented 1 year ago

You mean for Dask right? I am not really sure if the numbers show the overhead of creating a worker or something so I wasn't surprised with high numbers.

That shouldn’t go into the gigabytes. I would think tens of megabytes overhead or less.

selmanozleyen commented 1 year ago

@flying-sheep, When changing a line from the function to this (in addition to chunked writing)

darrays = (da.from_array(a, chunks=(1000,1000) for a in arrays)

The results are way better

Dataset: dense 0
Concatenating 6 files with sizes:
['668MiB', '896MiB', '890MiB', '668MiB', '668MiB', '924MiB']
Total size: 4716MiB
Concatenation finished
Peak Memory: 362 MiB
--------------------------------------------------
Dataset: dense 1
Concatenating 6 files with sizes:
['668MiB', '902MiB', '899MiB', '668MiB', '668MiB', '907MiB']
Total size: 4714MiB
Concatenation finished
Peak Memory: 356 MiB
--------------------------------------------------

So this makes it clear that the problem is about determining the chunk sizes.