dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org

Longitudinal Benchmarks with asv #290

Open mrocklin opened 8 years ago

mrocklin commented 8 years ago

In conversation here it was suggested by @minrk and @ogrisel that we set up something like https://github.com/spacetelescope/asv to monitor performance of a few important computation patterns over time. This would help us to identify performance regressions as we change the scheduler.
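
For reference, asv discovers benchmarks by importing modules from a configured benchmark directory and timing any method whose name starts with `time_`, calling `setup()`/`teardown()` around each one. A minimal sketch of what a scheduler benchmark could look like (the cluster settings and workloads below are illustrative placeholders, not a proposed suite):

```python
# benchmarks/scheduler.py -- hypothetical asv module; asv times every
# method whose name starts with ``time_``.
import dask.array as da
from distributed import Client, LocalCluster


class SchedulerSuite:
    def setup(self):
        # A small in-process cluster so timings measure scheduling, not
        # cluster start-up (asv excludes setup() from the timing).
        self.cluster = LocalCluster(n_workers=2, threads_per_worker=1,
                                    processes=False)
        self.client = Client(self.cluster)
        self.x = da.random.normal(size=(2000, 2000), chunks=(200, 200))

    def teardown(self):
        self.client.close()
        self.cluster.close()

    def time_elementwise_sum(self):
        # Many small, embarrassingly parallel tasks.
        (self.x + 1).sum().compute()

    def time_transpose_sum(self):
        # A communication-heavy pattern: the transpose forces
        # inter-worker data movement.
        (self.x + self.x.T).sum().compute()
```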

mrocklin commented 8 years ago

I already have a bunch of benchmarks that I use when profiling performance. The goal of this issue should be first to set up sufficient infrastructure so that it is easy for others to add benchmarks in the future.
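
On the infrastructure side, most of the one-time work is an `asv.conf.json` at the repo root pointing at a `benchmarks/` directory, along the lines of the following (all values illustrative; asv's config format permits `//` comments):

```json
{
    // asv.conf.json -- illustrative values only
    "version": 1,
    "project": "distributed",
    "project_url": "https://distributed.dask.org",
    "repo": ".",
    "branches": ["master"],
    "environment_type": "virtualenv",
    "benchmark_dir": "benchmarks",
    "env_dir": ".asv/env",
    "results_dir": ".asv/results",
    "html_dir": ".asv/html"
}
```

With that in place, `asv run` benchmarks a range of commits, `asv publish` renders the longitudinal HTML report, and `asv continuous master HEAD` compares a branch against master for CI-style regression checks.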

dhirschfeld commented 6 years ago

In case there's a desire for a real-world test case, I'm posting below a very simple Monte-Carlo GBM (geometric Brownian motion) simulation. I've got similar code running on a distributed cluster, so I have a vested interest in making sure it runs efficiently! I'm hopeful that a real-world use case such as this can inform future optimisation efforts (though I wonder if it's too simple to be of any use?)

multivariate_normal(covariance, ndates, nsims, nchunks)

```python
import numpy as np
import dask.array as da


def multivariate_normal(covariance, ndates, nsims, nchunks):
    """Returns correlated random normal variables according to the
    supplied ``covariance``.

    Parameters
    ----------
    covariance : ndarray[float](ndims, ndims)
        The size of the covariance is determined by ``ndims`` - the
        number of processes being simulated
    ndates : int
        The size of the time axis for the simulation
    nsims : int
        The number of simulations for each time period
    nchunks : int
        The number of chunks to split the date axis into

    Returns
    -------
    rvs : ndarray[float](ndims, ndates, nsims)
        The correlated rvs such that
        ``np.cov(rvs.reshape(ndims, -1)) ~= covariance``
    """
    ndims = covariance.shape[0]
    U, s, Vh = np.linalg.svd(covariance)
    rvs = da.random.normal(size=(ndims, ndates, nsims),
                           chunks=(ndims, 1 + ndates // nchunks, nsims))
    # Standardise the samples so the empirical moments are exact
    rvs -= rvs.mean(axis=-1)[..., None]
    rvs /= rvs.std(axis=-1)[..., None]
    # Colour the noise via the SVD of the covariance; reshape to
    # ``ndims`` rows (the originally posted code hard-coded 2, which
    # only works for a 2x2 covariance)
    rvs = da.dot(U @ np.diag(np.sqrt(s)),
                 rvs.reshape(ndims, -1)).reshape(rvs.shape)
    return rvs
```
simulate_gbm(F, sigma, Te, rvs)

```python
def simulate_gbm(F, sigma, Te, rvs):
    """Simulates a correlated, n-dimensional Geometric Brownian Motion
    process.

    Parameters
    ----------
    F : float
        The current (forward) price
    sigma : float
        The annualised volatility
    Te : ndarray[float](ndates,)
        The annualised time-to-expiry for each date (column of ``rvs``)
    rvs : ndarray[float](ndims, ndates, nsims)
        The correlated rvs

    Returns
    -------
    sims : ndarray[float](ndims, ndates, nsims)
        The transformed ``rvs`` such that each path ``sims[i, :, k]``
        represents a realisation of a GBM process with variance
        ``covariance[i, i]``
    """
    variance = Te * sigma**2
    mean = -0.5 * variance
    dV = np.append(variance[0], np.diff(variance))
    sims = rvs
    sims *= np.sqrt(dV)[None, :, None]
    sims.cumsum(axis=1, out=sims)
    sims += mean[None, :, None]
    np.exp(sims, out=sims)
    sims *= F
    return sims
```


```python
>>> F = 50.
>>> sigma = 0.2
>>> covariance = np.array([
...     [1,  0.7],
...     [0.7,  1],
... ])
>>> Te = np.linspace(0, 3, 1 + 365*3)
>>> rvs = multivariate_normal(covariance, Te.size, nsims=100_000, nchunks=60)
>>> sims = simulate_gbm(F, sigma, Te, rvs)
>>> sims
dask.array<mul, shape=(2, 1096, 100000), dtype=float64, chunksize=(2, 19, 100000)>
>>> %%time
... fut = client.compute(sims, optimize_graph=True, pure=False)
... wait(fut)
Wall time: 11.8 s
>>> %%time
... sims_ = sims.compute(scheduler='synchronous')
Wall time: 46.8 s
```
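
To tie this back to the asv idea above, the whole simulation can be wrapped as a timing benchmark so the longitudinal report tracks it directly. A hedged sketch, assuming the two functions above live in a hypothetical `monte_carlo.py` next to the benchmark file, with a reduced `nsims` so a single repeat stays cheap:

```python
# benchmarks/bench_gbm.py -- hypothetical asv wrapper around the GBM
# example posted above.
import numpy as np
from distributed import Client, LocalCluster

# ``monte_carlo`` is a hypothetical module holding the two functions
# defined earlier in this thread.
from monte_carlo import multivariate_normal, simulate_gbm


class GBMSuite:
    def setup(self):
        self.cluster = LocalCluster(n_workers=4, threads_per_worker=1)
        self.client = Client(self.cluster)
        covariance = np.array([[1.0, 0.7], [0.7, 1.0]])
        Te = np.linspace(0, 3, 1 + 365 * 3)
        # Smaller nsims than the session above keeps one repeat in the
        # low seconds.
        rvs = multivariate_normal(covariance, Te.size,
                                  nsims=10_000, nchunks=60)
        self.sims = simulate_gbm(50.0, 0.2, Te, rvs)

    def teardown(self):
        self.client.close()
        self.cluster.close()

    def time_gbm(self):
        self.sims.compute()
```

As a quick sanity check of the docstring's claim, `np.cov(rvs.reshape(2, -1).compute())` should come out close to `covariance` for this 2-process example.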