pangeo-data / rechunker

Disk-to-disk chunk transformation for chunked arrays.
https://rechunker.readthedocs.io/
MIT License

workers exceeding max_mem setting #100

Open rsignell-usgs opened 2 years ago

rsignell-usgs commented 2 years ago

A colleague and I were struggling to rechunk a zarr dataset using this workflow: https://nbviewer.jupyter.org/gist/rsignell-usgs/89b61c3dc53d5107e70cf5574fc3c833

After much trial and error, we discovered that we needed to increase the worker size to 8GB and decrease max_mem to 3GB to avoid workers running out of memory and the cluster dying with "killed_worker".

Watching the dask dashboard shows a number of the workers spiking over 5GB, despite setting max_mem to 3GB: [screenshot: dask dashboard worker memory, 2021-10-04]

When we looked at the worker logs we saw tons of these warnings:

distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 5.66 GiB -- Worker memory limit: 8.00 GiB

Is this expected behavior?

rabernat commented 2 years ago

Thanks for reporting Rich!

This ability to track unmanaged memory is new for Dask. So overall this is progress. At least we know that rechunker is not explicitly using this memory, as it is designed not to!

I would try with the trick discussed here: https://github.com/dask/distributed/issues/2602#issuecomment-891198370

Set {"MALLOC_TRIM_THRESHOLD_": "0"} in the environment variables on your dask workers. See if that improves things.

rsignell-usgs commented 2 years ago

Thanks @rabernat for the idea.

I'm not sure I enabled it correctly, however.

I added this line to my notebook just after I imported dask, before I created the cluster:

dask.config.set({"MALLOC_TRIM_THRESHOLD_": "0"})

Didn't seem to have any impact -- I'm getting the same behavior as before, with memory going way above 3GB and lots of the same unmanaged memory warnings in the logs.

Do I need a dask worker plugin or something?

rabernat commented 2 years ago

Yeah that's not right. You need to set an environment variable on the workers. The way you do this depends on how you are creating your dask cluster.

How are you creating your dask cluster? Dask gateway?

rsignell-usgs commented 2 years ago

@rabernat, yes, Dask Gateway. But the Dask Gateway on Qhub for some reason is not configured to take a dict of environment variables on cluster creation (right @dharhas?)

So this time I created the cluster and then used client.run() to set that environment variable on the Dask workers:

def set_env(k, v):
    import os
    os.environ[k] = v

client.run(set_env, 'MALLOC_TRIM_THRESHOLD_', '0')

but in the worker logs I still see lots of:

distributed.worker - INFO - Run out-of-band function 'set_env'

distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 5.68 GiB -- Worker memory limit: 8.00 GiB

BTW, the workflow should be reproducible, as we are reading from an S3 requester-pays bucket. One would need to supply one's own S3 bucket for writing, of course.
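(A sketch of the requester-pays read side, with placeholder bucket names and assuming a reasonably recent s3fs:)

import s3fs
import xarray as xr

# Placeholder paths; requester_pays means the reader's AWS account is billed
# for the transfer, so credentials are required.
fs = s3fs.S3FileSystem(requester_pays=True)
ds = xr.open_zarr(fs.get_mapper("s3://example-source-bucket/input.zarr"))
# ...then rechunk and write the result to a bucket you own.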

rabernat commented 2 years ago

client.run(set_env, 'MALLOC_TRIM_THRESHOLD_', '0')

Still not the right way to do it. You need to use cluster options (as in the notebook linked from the issue linked above).

from dask_gateway import Gateway
g = Gateway()
options = g.cluster_options()
options.environment = {"MALLOC_TRIM_THRESHOLD_": "0"}
cluster = g.new_cluster(options)

dharhas commented 2 years ago

@rsignell-usgs you can set environment variables on dask_gateway in QHub via the environment_vars kwarg, but it looks like you need to upgrade to QHub 0.3.12.

martindurant commented 2 years ago

Would it be fair to say that max_mem, which defines the upper limit of a chunk memory footprint, is expected to be much smaller than the actual peak memory usage? There will always be temporary arrays and bytes objects allocated during remote writes.
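(For reference, max_mem here is the value handed to rechunk; a sketch with placeholder stores and chunk sizes, not the ones from the notebook linked above:)

import zarr
from rechunker import rechunk

source = zarr.open("source.zarr")        # existing chunked array (placeholder path)
plan = rechunk(
    source,
    target_chunks=(7000, 200, 200),      # hypothetical target chunking
    max_mem="3GB",                       # limit per copy task, not per worker
    target_store="target.zarr",
    temp_store="temp.zarr",              # intermediate store for the two-stage copy
)
plan.execute()                           # runs on the active dask cluster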

rabernat commented 2 years ago

Would it be fair to say that max_mem, which defines the upper limit of a chunk memory footprint, is expected to be much smaller than the actual peak memory usage? There will always be temporary arrays and bytes objects allocated during remote writes.

Yes, I think that's correct. But here I think we have a bigger problem related to garbage collection.

rsignell-usgs commented 2 years ago

Quansight is releasing a new version of QHub later this week, at which point I will upgrade the ESIP QHub. That will give us a much easier way to set environment variables on the workers, so we can try out the {"MALLOC_TRIM_THRESHOLD_": "0"} idea.

fmaussion commented 2 years ago

At the moment I'm struggling a lot with "distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak ..." warnings.

Before I report the issue in more detail, I wanted to try this MALLOC_TRIM_THRESHOLD_ method - does anyone know how I can set it on a LocalCluster instance? The dask docs are not particularly helpful, and I'm not using dask_gateway as in your examples, @rabernat (which, btw, also seem to fail on recent dask_gateway, as far as I could tell with a local gateway server).

Thanks a lot!

rabernat commented 2 years ago

The details of how I was setting MALLOC_TRIM_THRESHOLD_: https://github.com/dask/distributed/issues/2602#issuecomment-891156357

It really does seem to be a magic flag for Dask. I don't know how to set it on a LocalCluster.

fmaussion commented 2 years ago

Thanks Ryan - I've asked on Dask Discourse and will report back.

dcherian commented 2 years ago

I think you set it on the "nanny": https://docs.dask.org/en/stable/configuration.html?highlight=malloc#distributed-nanny

EDIT: maybe not: https://github.com/dask/distributed/issues/5971
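(For concreteness, the config route in that link would look roughly like the sketch below; whether it takes effect early enough for this particular variable is what the issue in the EDIT is about:)

import dask

# Sketch of the nanny-environment config from the linked docs. Caveat (see
# the EDIT above): this may be applied too late for glibc to pick up
# MALLOC_TRIM_THRESHOLD_ in the spawned worker processes.
dask.config.set({"distributed.nanny.environ": {"MALLOC_TRIM_THRESHOLD_": "0"}})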

fmaussion commented 2 years ago

Thanks for your help. I'll add my voice to the issues raised there. I wanted to showcase dask for my advanced programming class, but this really isn't helping. Even the simplest computations (ones that would fit in memory) are killing my local cluster: https://nbviewer.ipython.org/gist/fmaussion/5212e3155256e84e53d033e61085ca30

dcherian commented 2 years ago

Try chunking inside the open_*dataset calls by passing chunks. Otherwise, it will read the whole file into memory and then chunk and distribute it.
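For example, something along these lines (file pattern and chunk sizes are placeholders):

import xarray as xr

# Passing chunks= makes the variables dask-backed from the start, so each
# task reads only the chunks it needs instead of the whole file.
ds = xr.open_mfdataset(
    "data/*.nc",              # placeholder input files
    chunks={"time": 24},      # placeholder chunk sizes
    parallel=True,
)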

I guess we should say this here.

fmaussion commented 2 years ago

Try chunking inside the open_*dataset calls by passing chunks. Otherwise, it will read the whole file into memory and then chunk and distribute it.

Well this is embarrassing - I did not know that. This saved my class if not my dignity ;-)

I guess we should say this here.

I will open a PR immediately. This should be priority number one for multi-file datasets. I guess this problem does not occur with single-file open_dataset?

dcherian commented 2 years ago

I guess this problem does not occur with single-file open_dataset?

I think it still does. "chunk as early as possible, and avoid rechunking as much as possible" is the principle in my experience.

fmaussion commented 2 years ago

I think it still does

Why? By default the variables are lazily loaded and are not dask arrays, and a subsequent call to .chunk() will convert them?
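In other words, roughly this (hypothetical single file), if I understand it correctly:

import xarray as xr

# Without chunks= the variables are lazily loaded backend arrays (not dask);
# .chunk() then wraps them in dask arrays without reading the whole file first.
ds = xr.open_dataset("single_file.nc")   # lazy, no dask yet (placeholder path)
ds = ds.chunk({"time": 24})              # now dask-backed (placeholder chunks)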

"chunk as early as possible, and avoid rechunking as much as possible" is the principle in my experience.

This just made it to the docs ;-)

dcherian commented 2 years ago

Why? By default the variables are lazily loaded and are not dask arrays, and a subsequent call to .chunk() will convert them?

makes sense!

NoeLahaye commented 1 year ago

I am following up on this issue (although I am not sure whether this is the correct place -- maybe this is more dask-related). I am still struggling with this error: unmanaged memory getting very high, raising warnings and eventually errors that cause the workers to be killed. I am usually using a LocalCluster or a PBSCluster. Is setting MALLOC_TRIM_THRESHOLD_ to 0 still the way to go? If so, how? Beyond the issue mentioned by Ryan above, I have been going through various Dask issues (this one, seemingly fixed by that one) and parts of the docs, but I haven't managed to reach a conclusion or get past this.