ocean-transport / scale-aware-air-sea

Repo for collaborative project on scale-aware air-sea fluxes
1 stars 0 forks source link

aerobulk-python performance #28

Open jbusecke opened 2 years ago

jbusecke commented 2 years ago

I am planning to make this a longstanding issue to evaluate and discuss the performance of our package.

I have investigated some aspects that came up in the past, and will summarize them below. The full workflow can be found in this notebook.

For now I only looked at the in memory numpy performance. We might want to extend the discussion to parallel performance (e.g. here) later.

How does the computation time scales with the number of time iterations

image From this plot I draw two relevant conclusions for our work:

  1. The setup 'init' call does not add significant overhead, and so we do not have to worry too much about applying the calculation over single time slices!
  2. There is a slight benefit of iterating over many time steps within the fortran routine (slope is slightly smaller than 1), but I think for now we should not worry about this too much.

How the order of dimensions (e.g. longest to shortest or vice versa) influences the exection time of aerobulk python

For this one I looped through a buch of different array shapes (all with a total of 1e6 elements) and then plotted the execution time: image

The results honestly confuse me a bit. I would have expected the arrays with the smallest number at the end (...,...,1) to clearly outperform the others, but that does not seem the case. In fact is seems that in the big picture it matters more whether an array is elongated rather than square. Maybe @rabernat has some more insight here.

How does the computation time scale with niter

For this I simply timed the computation for the same array shape but with increasing values for niter image

So from my POV, we should focus on finding a minimal value for niter that still has 'converged' (#27) for now, since that seems to have the largest impact on practical performance?

I will work on that aspect next.

jbusecke commented 2 years ago

See https://github.com/ocean-transport/scale-aware-air-sea/issues/27#issuecomment-1149242444 for some work on the 'convergence' aspect.

jbusecke commented 2 years ago

As suggested in https://github.com/xgcm/aerobulk-python/issues/32#issuecomment-1158031412 I checked the scaling of execution time with domain size:

import numpy as np
import matplotlib.pyplot as plt

multipliers = [1, 5, 10, 25, 100, 1000, 5000]
times_noskin = []
times_skin = []
elements = []
for m in multipliers:
    print(m)
    shape = (m, m, 1)

    noskinargs = create_data(shape, chunks=None)
    tic = time.time()
    noskin(*noskinargs)
    toc = time.time() - tic
    times_noskin.append(toc)

    skinargs = create_data(shape, chunks=None, skin_correction=True)
    tic = time.time()
    skin(*skinargs)
    toc = time.time() - tic
    times_skin.append(toc)

    elements.append(m**2)

plt.plot(elements, times_noskin, label='noskin')
plt.plot(elements, times_skin, label='skin')
plt.xlabel('elements in array')
plt.ylabel('execution time [s]')
plt.legend()

image

This seems like it scales very linearly indeed.

jbusecke commented 2 years ago

I have further investigated how aerobulk python scales when parallelizing with dask:

To test this I wanted to see how it behaves under 'soft scaling'. This means that we increase the problem size (here timesteps) of the problem proportionally to the number of workers, and under ideal circumstances the execution time stays exactly the same.

I tried this:

results = {}
cores = range(1,17, 2)#[1, 7, 15]#range(2) # We got 16 cores on the 'huge' option

for n in [10, 100]:#[10, 50, 100]:
    print('setup')
    cluster = LocalCluster(n_workers=0, threads_per_worker=1)
    client = Client(cluster)

    nx = ny = n

    times = []

    for i in cores:
        print(i)
        shape = (nx, ny, i)
        args = create_data(shape, chunks={'dim_2':1})
        # set up dask
        cluster.scale(i)
        client.wait_for_workers(i)

        # multipe passes to get rid of random variability
        single_time = []
        for p in range(1,8):
            print(f'executing calculation Pass:{p}')
            tic = time.time()
            out_args = noskin(*args)
            # compute outputs
            out_args[0].load() # this should be enough to call the computation?
            toc = time.time()-tic
            single_time.append(toc)
        times.append(np.median(single_time))

    results[n] = times
    # teardown dask
    print('teardown')
    client.close()
    cluster.close()

and for the results I am getting:

for n, times in results.items():
    normalized_times = [t/times[0] for t in times]
    plt.plot(cores, normalized_times, label=n)
plt.legend()

image

These results are consistent across different horizontal sizes of problems (e.g. here 10x10 and 100x100), and they are normalized to the runtime with 1 core.

I do not have much experience with these benchmarks, but this seems pretty bad to me? It means that for a 16x increase in workers we only get a 5.3x (16/3) speedup?

@rabernat what do you think about this test setup and the results?

jbusecke commented 2 years ago

I wonder how much influence the gigantic flood of print messages has on this? These sort of messages are printed for every chunk.

jbusecke commented 2 years ago

Out of curiosity I just reran this including a larger horizontal domain (500x500 and 1000x1000): image and these seem to scale better for small core numbers (4-8) but then show a similarly bad scaling for larger core numbers.

rabernat commented 2 years ago

and these seem to scale better for small core numbers (4-8) but then show a similarly bad scaling for larger core numbers.

This looks pretty similar to https://github.com/ocean-eddy-cpt/gcm-filters/issues/45#issuecomment-865008936

Over there, we diagnosed the culprit in poor scaling as related to memory allocation. However, that should really not be an issue with the Fortran.

Here's one idea. Since we ended up not trying to implement error handling, you should be able to add the threadsafe directive to the .pyf files. This might affect performance somehow, even though you are using the distributed scheduler. It's worth a try.

rabernat commented 2 years ago

Two more thoughts after discussing this today with @paigem and @cmdupuis3

paigem commented 2 years ago

I took a stab today at using snakeviz to better understand where aerobulk-python is spending most of its time. Since I'm new to snakeviz and profiling in general, I have limited results to show, but wanted to share what I've done so far and hopefully those more knowledgeable (@jbusecke and @rabernat) can steer us in the right direction.

Some baseline sanity checks that I would expect (thanks @TomNicholas for the quick chat on these):

Snakeviz on the cloud did not work

I was able to install on Pangeo Cloud via pip install snakeviz, but the visualization didn't show up in the notebook. Wasn't sure how to easily trouble shoot, so I opted to use my local computer for now.

Snakeviz locally

See the notebook in this PR.

I used the create_data() function to create some synthetic data of different sizes using @jbusecke's code from this comment .

data = create_data((nx,ny, 2), chunks=None)

%load_ext snakeviz

%snakeviz out_data = noskin(*data)

CM2.6-sized input array:

image

Very small (10,10,2) input array:

image

Is Fortran code taking most of runtime?

We can see that noflux_np() is taking over 99% of the runtime for the large input arrays, and it is in this noflux_np() that the Fortran code is called. So, for the larger sized input data, it does indeed appear that the Fortran code is taking over 99% of the runtime.

With a very small (10,10,2) input array, noflux_np() takes only 15% of the runtime. This is not a surprise since it runs so quickly, and it on part with the other python wrapper overhead.

Runtime ~constant for python wrapper?

The runtime that noflux_np() takes up increases with the size of the input arrays, but not by a huge amount. I can quantify this if desired.

I'm not sure how useful this is yet, but eager to hear from others on how to better interpret these visualizations!

jbusecke commented 2 years ago

You should repeat the scaling test with a dask-gateway cluster. This will test whether contention between processes on the same system is hurting scaling.

I just repeated (part of the scaling test) with a gateway cluster (installing this on the fly is extremely slow):

image

It seems that problem persists unfortunately. I will give adding the 'threadsafe' a shot now.

rabernat commented 2 years ago

As a baseline, you should look at the scaling of some other built in operation, like just taking the mean or something. These results might not be as bad as you think.

Also, use a larger problem size.

jbusecke commented 2 years ago

Good suggestions. Ill try that.

In the meantime the 'threadsafe' addition shows some promising results when running on the threaded scheduler: https://github.com/xgcm/aerobulk-python/pull/34#issuecomment-1162285760

jbusecke commented 2 years ago

you should look at the scaling of some other built in operation, like just taking the mean or something.

Still thinking about this: In order to compare this in a fair manner, I would have to do some operation that pulls together the same variables, does something with them and puts them out, right?

Something like:

def dummy(sst, t_zt, hum_zt, u_zu, v_zu, slp):
    return sst+t_zt+hum_zt+u_zu+v_zu+slp

or should i take the mean of each variable separately?

jbusecke commented 2 years ago

Ok so I have been really frustrated with my attempts to test the scaling with dask gateway.

This is what I am trying:

from typing import Dict, Tuple
import time
import numpy as np
import pytest
import xarray as xr
from aerobulk import noskin, skin

"""Tests for the xarray wrapper"""

def create_data(
    shape: Tuple[int, ...],
    chunks: Dict[str, int] = {},
    skin_correction: bool = False,
    order: str = "F",
):
    def _arr(value, chunks, order):
        arr = xr.DataArray(np.full(shape, value, order=order))

        # adds random noise scaled by a percentage of the value
        randomize_factor = 0.001
        randomize_range = value * randomize_factor
        arr = arr + np.random.rand(*shape) + randomize_range
        if chunks:
            arr = arr.chunk(chunks)
        return arr

    sst = _arr(290.0, chunks, order)
    t_zt = _arr(280.0, chunks, order)
    hum_zt = _arr(0.001, chunks, order)
    u_zu = _arr(1.0, chunks, order)
    v_zu = _arr(-1.0, chunks, order)
    slp = _arr(101000.0, chunks, order)
    rad_sw = _arr(0.000001, chunks, order)
    rad_lw = _arr(350, chunks, order)
    if skin_correction:
        return sst, t_zt, hum_zt, u_zu, v_zu, rad_sw, rad_lw, slp
    else:
        return sst, t_zt, hum_zt, u_zu, v_zu, slp

from dask_gateway import Gateway
gateway = Gateway()
options = gateway.cluster_options()
options.worker_cores = 1
options.worker_memory = 8

# stop all existing clusters
print(gateway.list_clusters())
for c in gateway.list_clusters():
    cluster = gateway.connect(c.name)
    cluster.shutdown()

def dummy_wrapper(a, b, c, d,e,f):
    return (a+b+c+d+e+f).mean(['dim_0', 'dim_1'])

results = {}
results_dummy = {}
cores = [1, 2, 4, 8]#, 8, 16 4, 8, We can get up to 30? workers on dask gateway?

for n in [1500]:#[10, 50, 100, 100, , 1000]:

    nx = ny = n
    results[n] = []
    results_dummy[n] = []

    for i in cores:
        print(f'set up gateway from scratch for {i} workers')
        cluster = gateway.new_cluster(options, shutdown_on_close=True)
        client = cluster.get_client()
        print('https://us-central1-b.gcp.pangeo.io/'+client.dashboard_link)
        plugin = MambaPlugin(['aerobulk-python'])
        client.register_worker_plugin(plugin)
        cluster.scale(i)
        client.wait_for_workers(i)

        print(f"Start calculation for {i} workers")
        shape = (nx, ny, i*3)
        args = create_data(shape, chunks={'dim_2':1}) # this would be the time dimension that is chunked.
        # set up dask

        # multipe passes to get rid of random variability
        single_time = []
        single_dummy_time = []
        for p in range(5):
            print(f'executing calculation Pass:{p}')
            tic = time.time()
            out_args = noskin(*args)
            # compute outputs
            out_args[0].load() # this should be enough to call the computation?
            toc = time.time()-tic
            single_time.append(toc)

            # reference experiment
            tic = time.time()
            dummy = dummy_wrapper(*args)
            dummy.load()
            toc = time.time()-tic
            single_dummy_time.append(toc)

        results[n].append(np.median(single_time))
        results_dummy[n].append(np.median(single_dummy_time))
        cluster.shutdown()
        client.close()

At some point I always get this sort of error:

---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
Input In [11], in <cell line: 8>()
     35 out_args = noskin(*args)
     36 # compute outputs
---> 37 out_args[0].load() # this should be enough to call the computation?
     38 toc = time.time()-tic
     39 single_time.append(toc)

File /srv/conda/envs/notebook/lib/python3.9/site-packages/xarray/core/dataarray.py:921, in DataArray.load(self, **kwargs)
    903 def load(self, **kwargs) -> DataArray:
    904     """Manually trigger loading of this array's data from disk or a
    905     remote source into memory and return this array.
    906 
   (...)
    919     dask.compute
    920     """
--> 921     ds = self._to_temp_dataset().load(**kwargs)
    922     new = self._from_temp_dataset(ds)
    923     self._variable = new._variable

File /srv/conda/envs/notebook/lib/python3.9/site-packages/xarray/core/dataset.py:861, in Dataset.load(self, **kwargs)
    858 import dask.array as da
    860 # evaluate all the dask arrays simultaneously
--> 861 evaluated_data = da.compute(*lazy_data.values(), **kwargs)
    863 for k, data in zip(lazy_data, evaluated_data):
    864     self.variables[k].data = data

File /srv/conda/envs/notebook/lib/python3.9/site-packages/dask/base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    596     keys.append(x.__dask_keys__())
    597     postcomputes.append(x.__dask_postcompute__())
--> 599 results = schedule(dsk, keys, **kwargs)
    600 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py:3014, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3012         should_rejoin = False
   3013 try:
-> 3014     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3015 finally:
   3016     for f in futures.values():

File /srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py:2188, in Client.gather(self, futures, errors, direct, asynchronous)
   2186 else:
   2187     local_worker = None
-> 2188 return self.sync(
   2189     self._gather,
   2190     futures,
   2191     errors=errors,
   2192     direct=direct,
   2193     local_worker=local_worker,
   2194     asynchronous=asynchronous,
   2195 )

File /srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/utils.py:320, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    318     return future
    319 else:
--> 320     return sync(
    321         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    322     )

File /srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/utils.py:387, in sync(loop, func, callback_timeout, *args, **kwargs)
    385 if error:
    386     typ, exc, tb = error
--> 387     raise exc.with_traceback(tb)
    388 else:
    389     return result

File /srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/utils.py:360, in sync.<locals>.f()
    358         future = asyncio.wait_for(future, callback_timeout)
    359     future = asyncio.ensure_future(future)
--> 360     result = yield future
    361 except Exception:
    362     error = sys.exc_info()

File /srv/conda/envs/notebook/lib/python3.9/site-packages/tornado/gen.py:762, in Runner.run(self)
    759 exc_info = None
    761 try:
--> 762     value = future.result()
    763 except Exception:
    764     exc_info = sys.exc_info()

File /srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py:2052, in Client._gather(self, futures, errors, direct, local_worker)
   2050     else:
   2051         raise exception.with_traceback(traceback)
-> 2052     raise exc
   2053 if errors == "skip":
   2054     bad_keys.add(key)

CancelledError: ('transpose-4e8cd4ca577ddf75a0dbef30d3572dba', 0, 0, 7)

As far as I can tell it is not deterministic (sometimes happens at 4 cores, sometimes at e.g. 8). Is there anything stupid I am doing in my setup?