pangeo-data / jupyter-earth

Jupyter meets the Earth: combining research use cases in geosciences with technical developments within the Jupyter and Pangeo ecosystems.
https://jupytearth.org
Creative Commons Zero v1.0 Universal

Learning how to manage data on dask workers #89

Open · consideRatio opened this issue 2 years ago

consideRatio commented 2 years ago

@espg has run into memory management issues, and this is my attempt to summarize the problem we need to resolve in order to properly fix the issue.

My attempted problem description:

We have trouble with memory management on our dask workers. Our dask workers are scheduled to run tasks, and sometimes multiple tasks run in parallel on each worker. Each completed task has an associated result that we want to keep in worker memory and collect at a later time, typically once all of a larger set of tasks has completed.

The issue is that our workers don't retain the task results reliably: the results stored in memory are considered unmanaged memory, and they end up being garbage collected.

When we collect results from workers, we want that memory to be released.
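
In rough pseudo-code, the pattern we are trying to get working is something like the following (the function and variable names, process_granule and granule_urls, are made up for illustration):

from dask.distributed import Client

client = Client()  # in practice, connected to our cluster

granule_urls = ["https://example.com/a.h5", "https://example.com/b.h5"]  # placeholders

def process_granule(url):
    # download and process one file, returning a result we want kept
    # in worker memory until we collect it
    ...

futures = [client.submit(process_granule, url) for url in granule_urls]

# later, once the whole batch is done, collect everything at once
results = client.gather(futures)

# after collecting, releasing the futures should free the worker memory
del futures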

I think these are key questions

espg commented 2 years ago

@consideRatio some (but not all) of this may be related to cluster structure =/ Some interesting data points:

fjetter commented 2 years ago

Can you provide an example of how you create your data? Typically, the only thing you need to do for a result to be retained by the worker and counted as managed memory is to return the data from the task you are running. The output type is pretty much irrelevant. To use the data, just pass the future along to another submit call and dask takes care of forwarding the data, e.g.

from dask.distributed import Client

client = Client()  # assumes a client connected to your cluster

def a_lot_of_data(*args, **kwargs):
    # return two ~1 GiB strings; returning them is all that's needed
    # for the worker to keep them as managed memory
    return ["0" * 2**30] + ["1" * 2**30]

fut = client.submit(a_lot_of_data)

def data_on_worker(dask_worker):
    # Client.run injects the Worker instance as `dask_worker`
    return list(dask_worker.data.keys())

client.run(data_on_worker)

def use_the_data(data):
    return sum(map(len, data))

res_f = client.submit(use_the_data, fut)
res_f.result()

del fut, res_f  # this will release the data again (not shown in the GIF)

[Screen recording attachment: 2021-10-27 at 14:51:13]

espg commented 2 years ago

@fjetter Thanks for checking in on this, we appreciate it :-) I think I've started to get a better handle on how dask manages memory, at least some of the time. Initially, we were downloading files to the worker, using h5py to read the data into arrays, and then dumping the data into vaex dataframes before gathering (i.e., some of the code in dask/distributed#5430). The vaex dataframes were showing up as unmanaged memory and had poor performance when calling gather because of object introspection, so now we serialize the vaex dataframes to arrow tables before calling gather, and arrow tables show up as managed memory like we would expect.
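
Roughly, what each task does now looks like this (the file path and HDF5 dataset names below are just placeholders):

import h5py
import vaex

def load_granule(path):
    # read the arrays we need from the file
    with h5py.File(path, 'r') as f:
        lat = f['gt1l/land_ice_segments/latitude'][:]
        lon = f['gt1l/land_ice_segments/longitude'][:]
    df = vaex.from_arrays(latitude=lat, longitude=lon)
    # return an arrow table instead of the vaex dataframe, so the result
    # is measured by the worker and shows up as managed memory
    return df.to_arrow_table()

future = client.submit(load_granule, 'ATL06_example.h5')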

We could load the data into dask-pandas dataframes instead; the main reason we've been using vaex inside of dask distributed is that the export-to-parquet function in vaex lets us write parquet files in the Apache Hive hierarchy:

import vaex
from tqdm import tqdm

# lst is a list of lists; each inner list holds futures that return an arrow table
# swarm is our distributed Client
big_list = []
for thing in tqdm(lst):
    futures = thing.tolist()
    # some returns are NoneType; filter these out
    quiver = list(filter(None, swarm.gather(futures, errors='skip', direct=True)))
    # reclaim memory on the worker -- this works fine
    for future in futures:
        future.cancel()
    for arrow in quiver:
        big_list.append(vaex.from_arrow_table(arrow))

data = vaex.concat(big_list)
# midx is a Morton index; it is a variant of a HEALPix-style flat spatial index
# a value might be 4112412423423141333
data['shards'] = data.midx // 10**12
data['chunk'] = data.midx // 10**16
# this is cutting the tree at two different levels:
# shards is 4112412
# chunk is 411

def write_data(arrowtable):
    vaexDF = vaex.from_arrow_table(arrowtable)
    del vaexDF['chunk']
    # this function is the reason that we use vaex
    vaexDF.export_partitioned('s3://geostacks/icesat_2/{subdir}/atl06.parquet', by=['shards', 'cycle'])

# Here's how data is written
data.select(data.chunk == 411)
# partial subset based on chunk
tmp = data.to_arrow_table(selection=True, parallel=True)
# give to a worker to write out -- this would be wrapped in a loop
example = client.scatter(tmp, direct=True)
task = client.submit(write_data, example)

# output on first run for cycle 6
# s3://geostacks/icesat_2/shards=4111111/cycle=6/atl06.parquet
# s3://geostacks/icesat_2/shards=4111112/cycle=6/atl06.parquet
# ...
# s3://geostacks/icesat_2/shards=4114444/cycle=6/atl06.parquet

# output on next run for cycle 7
# s3://geostacks/icesat_2/shards=4111111/cycle=7/atl06.parquet
# s3://geostacks/icesat_2/shards=4111112/cycle=7/atl06.parquet
# ...
# s3://geostacks/icesat_2/shards=4114444/cycle=7/atl06.parquet
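
The loop hinted at above looks roughly like this (simplified; I'm leaving out error handling, and the vaex unique() call is from memory):

for chunk_id in data.unique('chunk'):
    data.select(data.chunk == chunk_id)
    tmp = data.to_arrow_table(selection=True, parallel=True)
    example = client.scatter(tmp, direct=True)
    task = client.submit(write_data, example)
    # wait for the write so only one chunk is in flight at a time
    task.result()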

In general I'm open to suggestions on how to do this better. We are casting back and forth between arrow tables and vaex dataframes a fair bit, but the operation is quick. If there's a way to load all the data into a dask-backed pandas dataframe on the workers, and then export to the hive/arrow layout across the cluster using custom-defined partitions, that would be great -- I just haven't seen anything about hierarchical export to hive in the dask documentation.

One memory-management issue that we have been running into is that the scheduler will try to 'fill up' a worker, and this often crashes our workers because the actual memory requirements don't fit within the scheduler's logic. To give a concrete example --

If we loop through our list and scatter objects, we have a really good chance of crashing the workers. Dask seems to make a copy of any object that you scatter to it, so a 40GB object placed on a worker will spike memory usage to ~85GB. Once the scatter operation is complete, memory usage drops back down to 40GB. If we run the above in a loop, and there are two 40GB objects to scatter, the following happens:

To address this, we've been trying to assign workers manually as described in dask/distributed#5451 -- but that is only able to assign tasks to workers based on the number of resident objects, not the amount of space used. From the code example that you posted above, it looks like you don't define the variable dask_worker; so I assume that this is a variable that is defined and updated on the worker automatically. Does the dask_worker object have the memory usage of that worker contained in it somewhere that we could query programmatically?
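
In the meantime, here is a rough sketch of how we could poll per-worker memory from the client side ourselves (I'm not sure of the exact metric keys; they may differ between distributed versions):

import psutil

# the scheduler already tracks per-worker metrics, including memory
info = client.scheduler_info()
mem_by_worker = {addr: w['metrics']['memory'] for addr, w in info['workers'].items()}

# or measure RSS directly on each worker
def rss_bytes():
    return psutil.Process().memory_info().rss

rss_by_worker = client.run(rss_bytes)

# pick the emptiest worker before scattering a large object like tmp above
target = min(rss_by_worker, key=rss_by_worker.get)
big_future = client.scatter(tmp, workers=[target], direct=True)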

fjetter commented 2 years ago

The vaex dataframes were showing up as unmanaged memory and had poor performance when calling gather because of object introspection, so now we serialize the vaex dataframes to arrow tables before calling gather, and arrow tables show up as managed memory like we would expect.

I believe the problem here might be how we measure memory. We rely on all objects either implementing a __sizeof__ method or having a sizeof dispatch registered (see here). Neither might be the case for vaex; I'm not too familiar with the code.

Only what dask measures will be reflected as managed memory. For the unmanaged memory, however, we actually inspect the RSS of the process you're running in: roughly speaking, RSS minus measured/managed memory is unmanaged (see http://distributed.dask.org/en/latest/worker.html#using-the-dashboard-to-monitor-memory-usage for details).
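
If vaex provides neither, a sizeof dispatch can be registered for it. A rough, untested sketch (the vaex class path and the byte_size() call are assumptions on my side):

import vaex
from dask.sizeof import sizeof

@sizeof.register(vaex.dataframe.DataFrame)
def sizeof_vaex(df):
    try:
        # let vaex estimate its own memory footprint
        return int(df.byte_size())
    except Exception:
        # fall back to a conservative constant if the estimate fails
        return 100 * 2**20

Note that this registration has to be present on the workers (e.g. via a preload script or worker plugin), since that is where the measurement happens.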

We could load the data into dask-pandas dataframes instead; the main reason we've been using vaex inside of dask distributed is that the export-to-parquet function in vaex lets us write parquet files in the Apache Hive hierarchy:

Are you familiar with https://docs.dask.org/en/latest/generated/dask.dataframe.to_parquet.html? partition_on=['shards', 'cycle'] should do the same thing. See also https://docs.dask.org/en/latest/generated/dask.dataframe.from_delayed.html for delayed -> DataFrame conversion.
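
An untested sketch of what that could look like, reusing the column names from your example (load_granule here is a placeholder that would return one pandas DataFrame per input file):

import pandas as pd
import dask.dataframe as dd
from dask import delayed

@delayed
def load_granule(path):
    # placeholder loader; in practice this would read the HDF5 granule
    return pd.DataFrame({'midx': [4112412423423141333], 'cycle': [6], 'h_li': [0.0]})

paths = ['granule_a.h5', 'granule_b.h5']  # placeholder inputs
ddf = dd.from_delayed([load_granule(p) for p in paths])
ddf['shards'] = ddf['midx'] // 10**12

# writes a hive-style shards=.../cycle=... directory layout under the target path
ddf.to_parquet(
    's3://geostacks/icesat_2/atl06.parquet',
    partition_on=['shards', 'cycle'],
)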

From the code example that you posted above, it looks like you don't define the variable dask_worker;

Client.run is a special function for debugging purposes. It injects the worker as an argument so that you can inspect your setup if necessary. I would consider this advanced, though: the function runs out of band on the worker's event loop, and I don't recommend doing this for any actual computation since that will cause instabilities. Ordinary payload computations should be run via Client.submit and will execute in a thread. If you need the worker object in there, for whatever reason, use get_worker instead.
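
For example (sketch):

from dask.distributed import get_worker

# debugging: Client.run injects the Worker and runs out of band on its event loop
def data_on_worker(dask_worker):
    return list(dask_worker.data.keys())

client.run(data_on_worker)

# actual computation: Client.submit runs in a worker thread; use get_worker()
# inside the task if you really need the Worker object
def task_needing_worker(x):
    worker = get_worker()
    return worker.address, x * 2

future = client.submit(task_needing_worker, 21)
future.result()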