Closed bouweandela closed 5 years ago
I am having a lot of trouble with PRIMAVERA outputs due to their huge size. I think we can manage this better with Iris 2 and dask, so I am gonna try to make the switch.
Let me know how you're doing, so we don't do double work.
I will upload the branch and update it so you can see what I am up to.
For now, it's just updating iris and adding an option in the config file to pass it the scheduler.json information. Bad news is that I am getting a pickle error from dask and I don't really know what it means.
This is the error. Any idea, @bjlittle?
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/iris/fileformats/netcdf.py", line 2343, in save
fill_value=fill_value)
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/iris/fileformats/netcdf.py", line 977, in write
fill_value=fill_value)
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/iris/fileformats/netcdf.py", line 2013, in _create_cf_data_variable
fill_value_to_check)
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/iris/fileformats/netcdf.py", line 1990, in store
da.store([data], [target])
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/dask/array/core.py", line 949, in store
result.compute(**kwargs)
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/dask/base.py", line 154, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/dask/base.py", line 407, in compute
results = get(dsk, keys, **kwargs)
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/distributed/client.py", line 2098, in get
direct=direct)
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/distributed/client.py", line 1508, in gather
asynchronous=asynchronous)
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/distributed/client.py", line 615, in sync
return sync(self.loop, func, *args, **kwargs)
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/distributed/utils.py", line 253, in sync
six.reraise(*error[0])
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/six.py", line 693, in reraise
raise value
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/distributed/utils.py", line 238, in f
result[0] = yield make_coro()
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
yielded = self.gen.throw(*exc_info)
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/distributed/client.py", line 1385, in _gather
traceback)
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/six.py", line 692, in reraise
raise value.with_traceback(tb)
File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 59, in loads
return pickle.loads(x)
EOFError: Ran out of input
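For context: dask.distributed pickles task payloads before shipping them between client, scheduler and workers, and this EOFError is what Python's unpickler raises when the byte stream it receives is empty or truncated, i.e. the payload never arrived intact. A stdlib-only illustration of the same error:

```python
import pickle

# "EOFError: Ran out of input" is what the unpickler raises when it is
# handed an empty (or truncated) byte stream, as in the traceback above.
try:
    pickle.loads(b"")
except EOFError as exc:
    error_message = str(exc)

print(error_message)  # Ran out of input
```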
Just to comment that I have problems when trying to use dask.distributed. No problems arise if I don't try to connect to a dask scheduler.
See the issue in the iris repository: https://github.com/SciTools/iris/issues/3157
By the way, we get no performance gain using the threaded scheduler. See the GIL in all its glory:
Nice picture, what tool did you use to make it? Are you using dask arrays with that example? Or just our preprocessor? Because in the current preprocessor we convert everything to numpy arrays in many functions, so I wouldn't expect much parallelism there.
Memory issues can be expected when switching to Iris 2, as reported for example in #677.
howdy chaps, I am starting to have a look at this again and will be running some tests with iris 2.2.0 - are there any updates on this so I don't start reinventing the wheel?
also, just to keep track of my previous tests and very informed comments from @bjlittle @jvegasbsc and @pelson, am linking the previous issue: https://github.com/ESMValGroup/ESMValTool/issues/318
@bjlittle Happy New Year, man!! Hope things are good with you and you had a good break :beer:
New year, new testing and some interesting results right here: I finally caught the gist of lazy and realized data in the new iris and I thought about testing iris 2.2.0 against iris 1.13 for a few cases. Basically, my little test case looks like this:
# apply possible fix to dask
import dask
# this is deprecated:
# dask.config.set(get=dask.get)
# options:
# x.compute(scheduler='single-threaded')
# x.compute(scheduler='threads')
# x.compute(scheduler='processes')
# loop through options
#dask.config.set(scheduler='single-threaded')
#dask.config.set(scheduler='threads')
dask.config.set(scheduler='processes')
import iris
import time
#print(iris.__version__)
c1 = iris.load_cube('data1.nc')
c2 = iris.load_cube('data2.nc')
# lazy/realized data ops
t1 = time.time()
# lazy version:
#res = (c1 - c2) * (c1 * c2 / (c1 + c2))**2.
# realized version:
res = (c1.data - c2.data) * (c1.data * c2.data / (c1.data + c2.data))**2.
t2 = time.time()
print('op time', t2 - t1)
# saving: done once, when using lazy data only
#iris.save(res, 'res.nc')
#t3 = time.time()
#print('save time', t3 - t2)
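The timings were averaged over repeated runs of the code above; a sketch of such a harness, with a hypothetical run_once() standing in for the cube arithmetic:

```python
import statistics
import time

def run_once():
    # hypothetical stand-in for the cube operation being timed
    return sum(i * i for i in range(100_000))

times = []
for _ in range(20):  # 20 iterations, as in the tests described below
    t0 = time.perf_counter()
    run_once()
    times.append(time.perf_counter() - t0)

print('mean %.4fs  min %.4fs' % (statistics.mean(times), min(times)))
```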
where I ran the lazy version (no cube.data, just cube in the operation) and the realized version (cube.data in the operation), and for iris 2.2.0 I ran with no dask settings and then with different dask settings. For review purposes the test lives on Jasmin at /home/users/valeriu/esmvaltool_dask_test. I tested only timing and no memory so far; timing has been averaged over 20 iterations of the same code (just the operation and save); my numpy versions are identical for both cases (1.15.1); dask is 1.0. Here are my conclusions and attached plots:
- no dask.config.set() settings are applied;
- dask.config.set(scheduler='single-threaded');
- dask.config.set(scheduler='threads');
- dask.config.set(scheduler='processes');
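These configurations can be looped over programmatically; a sketch in pure dask (no iris), with an illustrative array size standing in for the cube data:

```python
import time

import dask
import dask.array as da

# small lazy computation standing in for the cube arithmetic
x = da.random.random((1000, 1000), chunks=(250, 250))
expr = (x * x).sum()

# 'processes' can be added to the tuple as well, but the multiprocessing
# scheduler should then be driven from under an `if __name__ == '__main__':`
# guard when the spawn context is in use
for scheduler in ('single-threaded', 'threads'):
    with dask.config.set(scheduler=scheduler):
        t0 = time.perf_counter()
        expr.compute()
        print(scheduler, round(time.perf_counter() - t0, 3))
```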
So, as @pelson pointed out in the previous (closed) issue, different settings of the dask configuration do indeed influence performance by quite a bit.
Any thoughts on this?
@jvegasbsc @bouweandela @mattiarighi
OK guys, I ran some more tests: bad news twice. Once, because scheduler: processes does not work with ESMValTool's multithreading (daemonic tasks can not have children, sounds quite ominous :grin: ), and twice, because a comparison run of a fairly involved preprocessor (see below) with 8 threads clearly shows that iris2+dask is slower than iris1.13:
Preprocessor:
pptest:
extract_levels:
levels: 85000
scheme: linear
regrid:
target_grid: 1x1
scheme: linear
mask_fillvalues:
threshold_fraction: 0.95
multi_model_statistics:
span: overlap
statistics: [mean, median]
run it 10x to get some statistics, max_parallel_tasks=8:
iris 1.13: 13.6 +/- 1.4 s per run
iris 2.2.0 default dask: 25.8 +/- 2.8 s per run
iris 2.2.0, scheduler: threads and multiprocessing-method: spawn: 27.2 +/- 2.3 s per run
The way I tested this was by editing the configuration file of dask in ~/.config/dask/distributed.yml; I will next try explicitly importing dask and setting the scheduler to processes in each of the preprocessor modules. Will report back.
bummer, found out that the default dask scheduler setting is indeed threads, so progress equals zero; scheduler=processes offers improvement only if the data is realized everywhere and we use only a single thread, so it's not usable for our purposes. I will now go on and look at the preprocessor modules and see where and when we can switch from calls to realized data to lazy data. It would really help if the iris guys had a best-compromise configuration for dask, so we can just use that configuration file rather than tweak dask on the fly - @bjlittle?
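The current scheduler setting can be inspected and pinned via dask.config (out of the box the key is unset and dask.array falls back to the threaded scheduler); a small sketch:

```python
import dask

# with a fresh config there is no 'scheduler' key; dask.array then uses
# its default, the threaded scheduler
print(dask.config.get('scheduler', default=None))

# pin the scheduler globally ...
dask.config.set(scheduler='processes')
print(dask.config.get('scheduler'))

# ... or only within a context; the previous value is restored on exit
with dask.config.set(scheduler='single-threaded'):
    print(dask.config.get('scheduler'))
```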
@valeriupredoi Here are some comments on your results:
From the large variability in runtimes it looks like the system you are using for testing is quite overloaded with other tasks, which causes runs to take much longer than necessary. The mean is therefore not representative of the actual runtime; the shortest runtime is more likely to be the correct one, because the longer runs just reflect the computer being busy with other work.
saving a cube is about 3 times slower in iris2.2.0 than in iris1.13 (WTF?);
From looking at your script, it looks like the data is only realized (i.e. loaded from file and computed) at the save step. So that explains why this takes longer.
I would recommend using actual preprocessor functions for testing instead of synthetic benchmarks, as this gives much more relevant results on which code may be slow(er)/need to be optimized.
Finally, I recommend switching to Iris 2 as soon as possible even if this incurs some slowdown. I believe many people are already using Iris 2, given all switches we have in the code for Iris version selection. We can then start working on making the preprocessor faster/more memory efficient or if we need iris support for that, file an issue with iris. Sticking with Iris 1 is not a real option in the long run, because it will not be further developed. Especially with the upcoming large CMIP6 datasets, I expect that those will be much easier to handle with Iris 2/dask than with Iris 1.
Finally, I recommend switching to Iris 2 as soon as possible even if this incurs some slowdown.
I agree with you. We can not expect to fix all drawbacks coming from the switch before switching: it is just too much work. Also, if we keep using iris 1.13 we just keep adding more stuff that we will need to optimize / fix in the future.
same, I think we need to switch to iris 2.2.0 asap - in fact, I have been running all my stuff with it for the past week or so. @bouweandela cheers for the detailed analysis, man. I agree with your points bar the one that says the tests should probably be done on a pristine node - I think the real-life experience (e.g. a busy node on Jasmin, like the one my results come from) is more useful for getting an idea of the differences. Indeed one introduces a lot of noise due to the use of the node by other users, but we can assume that noise has the same distribution every time, provided you run the tests enough times (assuming Poisson statistics, probably the best representation for it, i.e. the more tests you do, the closer the mean gets to the true value). Actually, comparing min times as you suggest and mean times, they agree almost exactly: a factor of 2-3 slower with iris2. Before we start thinking about how to avoid realizing data in the preprocessor modules, I think it'd be very useful if we could find a more optimal set of config settings in .config/dask/distributed.yml, which is currently empty with default dask. That'd be the no-brainer solution that involves the least amount of work :grin:
hey guys, this thing actually bugs me so I ran a few more things: I followed @bouweandela's suggestion to run on a relatively clean node and use actual ESMValTool examples, so I did this on my laptop using a very basic recipe that does a regrid preprocessor and a zonal mean diagnostic:
Single-machine testing with iris
--------------------------------
Single-machine running: my laptop with no other
transitory process running in the background.
iris 1.13.0
max_parallel_task = 8
0:00:17.270246
0:00:17.195842
0:00:16.610380
0:00:17.019490
0:00:16.886401
iris 2.2.0
max_parallel_task = 8
0:00:20.693597
0:00:20.592311
0:00:21.633979
0:00:20.578326
0:00:20.793864
so expectedly the numbers are stable and it looks like the big bad wolf is not so bad after all - we're taking a hit of only ~25% in time. I also found the specific iris module that deals with the handling of dask or numpy arrays (lazy/realized data):
lazy/realised data set in _lazy_data.py in iris:
anaconda3/envs/esmvaltool/lib/python3.6/site-packages/iris/_lazy_data.py
chunks are set there based on data shape and a MAX chunks parameter similar to what biggus used to have. We can actually play with that eg:
_MAX_CHUNK_SIZE = 8 * 1024 * 1024 * 2 # default
_MAX_CHUNK_SIZE = 4 * 512 * 512 * 2 # not improving
_MAX_CHUNK_SIZE = 16 * 2048 * 2048 * 4 # not improving
_MAX_CHUNK_SIZE = 32 * 4096 * 4096 * 8 # not improving
_MAX_CHUNK_SIZE = 512 # freaking slow
_MAX_CHUNK_SIZE = 80 * 10240 * 10240 * 20 # not improving
It appears that changing this constant does not affect run time unless it's made very small (e.g. 16 or 512), in which case things slow down considerably - pretty much as expected.
we can also play with the actual chunks as well:
chunks = tuple(int(i/10) for i in data.shape) # slows by factor 4
chunks = tuple(int(i*10) for i in data.shape) # no improvement
chunks = tuple(int(i/2) for i in data.shape) # slows by 5-10%
chunks = tuple(int(i*4) for i in data.shape) # no improvement
chunks = tuple(int(i*100) for i in data.shape) # no improvement
chunks = tuple(2 for i in data.shape) # superslow
chunks = tuple(512 for i in data.shape) # no improvement
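Chunking can also be controlled when wrapping an array yourself, without touching iris internals; a sketch of how chunk shape maps to the number of dask tasks (shapes are illustrative only):

```python
import numpy as np
import dask.array as da

data = np.zeros((120, 90, 144))                # e.g. (time, lat, lon)
x = da.from_array(data, chunks=(12, 90, 144))  # chunk along time only
print(x.npartitions)                           # 120/12 = 10 chunks

y = x.rechunk((60, 90, 144))                   # fewer, larger chunks
print(y.npartitions)                           # 2 chunks
```

Fewer, larger chunks mean less scheduling overhead per byte; very small chunks (as with the tiny `_MAX_CHUNK_SIZE` values above) drown the computation in task bookkeeping.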
and finally try to set a client up:
iris 2.2.0
set in _lazy_data:
from dask.distributed import Client
client = Client(processes=False)
max_parallel_task = 1
(max_parallel_task > 1 stalls completely; I think the workers are not talking to each other!) - about 30% slower than if we didn't create the Client
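For reference, the in-process Client experiment above looks roughly like this (requires dask.distributed; worker/thread counts are illustrative):

```python
import dask.array as da
from dask.distributed import Client

# an in-process client: no separate worker processes, matching the
# Client(processes=False) experiment described above
client = Client(processes=False, n_workers=1, threads_per_worker=2)

x = da.ones((1000, 1000), chunks=(250, 250))
print(x.sum().compute())  # computed via the local client

client.close()
```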
so it looks like, in terms of the iris code as a possible place to gain performance, it's not gonna happen, i.e. the iris guys have done everything right. I'd reckon we have to look into not realizing so much of the cubes' data in our scripts. I am also wondering if setting workers up properly would be a good idea, but I don't know yet how to do that - @jvegasbsc has done some tests, I believe. Another thing I don't know: when we perform cube collapses (we do a lot of them), will that happen with automatic data realization?
OK I got some good news and obviously some bad news: I ran this recipe through both iris 1 and 2 on my laptop, so we don't get any node traffic from elsewhere, and with a single thread so we can measure the behavior correctly:
datasets:
- {dataset: MPI-ESM-LR, project: CMIP5, mip: Amon, exp: historical, ensemble: r1i1p1, start_year: 1950, end_year: 2000}
- {dataset: NorESM1-M, project: CMIP5, mip: Amon, exp: historical, ensemble: r1i1p1, start_year: 1950, end_year: 2000}
preprocessors:
simple_preproc:
regrid:
target_grid: 1x1
scheme: linear
multi_model_statistics:
span: overlap
statistics: [mean,]
diagnostics:
validation_basic:
description: "zonal means"
variables:
tas: # simple wee variable
preprocessor: simple_preproc
field: T2Ms
scripts: Null
and see the memory = f(absolute time) plots attached (the black vertical lines are the start times of the fix_data, regrid and multimodel modules). The good news:
The bad news:
Note that I didn't apply any tweaks to iris 2's dask (out-the-box)
I will create a branch where we can test this further but I have limited data on my laptop and would not want to perform these tests on Jasmin - as @bouweandela mentioned, we should run these on a free node.
How much work would it be to switch to dask arrays in the preprocessor? Can we expect this to improve the performance?
no need to do that - we just don't have to call cube.data as many times as we do now. If we work only with cube structures, iris will automatically work with dask (lazy) arrays and do all the operations on them rather than on numpy arrays; of course this would be the ideal case only, we'll have to call cube.data at some point, and when that happens iris switches from dask to numpy. The question is how many times we really need to do this.
here is the bit of iris documentation that tells us about lazy and realized data: https://scitools.org.uk/iris/docs/latest/userguide/real_and_lazy_data.html
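The same distinction in plain dask terms (an iris cube's lazy data is a dask array underneath):

```python
import dask.array as da

x = da.arange(10, chunks=5)
y = (x * 2).sum()        # still lazy: builds a task graph, computes nothing
print(type(y).__name__)  # Array - a lazy dask array, not a number
print(int(y.compute()))  # realization happens only here: 90
```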
what puzzles me still is why modules like multimodel, which in ESMValTool rely almost 100% on realized-data operations, are so much slower in iris 2 than in 1: since all operations are on numpy arrays and numpy hasn't changed, we still take a decent hit in run time...
How much work would it be to switch to dask arrays in the preprocessor? Can we expect this to improve the performance?
@jvegasbsc do you think this would be possible with a benefit on performance in modules like fix_data or mask that actually rely on direct access of individual data values?
@jvegasbsc do you think this would be possible with a benefit on performance in modules like fix_data or mask that actually rely on direct access of individual data values?
I think that we should update them to become lazy operations on Iris cubes. Unfortunately, I have not seen any examples on Iris documentation about how to do it
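One pattern for making such operations lazy: dask.array.ma mirrors the numpy.ma API, so a masking helper can stay lazy for dask inputs and eager for numpy inputs (the function name and threshold here are illustrative, not ESMValTool code):

```python
import numpy as np
import dask.array as da

def mask_above(arr, threshold):
    """Mask values above threshold; lazy for dask arrays, eager for numpy."""
    # da.ma mirrors np.ma, so the same call works in both worlds
    xp = da.ma if isinstance(arr, da.Array) else np.ma
    return xp.masked_where(arr > threshold, arr)

lazy = da.from_array(np.arange(6.0), chunks=3)
masked = mask_above(lazy, 3.0)  # still a lazy dask array
print(masked.compute())         # 4.0 and 5.0 come back masked
```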
with the fix from https://github.com/ESMValGroup/ESMValTool/pull/816 things look better and smoother: same test case as above, same comparison, only with the bugfix in _multimodel applied :grin:
the time delta between the two vertical lines is the time for regrid, past the rightmost vertical line is multimodel, check data and save
I have created us a branch for dask purposes (i.e. performance improvements): https://github.com/ESMValGroup/ESMValTool/tree/version2_dask - no PR just yet, but note that I have already changed multimodel to reflect #816 and have started putting dask arrays in it; also, recipes/examples/recipe_dask.yml is the test case above. Hopefully we'll be able to find the golden egg and improve something in this branch and not delete it in misery in a couple of months :grin:
At the moment we are still using iris 1.13 because of issue #318. We would like to switch to iris 2 because the expectation is that iris 1.13 will not be maintained indefinitely.