dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.57k stars 718 forks source link

Memory leak in gather() #5430

Open espg opened 2 years ago

espg commented 2 years ago

EDIT: We are able to avoid the StreamClosedError by upscaling the scheduler to 500GB memory, and chunking the dataset to four smaller runs that are aggregated seperately. However, memory usage on gather(futures, direct=True) still causes a memory spike several times larger than any individual worker has. The error message when this occurs is:

distributed.core - INFO - Event loop was unresponsive in Scheduler for 7.96s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

Not chunking the dataset for the download runs fine on the worker nodes... just can't gather that much data in one pass without errors, which will often trigger the futures to recompute faster than the data is able to transfer =/

What happened:

Relatively (conceptually) simple big data task. Data is temporally indexed as orbital files-- they look something like this over Antarctica (each overlapping line is 1/12 of an orbit segment, there are ~1300):

image

Want them to be spatially sharded, here's what ~60 of the shards might look like:

image

Task is three parts:

  1. Read in ~130GB data from ~1300 files in parallel
  2. Aggregate data
  3. Spatially re-index, write to parquet (hive) in S3

Task 1 works fine and is what most of the example code does.

Task 2 is failing. Task 3 isn't included in the minimal example.

What you expected to happen:

It would be nice if gather() worked consistently. Sometimes it works; if I use a ~60GB dataset, it can complete. If I split the futures list into 5 or 10 smaller lists and call gather() on those subsets, it has more of a chance of working. If I increase the worker size, it also has a better chance of working.

The example given is designed to fail-- consistently. The provided example is failing a 100% of the time for me. This is a scaling issue, so the example has been scaled to a size that will bring out the problem.

I can make things work by not using gather and doing something like:

agglist = []
for future in reslist0:
    agglist.append(future.result())

...but it's awful. Running gather(futures, direct=True) on a smaller 60GB dataset will complete in about 2 minutes. Doing in a loop like above takes over 20 minutes. The orbital pattern is on a 90 day repeat pattern, so I have like dozens of these things to process and would like to do it the faster way. Using gather() without the direct flag overwhelms the scheduler node. Originally this was returning vaex dataframes, which were unmanaged in the worker memory, but changing it to pandas dataframes gives the exact same error message.

Minimal Complete Verifiable Example:


import h5py
import numpy as np
import pandas as pd
from astropy.time import Time
import os
import vaex
import time
import pickle
import dask

from dask_gateway import Gateway
gateway = Gateway()
options = gateway.cluster_options()
options.worker_specification = '8CPU, 32GB'
cluster = gateway.new_cluster(options)
cluster.scale(16)
client = cluster.get_client(set_as_default=True)

# worker functions

def gps2dyr(time):
    """Converts GPS time to datetime (can also do decimal years)."""
    return Time(time, format='gps').datetime

def read_atl06(fname, cycle):
    """Read one ATL06 file and output 6 reduced files. 

    Extract variables of interest and separate the ATL06 file 
    into each beam (ground track) and ascending/descending orbits.
    """

    # Each beam is a group
    group = ['/gt1l', '/gt1r', '/gt2l', '/gt2r', '/gt3l', '/gt3r']

    # Loop trough beams
    dataframes = []

    with h5py.File(fname, 'r') as fi:
        # Check which ground tracks are present in this file
        gtracks = sorted(['/'+k for k in fi.keys() if k.startswith('gt')])

        for k, g in enumerate(gtracks): 
            # Read in data for a single beam
            data = {}
            # this is our unique key (per beam)
            data['id'] = fi[g+'/land_ice_segments/segment_id'][:]
            npts = len(data['id'])
            # Load vars into memory (include as many as you want)
            data['lat'] = fi[g+'/land_ice_segments/latitude'][:]
            data['lon'] = fi[g+'/land_ice_segments/longitude'][:]

            data['slope_y'] = fi[g+'/land_ice_segments/fit_statistics/dh_fit_dy'][:]
            data['slope_x'] = fi[g+'/land_ice_segments/fit_statistics/dh_fit_dx'][:]
            data['slope_x_sigma'] = fi[g+'/land_ice_segments/fit_statistics/dh_fit_dx_sigma'][:]

            data['h_li'] = fi[g+'/land_ice_segments/h_li'][:]
            data['s_li'] = fi[g+'/land_ice_segments/h_li_sigma'][:]
            data['q_flag'] = fi[g+'/land_ice_segments/atl06_quality_summary'][:]
            data['s_fg'] = fi[g+'/land_ice_segments/fit_statistics/signal_selection_source'][:]
            data['snr'] = fi[g+'/land_ice_segments/fit_statistics/snr_significance'][:]
            data['h_rb'] = fi[g+'/land_ice_segments/fit_statistics/h_robust_sprd'][:]
            data['bsnow_conf'] = fi[g+'/land_ice_segments/geophysical/bsnow_conf'][:]

            data['cloud_flg_asr'] = fi[g+'/land_ice_segments/geophysical/cloud_flg_asr'][:]
            data['cloud_flg_atm'] = fi[g+'/land_ice_segments/geophysical/cloud_flg_atm'][:]
            data['msw_flag'] = fi[g+'/land_ice_segments/geophysical/msw_flag'][:]
            data['fbsnow_h'] = fi[g+'/land_ice_segments/geophysical/bsnow_h'][:]
            data['bsnow_od'] = fi[g+'/land_ice_segments/geophysical/bsnow_od'][:]
            data['layer_flag'] = fi[g+'/land_ice_segments/geophysical/layer_flag'][:]
            data['bckgrd'] = fi[g+'/land_ice_segments/geophysical/bckgrd'][:]
            data['e_bckgrd'] = fi[g+'/land_ice_segments/geophysical/e_bckgrd'][:]
            data['n_fit_photons'] = fi[g+'/land_ice_segments/fit_statistics/n_fit_photons'][:]
            data['w_surface_window_final'] = fi[g+'/land_ice_segments/fit_statistics/w_surface_window_final'][:]

            delta_t = fi[g+'/land_ice_segments/delta_time'][:]     # for time conversion
            t_ref = fi['/ancillary_data/atlas_sdp_gps_epoch'][:]     # single value

            # Time in GPS seconds (secs since Jan 5, 1980)
            t_gps = t_ref + delta_t

            # GPS sec to datetime
            data['t_year'] = gps2dyr(t_gps)
            data['cycle'] = np.ones(npts, dtype=np.int8)*cycle
            data['track'] = np.repeat(g[1:], npts)

            # Make a dataframe out of our data dict and store it.

            dataframes.append(vaex.from_dict(data))
        if len(dataframes) > 0:
            result = vaex.concat(dataframes).to_pandas_df()
            return result

def get_thing(thing, user='', passw='', delayMax=199):
    delay = np.random.randint(0, delayMax)
    time.sleep(delay*0.01)
    preamble = "wget --http-user=" + user + " --http-password=" + passw 
    middle = ' --load-cookies mycookies.txt --no-check-certificate --auth-no-challenge -r --reject "index.html*" -np -e robots=off --show-progress=off --cut-dirs=6 ' 
    cmmd = preamble + middle + thing + " -P /tmp/"
    os.system(cmmd)
    lfilepath = '/tmp/n5eil01u.ecs.nsidc.org' + thing[-40:]
    res = read_atl06(lfilepath, cycle=int(3))
    return res

# run tasks

with open("test.txt", "rb") as fp:
    iceFiles11 = pickle.load(fp)

reslist0 = []
for ice0 in iceFiles11[::2]:
    reslist0.append(client.submit(get_thing, ice0, retries=1000))

# Fail
stuff1 = list(filter(None, client.gather(reslist0, errors='skip', direct=True)))

Here's the traceback:


---------------------------------------------------------------------------
StreamClosedError                         Traceback (most recent call last)
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py in read(self, deserializers)
    197         try:
--> 198             frames_nbytes = await stream.read_bytes(fmt_size)
    199             (frames_nbytes,) = struct.unpack(fmt, frames_nbytes)

StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

CommClosedError                           Traceback (most recent call last)
<timed exec> in <module>

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1964             else:
   1965                 local_worker = None
-> 1966             return self.sync(
   1967                 self._gather,
   1968                 futures,

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    858             return future
    859         else:
--> 860             return sync(
    861                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    862             )

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    324     if error[0]:
    325         typ, exc, tb = error[0]
--> 326         raise exc.with_traceback(tb)
    327     else:
    328         return result[0]

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py in f()
    307             if callback_timeout is not None:
    308                 future = asyncio.wait_for(future, callback_timeout)
--> 309             result[0] = yield future
    310         except Exception:
    311             error[0] = sys.exc_info()

/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1858                 else:
   1859                     self._gather_future = future
-> 1860                 response = await future
   1861 
   1862             if response["status"] == "error":

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in _gather_remote(self, direct, local_worker)
   1904                 if missing_keys:
   1905                     keys2 = [key for key in keys if key not in data2]
-> 1906                     response = await retry_operation(self.scheduler.gather, keys=keys2)
   1907                     if response["status"] == "OK":
   1908                         response["data"].update(data2)

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py in retry_operation(coro, operation, *args, **kwargs)
    383         dask.config.get("distributed.comm.retry.delay.max"), default="s"
    384     )
--> 385     return await retry(
    386         partial(coro, *args, **kwargs),
    387         count=retry_count,

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py in retry(coro, count, delay_min, delay_max, jitter_fraction, retry_on_exceptions, operation)
    368                 delay *= 1 + random.random() * jitter_fraction
    369             await asyncio.sleep(delay)
--> 370     return await coro()
    371 
    372 

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    872             name, comm.name = comm.name, "ConnectionPool." + key
    873             try:
--> 874                 result = await send_recv(comm=comm, op=key, **kwargs)
    875             finally:
    876                 self.pool.reuse(self.addr, comm)

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    649         await comm.write(msg, serializers=serializers, on_error="raise")
    650         if reply:
--> 651             response = await comm.read(deserializers=deserializers)
    652         else:
    653             response = None

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py in read(self, deserializers)
    212             self._closed = True
    213             if not sys.is_finalizing():
--> 214                 convert_stream_closed_error(self, e)
    215         except Exception:
    216             # Some OSError or a another "low-level" exception. We do not really know what

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py in convert_stream_closed_error(obj, exc)
    126         raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
    127     else:
--> 128         raise CommClosedError(f"in {obj}: {exc}") from exc
    129 
    130 

CommClosedError: in <TLS (closed) ConnectionPool.gather local=tls://192.168.57.107:36856 remote=gateway://traefik-prod-dask-gateway.prod:80/prod.d7ea193ae2ae463e8be510d8859e1629>: Stream is closed

Anything else we need to know?:

The instances that this was run on either had 256GB or 972GB memory, so the error isn't related to where the data is being dumped.

Here's the repo with the docker file to build the workers:

https://github.com/pangeo-data/jupyter-earth/tree/master/hub.jupytearth.org-image

Here's the repo with the test.txt file that's needed to run the example and also a notebook showing the error: https://github.com/espg/DaskExampleError

@crusaderky and @ian-r-rose might find this of interest from previous conversations. Related to https://github.com/pangeo-data/jupyter-earth/issues/89

Environment:

espg commented 2 years ago

I've been trying to debug this a bit more-- here's some things that make a difference that probably shouldn't:

consideRatio commented 2 years ago

This is an gif animation of a dask scheduler being sent a lot of data after a server received a lot of data from workers by running a ...gather(direct=True) command.

dask-gateway-scheduler-resource-use-anim

espg commented 2 years ago

To follow up more on this issue-- effectively, the scheduler seems to be spiking memory usage at some point during gather(). We increased the scheduler memory to 500GB to debug things, and are able to finish some of the gather() calls, but the behavior still seems like a bug to me.

Our worker nodes are 32GB memory, and we're calling gather(futures, direct=True), so it doesn't seem like normal behavior to have, say, 69 GB of data go thru the scheduler, even that doesn't crash the scheduler due to us forcing it to be on a high memory node. Even with oversizing the scheduler to have a half terabyte of ram, we still need to break the processing loop into batches to be able to successfully gather back to the calling notebook.

Here's some of the logs observed to occur (@consideRatio , can you verify if these are from with or without the direct flag? I have conflicting statements in my notes...):

distributed.scheduler - INFO - Starting worker compute stream, tls://192.168.56.240:46135
distributed.core - INFO - Starting established connection
distributed.core - INFO - Event loop was unresponsive in Scheduler for 7.96s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - INFO - Event loop was unresponsive in Scheduler for 30.83s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - INFO - Event loop was unresponsive in Scheduler for 24.12s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - INFO - Event loop was unresponsive in Scheduler for 7.29s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - INFO - Event loop was unresponsive in Scheduler for 3.14s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - INFO - Event loop was unresponsive in Scheduler for 3.10s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.scheduler - ERROR - Couldn't gather keys {'get_thing-b55e8bb43dc228bb2896d08884835ba5': ['tls://192.168.55.248:33771'], 'get_thing-ea207aee1230f52ba97c988b776d4dab': ['tls://192.168.48.133:43401'], 'get_thing-7936974fff32f9f8973bb42b29b0eb52': ['tls://192.168.37.252:41315'], ....

The gather keys error spews lots of repetition until:

'get_thing-b4767002d0f1a26ab47042b890fb34d5': ['tls://192.168.63.228:46817'], 'get_thing-5c5087c6dd3d660b9ed1df30958573e3': ['tls://192.168.63.228:46817'], 'get_thing-2b2cb1b38b41db8cb1fadceb37e931f4': ['tls://192.168.38.160:33789']} state: ['memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory', 'memory'] workers: ['tls://192.168.35.156:45809', 'tls://192.168.34.28:37075', 'tls://192.168.46.200:42501', 'tls://192.168.63.228:46817', 'tls://192.168.55.248:33771', 'tls://192.168.38.160:33789', 'tls://192.168.48.133:43401', 'tls://192.168.37.252:41315']
NoneType: None
distributed.scheduler - INFO - Remove worker <WorkerState 'tls://192.168.35.156:45809', name: dask-worker-12584cddd0df443399738bf3846ecc24-wvzd9, memory: 41, processing: 0>
distributed.core - INFO - Removing comms to tls://192.168.35.156:45809
distributed.scheduler - INFO - Remove worker <WorkerState 'tls://192.168.34.28:37075', name: dask-worker-12584cddd0df443399738bf3846ecc24-5fgrz, memory: 39, processing: 2>
distributed.core - INFO - Removing comms to tls://192.168.34.28:37075
distributed.scheduler - INFO - Remove worker <WorkerState 'tls://192.168.46.200:42501', name: dask-worker-12584cddd0df443399738bf3846ecc24-2pxhp, memory: 44, processing: 3>
distributed.core - INFO - Removing comms to tls://192.168.46.200:42501
distributed.scheduler - INFO - Remove worker <WorkerState 'tls://192.168.63.228:46817', name: dask-worker-12584cddd0df443399738bf3846ecc24-hdjb8, memory: 49, processing: 4>
distributed.core - INFO - Removing comms to tls://192.168.63.228:46817
distributed.scheduler - INFO - Remove worker <WorkerState 'tls://192.168.55.248:33771', name: dask-worker-12584cddd0df443399738bf3846ecc24-htqd4, memory: 40, processing: 6>
distributed.core - INFO - Removing comms to tls://192.168.55.248:33771
distributed.scheduler - INFO - Remove worker <WorkerState 'tls://192.168.38.160:33789', name: dask-worker-12584cddd0df443399738bf3846ecc24-mgrc2, memory: 49, processing: 7>
distributed.core - INFO - Removing comms to tls://192.168.38.160:33789
distributed.scheduler - INFO - Remove worker <WorkerState 'tls://192.168.48.133:43401', name: dask-worker-12584cddd0df443399738bf3846ecc24-bcz7m, memory: 42, processing: 10>
distributed.core - INFO - Removing comms to tls://192.168.48.133:43401
distributed.scheduler - INFO - Remove worker <WorkerState 'tls://192.168.37.252:41315', name: dask-worker-12584cddd0df443399738bf3846ecc24-sdhvt, memory: 40, processing: 12>
distributed.core - INFO - Removing comms to tls://192.168.37.252:41315
distributed.scheduler - ERROR - Workers don't have promised key: ['tls://192.168.55.248:33771'], get_thing-b55e8bb43dc228bb2896d08884835ba5
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tls://192.168.48.133:43401'], get_thing-ea207aee1230f52ba97c988b776d4dab
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tls://192.168.37.252:41315'], get_thing-7936974fff32f9f8973bb42b29b0eb52
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tls://192.168.55.248:33771'], get_thing-2dcb50d2a94b64836749586d9911b418
...

...with lots more repetition until...

distributed.scheduler - ERROR - Workers don't have promised key: ['tls://192.168.38.160:33789'], get_thing-2b2cb1b38b41db8cb1fadceb37e931f4
NoneType: None
distributed.scheduler - INFO - Unexpected worker completed task, likely due to work stealing.  Expected: <WorkerState 'tls://192.168.56.101:34281', name: dask-worker-12584cddd0df443399738bf3846ecc24-n9zdd, memory: 30, processing: 15>, Got: <WorkerState 'tls://192.168.63.228:46817', name: dask-worker-12584cddd0df443399738bf3846ecc24-hdjb8, memory: 0, processing: 0>, Key: get_thing-4ad0aeebed26b9fcae80d6f9d64e1fc9
distributed.scheduler - INFO - Unexpected worker completed task, likely due to work stealing.  Expected: <WorkerState 'tls://192.168.40.255:41265', name: dask-worker-12584cddd0df443399738bf3846ecc24-m74kx, memory: 46, processing: 14>, Got: <WorkerState 'tls://192.168.63.228:46817', name: dask-worker-12584cddd0df443399738bf3846ecc24-hdjb8, memory: 1, processing: 0>, Key: get_thing-13a4eda67c4bc2d4f8e57569ae1badd5
distributed.scheduler - INFO - Unexpected worker completed task, likely due to work stealing.  Expected: <WorkerState 'tls://192.168.41.146:45139', name: dask-worker-12584cddd0df443399738bf3846ecc24-mt7mc, memory: 39, processing: 15>, Got: <WorkerState 'tls://192.168.63.228:46817', name: dask-worker-12584cddd0df443399738bf3846ecc24-hdjb8, memory: 2, processing: 0>, Key: get_thing-931d0d3779a1e43d084daf261b625e83
...

...which repeats ad nauseam.

To the coiled devs: is there anything that we can do to alleviate this in the cluster configuration? Also, is there any information that we can provide that would help better pinpoint where the scaling is breaking down?

consideRatio commented 2 years ago

I think those were without passing direct=True @espg, we had the same unresponsive stuff reported GIL with direct=True though.

So, using direct=True worked for us when the scheduler didnt run out of memory, but made the scheduler unresponsive (dashboard data viewed in gif were shown retrospectively, as the data were dumped after a long freeze). But, not using direct=True caused failures after the unresponsive state rather than recovering properly, as seen in logs above.

bocklund commented 2 years ago

@espg were you able to track down when the memory leak started? I also am seeing a leak in my workers. It's pretty difficult for me to get a minimal reproducer in the code I'm working with, but it's a very simple application of essentially client.gather(client.map(f, *xs)).

When I run serially without dask, memory is very stable. With dask, I was seeing the fast growing memory usage (~10 MB/s with each call of f ~0.2s) on distributed 2021.10.0 and 2021.8.1, but downgrading to 2021.5.1 seemed to resolve it. I'm not using direct=True, so I can't comment on that, but that seems to be unrelated to the core of this issue.

I'm not sure if this helps other than to help anyone who works on this potentially bisect the issue.

fjetter commented 2 years ago

I'm having a bit of a hard time to figure the minimal example out but from what I understand you're trying to load a lot of data onto the client. From your very first outline you said you'd rather want to store this in a parquet dataset in S3. why are you trying to gather all the data then?

What happens with direct=False is that all of the data is collected on the scheduler and that data is then submitted to the client. That requires at least as much memory as is collectively stored on the cluster. Realistically, it is possible that this process creates a copy of your data. At the very least it is possible to create partial memory copies which might spike the usage enough to kill your scheudler.

Using direct=True will try to avoid the scheduler and will fetch data directly from the workers. This is only working on a best effort basis, e.g. if the data is not where it was expected or a connection failures happens, the scheduler is still used as a middle man.

My recommendation would generally be, try to avoid gather if it is a lot of data. For instance, if you store your payload in parquet, you'll only gather metadata (e.g. file keys) but not the actual payload. Maybe you can tell us a bit more about what you are intending to do by gathering the data?

on distributed 2021.10.0 and 2021.8.1, but downgrading to 2021.5.1 seemed to resolve it. I'm not using direct=True, so I can't comment on that, but that seems to be unrelated to the core of this issue.

I think there was indeed a problem with LocalClusters causing too much memory usage but that should've been resolved by now. If not, could you try to create a reproducer?

espg commented 2 years ago

@fjetter here's a minimum example that you should be able to reproduce-- s3 buckets are AWS region us-west-2, so if you replicate there, there won't be any egress fees. Calling notebook is m5.16xlarge , here's the python code:

import vaex
from dask_gateway import Gateway
import numpy as np

# Define Cluster
gateway = Gateway()
options = gateway.cluster_options()
options.worker_specification = '4CPU, 16GB'
cluster = gateway.new_cluster(options)
cluster.scale(64)
client = cluster.get_client(set_as_default=True)

# Define fetch list from s3
prefix = 's3://geostacks/icesat_2/shards=311'
mid = '/cycle='
suffix = '/atl06.parquet'

filelist = []

for i in ['1','2','3','4']:
    for j in ['1','2','3','4']:
        for k in ['1','2','3','4']:
            for h in ['1','2','3','4']:
                # for g in ['7', '8', '9']: ## More severe 37GB memory spike with 50% dataset size increase
                for g in ['7', '8']: ## Minimum example with 3.5GB memory spike
                    s3path = prefix + i + j + k + h + mid + g + suffix
                    filelist.append(s3path)

# Return arrow table for managed memory
def get_thing(s3file):
    return vaex.open(s3file).to_arrow_table()

# Get data on workers-- runs fast!
futures = []
# for thing in filelist[3:]:  # For 37GB spike
for thing in filelist[2:]:  # For minimal example (3.5GB spike)
    futures.append(client.submit(get_thing, thing, retries=10))

# Return data and cause gather behavior
stuff = client.gather(futures, direct=True)

Both the minimal example with the 3.5GB spike and the poor scaling behavior with the 37GB spike when increasing the dataset volume by 50% should run fine on m5.16xlarge instance. If workers are already spun up, the example should take less then 5 minutes to run, with most of that time being the gather step. Here's what the full spike looks like on the minimal example:

image

...and the poor scaling (10x memory usage for 1.5x data) with additional data by switching the commented out blocks in the above code:

image

espg commented 2 years ago

My recommendation would generally be, try to avoid gather if it is a lot of data. For instance, if you store your payload in parquet, you'll only gather metadata (e.g. file keys) but not the actual payload. Maybe you can tell us a bit more about what you are intending to do by gathering the data?

We are reshuffling / reindexing the data. The dataset in the original example (not the minimal example I just posted, the code block that opened this issue) is arranged by time, specifically by linear segments of an orbit. We are reindexing the data by space; we've done this for some of the dataset already, and the parquet naming scheme that the for loop I just posted for the minimal example crawls down a spatial tree structure:

for i in ['1','2','3','4']:
    for j in ['1','2','3','4']:
        for k in ['1','2','3','4']:
            for h in ['1','2','3','4']:

Each i is subdivided to 4 'j', 16 k, and 64 h; so the file list 512 base resolution parquet shards, defined per 90 day period (there are actually more levels, but we're starting 3 levels into the tree structure for the reproducible example).

These parquet shards are easy to use once in place, but it's been an issue to generate them. Apache parquet files on s3, at least as far as I can tell, are immutable. We can't write to the shard partially, and then append to it later-- we have to write all at once. That's why for g in ['7', '8']: is there in the last level of the for loop; 7 and 8 refer to orbital cycles 7 and 8 of the satellite mission... so we gather data for 90 days, write out that data, and then when the instrument repeats the same ground path we start a new parquet file (since we can't append).

We are doing this data shuffling on a single machine because that's what dask recommends. We are effectively doing a group-by over 1.5 to 2.5 billion rows (~300GB depending on cycle) to 16,384 theoretical keys (less in practice, because not every node in the tree has data...but still to over 10,000). My impression is that if we read into one dask backed distributed 'pandas' dataframe and run this group by operator, it will be painful... expectation is that the time indexed dataset is worst case distributed across the workers.

espg commented 2 years ago

@espg were you able to track down when the memory leak started? I also am seeing a leak in my workers. It's pretty difficult for me to get a minimal reproducer in the code I'm working with, but it's a very simple application of essentially client.gather(client.map(f, *xs)).

When I run serially without dask, memory is very stable. With dask, I was seeing the fast growing memory usage (~10 MB/s with each call of f ~0.2s) on distributed 2021.10.0 and 2021.8.1, but downgrading to 2021.5.1 seemed to resolve it. I'm not using direct=True, so I can't comment on that, but that seems to be unrelated to the core of this issue.

I'm not sure if this helps other than to help anyone who works on this potentially bisect the issue.

@bocklund we've not been using dask for very long, so from our perspective the behavior has always occurred... but it is good to know that this may be a regression and hopefully a quick fix if it used to behave better!

Scaling on this is non-linear with respect to the amount of data being returned, so here's the code that we've been using to work around the behavior, in case it's useful to anyone else:

task_futures # some list of futures from client.submit(function, inputs) in for loop

# split futures list into chunks that don't crash dask
npart = 10 # number of splits
lst = np.array_split(task_futures, npart)
big_list = []
for thing in tqdm(lst):
    futures = thing.tolist()
    big_list.append(swarm.gather(futures, errors='skip', direct=True)))
    # optional loop to free memory on workers
    for future in futures:
        future.cancel()

Running gather at the end is faster than partitioning, but you get back a small amount of performance by being able to start to gather while some of the submitted tasks are still processing...

fjetter commented 2 years ago

We are doing this data shuffling on a single machine because that's what dask recommends. We are effectively doing a group-by over 1.5 to 2.5 billion rows (~300GB depending on cycle) to 16,384 theoretical keys (less in practice, because not every node in the tree has data...but still to over 10,000). My impression is that if we read into one dask backed distributed 'pandas' dataframe and run this group by operator, it will be painful... expectation is that the time indexed dataset is worst case distributed across the workers.

Well, dask tells you to avoid distributed shuffling if possible. However, in your case you are far beyond that and you can no longer avoid this. Effectively, what you are doing is to implement your own shuffle algorithm. The section you are linking is mostly trying to raise awareness that some operations are not as cheap as they are for a pandas dataframe. Sometimes a computation can be expressed differently to avoid a shuffle, etc. In your case, you cannot if I'm correct.

I'm a bit struggling with your minimal examples (at this time it's less about reproducing it but rather about understanding it). If I'm not mistaken, you are trying to do something like the following?

ddf = dd.from_parquet("s3://time-partitioned-dataset")

ddf_indexed = ddf.set_index("space_index_column")
ddf_indexed.to_parquet("s3://space-partitioned_dataset")

FWIW we're currently working on a new implementation of our shuffle (what's behind set_index) to make this scale much better, a protoype is currently available on a branch if you want to take it for a spin and leave feedback https://github.com/dask/dask/pull/8223

There is also a nice blog post explaining what's behind this and how to use it https://coiled.io/blog/better-shuffling-in-dask-a-proof-of-concept/

espg commented 2 years ago

@fjetter I think that's a broadly fair summary of what we're trying to do. The minimal example I gave is fetching data that has already been ingested and spatially indexed; I posted it to help debug just the memory issue, since it doesn't require an earth data login or the boiler plate code for downloading and parsing the source hdf files.

@consideRatio could we clone the repo mentioned above and set it as an alternate image to select for scaling on the dask workers? It could be worth trying the reindex on the workers using that branch... if we decide to scale to ATL03 data it will likely require a distributed solution for calculating the shards.