spencerahill / aospy

Python package for automated analysis and management of gridded climate data
Apache License 2.0

distributed.scheduler.KilledWorker for a long time series #236

Open chuaxr opened 6 years ago

chuaxr commented 6 years ago

I currently wish to compute averages of model variables (say condensates) over 100 days' worth of data (~100x100x50 at hourly resolution), in 10-day blocks for a base experiment as well as in 10-day blocks for different experiments. Each 10-day chunk is in its own file.

When attempting to perform an average over 6 variables on the 10 files in the 100 day run using parallelize = True, I obtain the following error:

Traceback (most recent call last):
  File "aospy_main.py", line 180, in <module>
    calcs = submit_mult_calcs(calc_suite_specs, calc_exec_options)
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/aospy/automate.py", line 460, in submit_mult_calcs
    return _exec_calcs(calcs, **exec_options)
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/aospy/automate.py", line 306, in _exec_calcs
    return _submit_calcs_on_client(calcs, client, func)
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/aospy/automate.py", line 265, in _submit_calcs_on_client
    return db.from_sequence(calcs).map(func).compute()
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/dask/base.py", line 99, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/dask/base.py", line 206, in compute
    results = get(dsk, keys, **kwargs)
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/distributed/client.py", line 1949, in get
    results = self.gather(packed, asynchronous=asynchronous)
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/distributed/client.py", line 1389, in gather
    asynchronous=asynchronous)
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/distributed/client.py", line 559, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/distributed/utils.py", line 244, in sync
    six.reraise(*error[0])
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/six.py", line 686, in reraise
    raise value
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/distributed/utils.py", line 232, in f
    result[0] = yield make_coro()
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/distributed/client.py", line 1267, in _gather
    traceback)
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/six.py", line 686, in reraise
    raise value
distributed.scheduler.KilledWorker: ("('from_sequence-map-func-fac6754eb0ce32af9f4fd215e21f6b8f', 29)", 'tcp://127.0.0.1:44402')

The calculation completes successfully if I attempt to average the same 6 variables with files from 10 different experiments. The calculation also completes if I simply average 1 variable over the 10 files in the base run. That said, averaging 1 variable over the 10 files in the base run takes longer (about 10 minutes) than averaging the 6 variables with the files from the 10 different experiments (about 2 minutes). Perhaps the files from the same experiment are being combined into one long time-series behind the scenes and therefore leading to a memory error.

Is there a better workaround than to simply submit calculations for each variable separately?

spencerahill commented 6 years ago

Thanks for the report. Frankly I don't have a good intuition of what's going on here, but (as usual) I suspect @spencerkclark will.

spencerkclark commented 6 years ago

> Perhaps the files from the same experiment are being combined into one long time-series behind the scenes and therefore leading to a memory error.

Yes, aospy will load all the data required for doing a calculation into memory (so in your case for the control simulation that uses 10 files, it will load the data from all ten). When you run things in parallel, it will load the data for all the calculations concurrently (so if you are computing 6 variables, effectively you will be loading data from 6 x 10 = 60 files into memory at once, which is likely triggering a memory error).
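For a sense of scale, here is a back-of-the-envelope estimate of how much raw data those 60 files hold. It is only a sketch: it assumes exactly 100 x 100 x 50 grid points (the report only says "~100x100x50") and 4-byte floats, and it counts the variable itself, not coordinates or any intermediate copies made during the calculation.

```python
# Rough size of the data the calculations have to hold in memory.
# Assumptions (not from the report): exact grid size and float32 storage.
nx, ny, nz = 100, 100, 50
hours_per_file = 10 * 24      # 10 days at hourly resolution
bytes_per_value = 4           # assumption: float32
files_per_run = 10
n_variables = 6

bytes_per_file = nx * ny * nz * hours_per_file * bytes_per_value
gb_per_calc = bytes_per_file * files_per_run / 1e9   # one variable, all 10 files
gb_all_parallel = gb_per_calc * n_variables          # six calculations at once

print(f"{gb_per_calc:.1f} GB per variable, "
      f"{gb_all_parallel:.1f} GB for all six at once")
```

Anything loaded on top of the variable itself (for example large multi-dimensional coordinates) would add to this figure.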

I think there is a workaround. As part of #172, we added an option of specifying an external distributed client in the main script (i.e. you can start your own cluster with settings to your liking and tell aospy to use that to manage your calculations, rather than have aospy create a cluster for you). You could create this cluster in a number of different ways (e.g. you could launch workers through batch jobs), but I'll describe how to do things from the command line, because I think it makes it clearer what's going on underneath.

  1. From the command line, launch a dask scheduler on a node of the analysis cluster:

    $ dask-scheduler --scheduler-file /home/$USER/scheduler.json

  2. Launch a worker on a node of the analysis cluster through the command line:

    $ dask-worker --scheduler-file /home/$USER/scheduler.json --nprocs 3

    Here I'm specifying only 3 processes (this is the maximum number of concurrent calculations that will be done using this worker; I'm decreasing it from 6 and you might need to decrease it more depending on the size of your files).

  3. Once these are running, in your main script in the calc_exec_options dictionary, you can add an entry for a client:

    import os

    from distributed import Client

    scheduler_file = os.path.join('/home', os.environ['USER'], 'scheduler.json')

    calc_exec_options = dict(
        prompt_verify=True,
        parallelize=True,
        client=Client(scheduler_file=scheduler_file),
        write_to_tar=True,
    )

  4. Run your main script from the command line, and it should make use of the client and cluster you configured yourself:

    $ python main.py


**Note:** If you want to do more calculations concurrently (even with the large set of files), you could launch more workers on separate analysis nodes using the same command used above (this will give you more memory to play with). In other words, repeat step (2) on a few separate nodes.

@chuaxr let me know if that makes sense.

chuaxr commented 6 years ago

Thanks @spencerahill and @spencerkclark for the replies.

@spencerkclark: I didn't have a /home/$USER/scheduler.json, so I copied what was in /home/skc/scheduler.json to /home/xrc. Running step 1 shows the following messages, but nothing else, even after waiting around 10 minutes. Is there something else that needs to be specified in the file?

(py361) ~/WRF/aospy_code$ dask-scheduler --scheduler-file /home/$USER/scheduler.json
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO -   Scheduler at: tcp://140.208.147.163:8786
distributed.scheduler - INFO -        http at:                     :9786
distributed.scheduler - INFO -       bokeh at:                     :8787
distributed.scheduler - INFO - Local Directory: /vftmp/Xin.Rong.Chua/pid6299/scheduler-z_igp3ck
distributed.scheduler - INFO - -----------------------------------------------

I'm also trying to figure out why specifying the workers helps in this case. Before, it was trying to load 10 files x 6 variables into memory. Does specifying only 3 processes mean that it will try to load 10 files x 3 variables instead?

spencerkclark commented 6 years ago

@chuaxr sorry that I wasn’t clearer in my instructions. I’m away from the computer right now, so I can’t write a longer message, but two quick things:

  1. Running $ dask-scheduler ... creates a scheduler file, so there is no need to have an existing one (if you have one in the location specified, it will just overwrite it).
  2. The output you got there is exactly what I would expect. To launch the workers you’ll need to open other terminal windows — the workers know how to talk to the scheduler through the instructions given in the scheduler file. As the workers connect to the scheduler you’ll see messages appear in the scheduler’s log.

And yes your understanding of why I believe this should help is correct. Let me know if you’re able to make more progress now. Thanks!
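To make the "3 processes means at most 3 calculations in flight" point concrete, here is a toy illustration. It uses a standard-library thread pool as a stand-in for the dask worker processes, so it is not what aospy or dask actually run; `fake_calc` and `peak_concurrency` are hypothetical names.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(n_tasks, n_workers):
    """Run n_tasks dummy 'calculations' and report the most that overlapped."""
    lock = threading.Lock()
    state = {"running": 0, "peak": 0}

    def fake_calc(_):
        with lock:
            state["running"] += 1
            state["peak"] = max(state["peak"], state["running"])
        time.sleep(0.05)  # stands in for loading the files and averaging
        with lock:
            state["running"] -= 1

    # The pool size caps how many calculations can hold data at the same time.
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        list(pool.map(fake_calc, range(n_tasks)))
    return state["peak"]


files_per_calc = 10
peak_six = peak_concurrency(n_tasks=6, n_workers=6)
peak_three = peak_concurrency(n_tasks=6, n_workers=3)
print("files in memory at once, 6 workers: up to", peak_six * files_per_calc)
print("files in memory at once, 3 workers: up to", peak_three * files_per_calc)
```

The six calculations still all run with the smaller pool; they just queue, which trades wall-clock time for a lower memory peak.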

spencerahill commented 6 years ago

> (as usual) I suspect @spencerkclark will

suspicions confirmed 😄

chuaxr commented 6 years ago

@spencerkclark Thanks for the clarifications. I opened separate windows for the scheduler, workers, and the window for sending the script. The following error message appeared, after which nothing appeared to happen (I killed the job after about 5 minutes).

wp25aug is the function I use to do pre-processing in WRF_preprocess.py.


Traceback (most recent call last):
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/distributed/core.py", line 277, in handle_comm
    result = yield result
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/distributed/worker.py", line 1163, in compute_stream
    msgs = yield comm.read()
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/distributed/comm/tcp.py", line 187, in read
    msg = yield from_frames(frames, deserialize=self.deserialize)
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/tornado/gen.py", line 307, in wrapper
    yielded = next(result)
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/distributed/comm/utils.py", line 75, in from_frames
    res = _from_frames()
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    return protocol.loads(frames, deserialize=deserialize)
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/distributed/protocol/core.py", line 122, in loads
    value = _deserialize(head, fs)
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 161, in deserialize
    return f(header, frames)
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 23, in <lambda>
    deserializers = {None: lambda header, frames: pickle.loads(b''.join(frames))}
  File "/nbhome/xrc/anaconda2/envs/py361/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
AttributeError: Can't get attribute 'wp25aug' on <module 'wrf_preprocess' from '/home/xrc/WRF/wrf_preprocess.py'>

spencerkclark commented 6 years ago

@chuaxr where is the wp25aug function defined? I looked in /home/xrc/WRF/wrf_preprocess.py, but it isn't there.

chuaxr commented 6 years ago

@spencerkclark Sorry about that. I had a version of wrf_preprocess in /home/xrc/WRF (which is on my python path) and another version in the directory (/home/xrc/WRF/aospy_code) where I normally send my aospy_main scripts from (which has the wp25aug function). I'm surprised that it would look there instead of in the aospy_code, as it does if I just run the script without using the scheduler. Anyway, copying wrf_preprocess to /home/xrc/WRF resolved that issue.
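A likely explanation for the lookup surprise: pickle serializes a module-level function by reference (module name plus function name), so a worker has to re-import the module by name using its own sys.path, which need not include the directory the main script was launched from. The sketch below reproduces that with two throwaway copies of a hypothetical module `demo_preprocess`; the directory names are made up, and it only assumes that the worker process resolves imports differently from the client.

```python
import os
import pickle
import subprocess
import sys
import tempfile
import textwrap


def reproduce_shadowing():
    """Pickle a function, then unpickle it where a stale module copy wins."""
    with tempfile.TemporaryDirectory() as root:
        new_dir = os.path.join(root, "aospy_code")     # copy with the function
        old_dir = os.path.join(root, "on_pythonpath")  # stale copy without it
        os.makedirs(new_dir)
        os.makedirs(old_dir)
        with open(os.path.join(new_dir, "demo_preprocess.py"), "w") as f:
            f.write("def wp25aug(ds):\n    return ds\n")
        with open(os.path.join(old_dir, "demo_preprocess.py"), "w") as f:
            f.write("# stale copy: wp25aug is not defined here\n")

        # "Client" side: import the new copy and pickle the function.
        sys.path.insert(0, new_dir)
        try:
            import demo_preprocess
            payload = pickle.dumps(demo_preprocess.wp25aug)
        finally:
            sys.path.remove(new_dir)
            sys.modules.pop("demo_preprocess", None)

        # "Worker" side: a fresh interpreter that only sees the stale copy.
        worker_code = textwrap.dedent("""
            import pickle, sys
            sys.path.insert(0, sys.argv[1])
            try:
                pickle.loads(sys.stdin.buffer.read())
            except AttributeError as err:
                print("AttributeError:", err)
        """)
        result = subprocess.run(
            [sys.executable, "-c", worker_code, old_dir],
            input=payload, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        )
        return payload, result.stdout.decode()


payload, worker_output = reproduce_shadowing()
# The payload carries only names, not the function's source code.
print(b"demo_preprocess" in payload, b"wp25aug" in payload)
print(worker_output)
```

Keeping a single copy of the module on the path that both the client and the workers see, as was done here, avoids the mismatch.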

The same KilledWorker error occurred with 3 processes, with many warnings like those below. Presumably this was the memory issue again, so I tried again with 1 process. I still received a bunch of warnings like the following:


distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 392.55 GB -- Worker memory limit: 541.50 GB

distributed.core - WARNING - Event loop was unresponsive for 1.44s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

The good news is that the script ran without error and the files were produced. I also saw distributed.scheduler - INFO - Close client connection: Client-592109f0-cd98-11e7-a7b5-1866da7a455e in the scheduler window, so I figured it was done even though the worker window was still showing distributed.core - WARNING - Event loop was unresponsive for 1.44s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

The bad news is that the process took around 90 minutes, which is probably longer than running things in separate scripts.

Ideally, there would be a way to let aospy know that it only needs one data file for each of the calculations. Or perhaps submitting as a batch job would allow for a larger memory limit?

spencerkclark commented 6 years ago

> When attempting to perform an average over 6 variables on the 10 files in the 100 day run using parallelize = True, I obtain the following error:

> Ideally, there would be a way to let aospy know that it only needs one data file for each of the calculations.

Sorry, I guess I was a little confused; I thought you wanted to use data from all 10 files? In theory, we've set things up so that loading of data should be lazy (in other words if you only need data from a subset of the files, it will only load data from those files into memory).

My impression was that:

Was that wrong?

> Or perhaps submitting as a batch job would allow for a larger memory limit?

In this case if you open another terminal window, to a different node on the analysis cluster, and launch another worker, it will have the same effect. For example, if you launched workers on an101 and an200 you would have 1024 GB at your disposal.

chuaxr commented 6 years ago

@spencerkclark re our offline conversation:

Creating 10 separate base runs so that each calculation only loads the data file it requires instead of all 10 does resolve the issue (i.e. the calculation I described in the beginning finished in minutes instead of crashing). This is consistent with the issue being due to non-lazy loading of the time-varying coordinate used in WRF.

In the process of creating the runs programmatically, it seems that I needed to create variables programmatically as well. I ended up doing the following, where RCE_base_runs is a list of aospy.Run objects:

    for x in range(0,num_base_runs):
        globals()['RCE_base_100day'+str(x+1)] = RCE_base_runs[x]

Is setting variables via globals() a safe thing to do?

spencerkclark commented 6 years ago

@chuaxr could you give a little more context as to why using a list of Run objects did not suffice? In other words, what was your use case for moving the contents of RCE_base_runs into the global namespace?
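For reference, one way to avoid writing into globals() is to keep the runs in a list and, if name-based lookup is needed, build a dict from it. This is only a sketch: `Run` below is a namedtuple standing in for aospy.Run so the example runs without aospy, and the `description` field is made up for illustration.

```python
from collections import namedtuple

# Stand-in for aospy.Run (hypothetical fields), so the sketch is self-contained.
Run = namedtuple("Run", ["name", "description"])

num_base_runs = 10
RCE_base_runs = [
    Run(name=f"RCE_base_100day{i}",
        description=f"days {10 * (i - 1) + 1}-{10 * i}")
    for i in range(1, num_base_runs + 1)
]

# Name-based lookup without touching the module's global namespace.
runs_by_name = {run.name: run for run in RCE_base_runs}
print(runs_by_name["RCE_base_100day3"])
```

A list or dict keeps the set of runs explicit and iterable, whereas names injected through globals() are invisible to anyone reading the module.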

spencerkclark commented 6 years ago

> This is consistent with the issue being due to non-lazy loading of the time-varying coordinate used in WRF.

Exactly -- this was my hypothesis, because xr.open_mfdataset uses coords='different' by default when trying to concatenate the datasets loaded in from the specified files. See the following excerpt from the xarray documentation for concat:

> ‘different’: Coordinates which are not equal (ignoring attributes) across all datasets are also concatenated (as well as all for which dimension already appears). Beware: this option may load the data payload of coordinate variables into memory if they are not already loaded.

It turns out this behavior can be modified in xarray version 0.10.0 by using a keyword argument in xr.open_mfdataset. It might help in #240 if I extended things to give the user the option to specify the data_vars and coords arguments in their DataLoaders. This way you might be able to solve this instead by specifying coords='minimal'.
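As a rough sketch of what those options look like in practice, the example below calls xr.concat directly on two small in-memory datasets, since xr.open_mfdataset passes data_vars and coords through to the concatenation step. The variable and coordinate names (`condensate`, `lat2d`) are hypothetical, and the sketch only shows the call shape and the resulting dimensions; whether 'minimal' avoids the eager coordinate load for a particular set of WRF files would need checking.

```python
import numpy as np
import xarray as xr


def make_file(start_hour):
    """Build a small stand-in for one output file (names are hypothetical)."""
    time = np.arange(start_hour, start_hour + 4)
    lat2d = np.tile(np.linspace(-1.0, 1.0, 3)[:, None], (1, 2))
    return xr.Dataset(
        {"condensate": (("time", "y", "x"), np.ones((4, 3, 2)))},
        coords={"time": time, "lat2d": (("y", "x"), lat2d)},
    )


files = [make_file(0), make_file(4)]

# 'minimal' restricts concatenation to variables that already carry the
# time dimension; lat2d has no time dimension, so it is left as a 2D field.
combined = xr.concat(files, dim="time", data_vars="minimal", coords="minimal")
print(combined["condensate"].shape, combined["lat2d"].dims)
```

With files on disk, the analogous call would pass the same two keyword arguments to xr.open_mfdataset.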

chuaxr commented 6 years ago

@spencerkclark

spencerkclark commented 6 years ago

> In my variables file (imported as lib in aospy_main), I have a list of Runs called RCE_base_runs. Each of the runs is named RCE_base_runs1, RCE_base_runs2, etc. In the aospy_main script, I can specify runs=lib.RCE_base_runs for all of the runs.

Perfect 👍

> Re the key word argument in xr.open_mfdataset: would the difference be that under minimal, it would load the coordinates for only the required variables, whereas under different, it would load the coordinates once for each variable in the dataset?

Yes, I believe in that case it would just load the 1D index coordinates (which are much smaller) and infer how the multi-dimensional coordinates could be concatenated based on those.

spencerahill commented 6 years ago

@chuaxr FYI we are eventually going to create an ObjLib class that will make this discovery/access of one's Proj, Model, Run, etc. objects much more consistent and straightforward: see #163

But I don't have an estimate of even when we'll start on that in earnest. Hopefully early 2018 but no guarantee 😦

chuaxr commented 6 years ago

@spencerahill thanks for the note. At the moment, #208 is a larger issue for me-- I'm using a lot of conditional sampling (e.g. vertical velocity corresponding to the 99.99th percentile of precipitation or precipitation exceeding 500 mm/day). Having to write separate functions for each variable I wish to perform this operation over, not to mention having to hard-code the thresholds, is getting to be rather unwieldy. If the amount of effort required would be similar, then I would appreciate it if it were focused on #208 rather than on this issue.

spencerkclark commented 6 years ago

Don't worry @chuaxr, #208 is a big deal for me too. Because of that limitation over the last few weeks I've been doing a bunch of calculations outside of aospy entirely; I hope sometime early next year I'll be able to sit down and think through how to best adapt aospy for our more general needs.

spencerahill commented 6 years ago

> If the amount of effort required would be similar, then I would appreciate it if it were focused on #208 rather than on this issue.

@chuaxr for sure, this is definitely our top priority moving forward. Like @spencerkclark, I likely won't have time to see it through before the new year, but it will be the first thing we tackle in 2018. Thanks for your patience!