European-XFEL / karabo_data

Python tools to read and analyse data from European XFEL
https://karabo-data.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

Dask integration #212

Closed takluyver closed 5 years ago

takluyver commented 5 years ago

Dask seems like exactly what we want in many ways. It can represent data like a numpy array without loading it all into memory, and then coordinate parallel computations working on chunks of the data: you can call arr.mean() and it will organise tasks to load each chunk, sum it, combine the results, and give you the mean. A plugin, dask_jobqueue, even makes it easy to spread work across HPC clusters.
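
For illustration, the chunked-mean idea looks roughly like this with a plain dask array (a sketch, not karabo_data-specific; the file name and dataset path are made up):

    import h5py
    import dask.array as da

    # Open an HDF5 dataset without reading it into memory
    # ('example.h5' and 'data/images' are placeholders).
    f = h5py.File('example.h5', 'r')
    dset = f['data/images']

    # Chunks define the units of parallel work.
    arr = da.from_array(dset, chunks=(100, 512, 512))

    # Builds a task graph (load chunk -> partial result -> combine)
    # and runs it when .compute() is called.
    mean = arr.mean().compute()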

Unfortunately, my initial tests with it were not as good as I had hoped (experiments repo). The default settings (using multiple threads) were slower than doing the averaging without parallelism. Using dask.distributed with multiple processes on one node was better, but strangely variable, and it produced a bunch of concerning warnings and errors. It also wasn't maxing out the CPUs like other approaches I tried.
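
A minimal way to set up the multi-process case looks something like this (a sketch; the worker counts are illustrative, not what I actually tested):

    from dask.distributed import Client, LocalCluster

    # One single-threaded worker process per core avoids the GIL
    # for the pure-Python parts of the computation.
    cluster = LocalCluster(n_workers=8, threads_per_worker=1, processes=True)
    client = Client(cluster)

    # Computations on dask arrays now run on the worker processes.
    result = arr.mean().compute()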

I haven't had time to dig into these issues in detail. I know dask has tools for visualising task graphs and computational progress, which hopefully can help shed some light. Two possible sources of issues:

[API question: get_dask_array(...)? get_array(..., dask=True)? get_array(..., format='dask')?]

takluyver commented 5 years ago

I think this is ready for review now.

My sample use case (averaging a lot of DSSC data) can still be done slightly faster with custom multiprocessing code than using a dask array. I think this is partly because with dask, the parent process has to open the files to work out the array shape, whereas you can avoid this by writing the parallel code by hand. But they're on a similar time scale, and using dask is very convenient - it's basically like using numpy plus some boilerplate.
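
Usage would look something like this (a sketch; the proposal, run, and source name are placeholders, and I'm assuming the get_dask_array spelling from the options above):

    from karabo_data import open_run

    # Placeholder proposal/run numbers and detector source name.
    run = open_run(proposal=2212, run=103)
    arr = run.get_dask_array('SCS_DET_DSSC1M-1/DET/0CH0:xtdf', 'image.data')

    # Average over trains; dask loads and reduces the data chunk by chunk.
    mean_image = arr.mean(axis=0).compute()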

takluyver commented 5 years ago

Also, I meant to explain: for now this returns a 'plain' dask array, without xarray labels. Xarray does have some support for dask, and I initially tried to return a labelled array, like .get_array(). But it seemed to be slower, and dask produced warnings about garbage collection taking a long time. I don't know if I was doing something wrong or hitting a bug in one of those libraries, so ultimately I backed off from that integration for now.
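
If someone wants labels anyway, wrapping the plain dask array in xarray by hand is straightforward (a sketch; the dimension names here are illustrative):

    import xarray as xr

    # Wrap the dask array with labelled dimensions; operations on
    # the result stay lazy until .compute() is called.
    labelled = xr.DataArray(arr, dims=('trainId', 'slow_scan', 'fast_scan'))
    result = labelled.mean(dim='trainId').compute()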

takluyver commented 5 years ago

I've added an example without the output for now. It was working as I was writing it, but when I came to run through it in order, the cluster was too busy for me to get a node.

tmichela commented 5 years ago

I tried to run the notebook on Maxwell. It works, but I could not open the dashboard page; I get: 500 : Internal Server Error.

Is there something I'm doing wrong?

tmichela commented 5 years ago

Do you think it would also be useful to add a get_dask_dataframe method?

tmichela commented 5 years ago

I found this article from one of the authors of Dask: https://matthewrocklin.com/blog/work/2017/09/18/pangeo-1

It has some insights into the limitations of using Xarray with Dask, and some benchmarking of Dask on an HPC system. I had a quick look in the documentation, but it's not entirely clear to me how much data is transferred between processes and whether this could be a bottleneck...

antarcticrainforest commented 5 years ago

> I tried to run the notebook on Maxwell. It works, but I could not open the dashboard page; I get: 500 : Internal Server Error.
>
> Is there something I'm doing wrong?

Did you ssh into the cluster and forward the port where the dask client is running?
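
For example, something like this (assuming the default dashboard port 8787; the hostname is a placeholder):

    ssh -L 8787:localhost:8787 username@max-display.desy.de

and then open http://localhost:8787 in your local browser.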

takluyver commented 5 years ago

I didn't need any port forwarding, but it looks like I've installed dask under my home directory, so maybe it's something that works with a newer version. I have dask 2.3.0 and distributed 2.3.2.

My initial thinking is that dask dataframes aren't likely to be so useful - where our data fits into a columnar dataframe format, it's small enough that we don't need Dask. But maybe we'll find a use for it.

I'll have a read of Matt Rocklin's post.

tmichela commented 5 years ago

I use the default python from max-jhub:

dask==2.4.0
dask-jobqueue==0.6.3
dask-labextension==1.0.3
distributed==2.4.0

takluyver commented 5 years ago

Data transfers between machines will depend on what you're doing (of course). I've been experimenting with averaging data across trains, which should be relatively efficient if Dask works as I think it should.

It looks like the work on HPC systems discussed in that blog post turned into dask_jobqueue, which we're using in the example.
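
For reference, the dask_jobqueue side looks something like this (a sketch; the queue name and resources are placeholders, not necessarily what the example notebook uses):

    from dask_jobqueue import SLURMCluster
    from dask.distributed import Client

    # Each SLURM batch job started by the cluster runs dask workers;
    # 'exfel' and the resource numbers are placeholders.
    cluster = SLURMCluster(queue='exfel', cores=40, memory='256GB')
    cluster.scale(4)  # ask SLURM for 4 workers
    client = Client(cluster)

    # Dask computations now run across the allocated nodes.
    mean_image = arr.mean(axis=0).compute()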

tmichela commented 5 years ago

LGTM