pangeo-forge / pangeo-forge-recipes

Python library for building Pangeo Forge recipes.
https://pangeo-forge.readthedocs.io/
Apache License 2.0

Debugging memory issues in IMERG #227

Open · TomAugspurger opened this issue 2 years ago

TomAugspurger commented 2 years ago

Dropping a few notes here for the day:

@sharkinsspatial and I are seeing some slowness and high memory usage on the client when running the imerg recipe, which has lots of steps in the time dimension.

I've traced something that looks like a memory leak to azure.storage.blob. This script just puts a bunch of 24-byte objects:

```python
# file: async_mem.py
import os
import time
import asyncio
import tracemalloc

import azure.storage.blob.aio

N = 10_000
data = b"\x02\x013\x08\x08\x00\x00\x00\x08\x00\x00\x00\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"  # 0

# Observation: we don't see a memory leak when just creating the bc; we actually have to put data.


async def put(acc, i):
    async with acc.get_blob_client(f"async/{i}") as bc:
        await bc.upload_blob(data=data, overwrite=True, metadata={"is_directory": "false"})


async def main():
    conn_str = os.environ["AZURE_CONNECTION_STRING"]
    async with azure.storage.blob.aio.ContainerClient.from_connection_string(conn_str, "gpm-imerg-debug") as acc:
        tracemalloc.start()
        coros = (put(acc, i) for i in range(N))
        t0 = time.time()
        await asyncio.gather(*coros)
        t1 = time.time()
        print(f"processed {N} blobs in {t1 - t0:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())
```

[image: memory profile from running async_mem.py]

I don't see an easy workaround (I also don't really understand where the memory leak might be, and hopefully won't need to find out).


So making a bunch of small writes isn't great for performance (and, it turns out, for memory on Azure). Can we avoid the small writes in the first place?

One of the batches of small writes happens at https://github.com/pangeo-forge/pangeo-forge-recipes/blob/78a274b4cceb4bc1734cddc48fa9c58f553f7684/pangeo_forge_recipes/recipes/xarray_zarr.py#L109-L112

That writes 0 for each chunk in the concat dim (time typically). In the case of IMERG, that's ~360,000 writes. @rabernat mentioned this was to avoid an issue with decode_times. For better or worse, the test suite passes with those lines removed...
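As a minimal sketch of why this hurts (not the pangeo-forge code itself): with a one-element-per-chunk concat dim, even a single assignment creates one tiny store object per chunk. Here a plain dict stands in for the remote fsspec mapper, 10,000 stands in for IMERG's ~360,000 time steps, and the zarr v2 API is assumed.

```python
import zarr

# Illustration only: one store object per chunk of the concat dim.
store = {}  # stand-in for the remote target's fsspec mapper
t = zarr.open(store, mode="w", shape=(10_000,), chunks=(1,), dtype="i8", fill_value=None)
t[:] = 0
print(len(store))  # 10_001 keys: .zarray plus one tiny chunk object per time step

# If the coordinate were stored as a single chunk, there is only one data object:
store2 = {}
t2 = zarr.open(store2, mode="w", shape=(10_000,), chunks=(10_000,), dtype="i8", fill_value=None)
t2[:] = 0
print(len(store2))  # 2 keys: .zarray and one chunk
```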

The second place we interact with a bunch of small objects is when consolidating the dimension coordinates at https://github.com/pangeo-forge/pangeo-forge-recipes/blob/78a274b4cceb4bc1734cddc48fa9c58f553f7684/pangeo_forge_recipes/recipes/xarray_zarr.py#L564-L578. That does one read per input. @sharkinsspatial, can you confirm whether you saw memory pressure here too, or just slowness?

We might be able to know those coordinate values "statically", based just on the recipe definition, at least for time. For imerg I think it's just some (encoded form of) recipe.file_pattern.combine_dims[0].keys, which is the DatetimeIndex we provided to the FilePattern. Then we would have one "big" write at the end, and no small reads.
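For concreteness, here is a sketch of that idea, assuming nitems_per_file=1. The date range and URL function below are made up for illustration and are not the real IMERG pattern:

```python
import pandas as pd
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern

# Hypothetical half-hourly date range and URL scheme, for illustration only.
dates = pd.date_range("2000-06-01", "2021-05-31 23:30", freq="30min")


def make_url(time):
    return f"https://example.com/imerg/{time:%Y/%m/%d}/3B-HHR.{time:%H%M}.nc"


pattern = FilePattern(make_url, ConcatDim("time", dates, nitems_per_file=1))

# The DatetimeIndex we passed in is recoverable from the pattern itself, so the
# consolidated time coordinate could be written without reading anything back:
assert list(pattern.combine_dims[0].keys) == list(dates)
```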

TomAugspurger commented 2 years ago

> For imerg I think it's just some (encoded form of) recipe.file_pattern.combine_dims[0].keys, which is the DatetimeIndex we provided to the FilePattern. Then we just have one "big" write at the end, and no small reads.

This optimization only works when nitems_per_file is 1. Otherwise, I'm not sure that the .keys will have the necessary information. I'm also guessing that it's something of a coincidence that, in this case, the keys are exactly the coordinate values for that dimension. So in short, I'm not sure what to do in general. I'm moving back to the idea that all these small writes / reads may be unavoidable, and we should sort out why we're seeing memory usage grow.

@sharkinsspatial to work around the issues you were seeing in finalize_target, can you try setting recipe.consolidate_dimension_coordinates = False, and then before finalize_target add a function call like

```python
import numpy as np
import zarr
from pangeo_forge_recipes.recipes.xarray_zarr import _gather_coordinate_dimensions

def consolidate_coordinate_dimensions(config):
    target_mapper = config.target.get_mapper()
    group = zarr.open(target_mapper, mode="a")
    # https://github.com/pangeo-forge/pangeo-forge-recipes/issues/214
    # intersect the dims from the array metadata with the Zarr group
    # to handle coordinateless dimensions.
    dims = set(_gather_coordinate_dimensions(group)) & set(group)
    combine_dims = {x.name: x for x in config.file_pattern.combine_dims}
    for dim in dims:
        arr = group[dim]
        chunks = arr.shape
        dtype = arr.dtype
        compressor = arr.compressor
        fill_value = arr.fill_value
        order = arr.order
        filters = arr.filters

        # Performance optimization: get the values from combine_dims if possible.
        # This avoids many small reads from the non-consolidated coordinate dimension.
        # https://github.com/pangeo-forge/pangeo-forge-recipes/issues/227
        if dim in combine_dims and combine_dims[dim].nitems_per_file == 1:
            data = np.asarray(combine_dims[dim].keys, dtype=dtype)
        else:
            data = arr[:]  # this triggers reads from the target

        attrs = dict(arr.attrs)
        new = group.array(
            dim,
            data,
            chunks=chunks,
            dtype=dtype,
            compressor=compressor,
            fill_value=fill_value,
            order=order,
            filters=filters,
            overwrite=True,
        )
        new.attrs.update(attrs)
```

That should run pretty quickly. (Make sure to still run finalize_target so that we get the consolidated metadata.)
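In other words, roughly (the exact invocation depends on how you're executing the recipe stages):

```python
# Rough usage sketch -- adapt to whatever executor / stage functions you're using.
recipe.consolidate_dimension_coordinates = False

# ... cache inputs, prepare the target, and store all chunks as usual ...

consolidate_coordinate_dimensions(recipe)  # the helper above (the recipe is the `config`)

# ... then run finalize_target as usual, so consolidated metadata is still written ...
```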

TomAugspurger commented 2 years ago

Hmmm I've had a thought: why does the memory usage level off before we're done with all the reads / writes? Running a smaller version of the earlier profile, we see that memory use peaks at 20s, while the job finishes at 30s.

[image: memory profile of the smaller run; usage peaks around 20s, the job finishes around 30s]

What if we don't really have a memory leak, but are instead trying to run all 360,000 write coroutines concurrently? Then we'd see memory usage grow as each await moves on to the next coroutine, which allocates a bit more data, and so on. Eventually coroutines start completing, which frees memory and levels off the usage (it might not drop yet, since we may still be starting new coroutines, and Python doesn't always release memory back to the OS when objects are deallocated).

It's just a guess, but if we limit the maximum concurrency, we might see a different chart. For this run, we have an asyncio.Semaphore to limit us to 100 concurrent writes.

```python
import os
import time
import asyncio

import azure.storage.blob.aio

N = 10_000
data = b"\x02\x013\x08\x08\x00\x00\x00\x08\x00\x00\x00\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"  # 0

# Observation: we don't see a memory leak when just creating the bc; we actually have to put data.


async def put(acc, i, sem):
    async with sem:
        async with acc.get_blob_client(f"async/{i}") as bc:
            await bc.upload_blob(data=data, overwrite=True, metadata={"is_directory": "false"})


async def main():
    conn_str = os.environ["AZURE_CONNECTION_STRING"]
    async with azure.storage.blob.aio.ContainerClient.from_connection_string(conn_str, "gpm-imerg-debug") as acc:
        sem = asyncio.Semaphore(100)
        coros = (put(acc, i, sem) for i in range(N))
        t0 = time.time()
        await asyncio.gather(*coros)
        t1 = time.time()
        print(f"processed {N} blobs in {t1 - t0:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())
```

[image: memory profile of the semaphore-limited run]

Interestingly, we aren't any slower with the limited-concurrency version. I think that's because we were CPU bound anyway (with event loop / Python overhead).

So next steps might be to look into fsspec backends to see if there's a way to limit concurrency, and design a way to set that from the recipe. I'll look into adlfs.

TomAugspurger commented 2 years ago

cc @martindurant, in case you see an obvious place where concurrency could be limited (context at https://github.com/pangeo-forge/pangeo-forge-recipes/issues/227#issuecomment-964259201, but the tl;dr is that it'd be nice to have a way to limit the concurrency of AsyncFileSystem._pipe / _pipe_file when passing a ton of values).

A possible fix is in _pipe_file. For adlfs that's at https://github.com/dask/adlfs/blob/bc6e7498af806fe851b4c0157612033b46c03176/adlfs/spec.py#L1411-L1421. We could perhaps pass in an asyncio.Semaphore there and have it acquire a spot before getting the blob client or doing the write.

But that feels very ad-hoc and too narrow. Why just limit concurrency in pipe_file? So I might briefly look into doing this across adlfs for any operation that acquires a BlobClient.

I don't immediately see a way to do this maximum concurrency thing generically in fsspec, unless it were to define some protocol. So I think for now it'll have to be implemented independently in each of adlfs / s3fs / gcsfs.
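A rough sketch of the per-backend idea, for adlfs. This is not an existing adlfs/fsspec API: the class name and max_concurrency parameter are made up, and _pipe_file's signature is assumed to match the lines linked above.

```python
import asyncio

from adlfs import AzureBlobFileSystem


class LimitedWriteAzureBlobFileSystem(AzureBlobFileSystem):
    """Sketch: cap the number of in-flight _pipe_file calls with a semaphore."""

    def __init__(self, *args, max_concurrency=100, **kwargs):
        super().__init__(*args, **kwargs)
        self._max_concurrency = max_concurrency
        self._write_sem = None

    async def _pipe_file(self, path, value, overwrite=True, **kwargs):
        # Create the semaphore lazily so it's bound to the filesystem's own event loop.
        if self._write_sem is None:
            self._write_sem = asyncio.Semaphore(self._max_concurrency)
        # Acquire a slot before getting a BlobClient / doing the write, so at most
        # max_concurrency uploads are in flight at once.
        async with self._write_sem:
            return await super()._pipe_file(path, value, overwrite=overwrite, **kwargs)
```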

rabernat commented 2 years ago

> For better or worse, the test suite passes with those lines removed...

Drat. I know there are some examples that break without it, but I guess they're not in the test suite; I think it was something from the recipe tutorials. This is a good motivation to perhaps run the tutorials as part of the test suite.

cisaacstern commented 2 years ago

> For better or worse, the test suite passes with those lines removed...

xref https://github.com/pangeo-forge/pangeo-forge-recipes/issues/173

rabernat commented 2 years ago

I agree that we should pursue an upstream fix. But there are also a few other possibilities:

Make time coordinate contiguous from the beginning

What if we explicitly specify that the time coordinate should be contiguous (a single chunk) from the beginning? Then we are only doing a single write in prepare_target, and there is nothing to consolidate at the end.

Pangeo Forge should be able to handle this situation using locks. There will be some write contention between processes, which could be managed via our locking mechanism.

This is not possible with the current code but could be implemented fairly easily.

Fix handling of encoding to avoid the big write in prepare_target

This part

https://github.com/pangeo-forge/pangeo-forge-recipes/blob/78a274b4cceb4bc1734cddc48fa9c58f553f7684/pangeo_forge_recipes/recipes/xarray_zarr.py#L109-L112

was added to avoid cf time decoding errors, related to decoding an uninitialized time dimension, that can crop up when opening the target with xarray. However, we have not been opening the target with xarray in store_chunk since b603c988c57bf9246282c943c5ae09ffa7a11a07, so perhaps those lines can indeed be removed. I'm going to look into this.

TomAugspurger commented 2 years ago

> Pangeo Forge should be able to handle this situation using locks. There will be some write contention between processes, which could be managed via our locking mechanism.

I've been assuming that the locking would essentially kill parallelism while storing data chunks. Is that not the case? Ah, (maybe) it's OK, because writing a variable like tmax doesn't require getting the lock to update the time coordinate?

rabernat commented 2 years ago

If the locks are fine-grained enough (e.g. only on the time variable) then it should not be too bad. Async within pangeo-forge would help a lot here too--we could write other variables while waiting on the lock.

TomAugspurger commented 2 years ago

Great. I'll look into doing the coordinate consolidation earlier (in expand_target_dim).

rabernat commented 2 years ago

> I'll look into doing the coordinate consolidation earlier (in expand_target_dim).

You'll need to revisit this line:

https://github.com/pangeo-forge/pangeo-forge-recipes/blob/78a274b4cceb4bc1734cddc48fa9c58f553f7684/pangeo_forge_recipes/recipes/xarray_zarr.py#L499

We are currently assuming that all variables with time (more generally, concat_dim) have the same target chunks (concat_dim_chunks). The target chunk grid is set up here:

https://github.com/pangeo-forge/pangeo-forge-recipes/blob/78a274b4cceb4bc1734cddc48fa9c58f553f7684/pangeo_forge_recipes/recipes/xarray_zarr.py#L231-L234

For this to work, you may need to add an argument to region_and_conflicts_for_chunk to specify the target chunks (which can now differ by variable).

martindurant commented 2 years ago

The async fsspec get and put methods allow for a batch_size arg, which controls the number of coroutines submitted in one go. The code is simple, and this could be added to all batch methods. The reason it's only in those two is that the local open-files limit was more constraining than what you are facing here. The option should be made visible and thoroughly documented.

rabernat commented 2 years ago

Perhaps there could also be a global config option to set batch_size?

martindurant commented 2 years ago

Note that you can also set batch_size for the filesystem instance, or use the config key "gather_batch_size" to set it for all async implementations. The default is 128, or the open-file limit / 8 if known. But, again, this is only for put/get, and you are probably hitting pipe instead, which is unlimited.
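For reference, the two routes mentioned above look roughly like this (conn_str as in the earlier scripts; this assumes the backend forwards batch_size to AsyncFileSystem, and per the caveat above it currently only affects put/get, not pipe):

```python
import os

import fsspec

conn_str = os.environ["AZURE_CONNECTION_STRING"]

# 1. Per filesystem instance:
fs = fsspec.filesystem("abfs", connection_string=conn_str, batch_size=100)

# 2. Globally, for all async implementations:
fsspec.config.conf["gather_batch_size"] = 100
```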

sharkinsspatial commented 2 years ago

@martindurant Is the expanded use of _run_coros_in_chunks across all gather operations, which you mention in https://github.com/fsspec/s3fs/issues/537#issuecomment-944335863, still under consideration?

martindurant commented 2 years ago

Exactly that