HDFGroup / h5pyd

h5py distributed - Python client library for the HDF REST API

Try dask on top of h5pyd #31

Open rsignell-usgs opened 6 years ago

rsignell-usgs commented 6 years ago

Try dask on h5pyd instead of h5py to see if there are issues.
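For context, a minimal sketch of the starting point, since h5pyd is meant to be a drop-in replacement for h5py (the endpoint, domain, and dataset name here are hypothetical, not taken from this issue):

import h5pyd  # instead of: import h5py
import dask.array as da

# hypothetical HSDS endpoint, domain, and dataset name -- substitute your own
f = h5pyd.File("/nrel/wtk-us.h5", "r", endpoint="http://127.0.0.1:5101")
dset = f["windspeed_80m"]

# dask treats the remote dataset like any other array-like object;
# each chunk read turns into HTTP requests against the service
x = da.from_array(dset, chunks=(1000, 1000))
print(x[:100, :100].mean().compute())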

rsignell-usgs commented 6 years ago

I tried a notebook example with dask on h5pyd here: https://github.com/rsignell-usgs/hsds_examples/blob/dask/nrel/notebooks/nrel_dask_example.ipynb and it mostly worked, but with some messages like:

WARNING:requests.packages.urllib3.connectionpool:Connection pool is full, discarding connection: 52.25.101.15
WARNING:requests.packages.urllib3.connectionpool:Connection pool is full, discarding connection: 52.25.101.15
WARNING:requests.packages.urllib3.connectionpool:Connection pool is full, discarding connection: 52.25.101.15

and the top of the plot also looks incorrect, with repeated values in rows.

I don't really know what I'm doing here, so likely doing something bad with dask or h5pyd or both.

cc: @mrocklin

mrocklin commented 6 years ago

Very cool. Some things to try:

  1. Increase the chunk size in dask, perhaps doubling it in each dimension
  2. It looks like h5pyd might not like multiple concurrent connections; you might try the lock=True option to da.from_array (see the sketch after this list)
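A sketch of both suggestions against a hypothetical h5pyd dataset (the domain, endpoint, dataset name, and chunk sizes below are illustrative assumptions, not from this thread):

import h5pyd
import dask.array as da

f = h5pyd.File("/nrel/wtk-us.h5", "r", endpoint="http://127.0.0.1:5101")  # hypothetical domain/endpoint
dset = f["windspeed_80m"]  # hypothetical dataset name

# 1. larger dask chunks -> far fewer HTTP requests per computation
# 2. lock=True serializes access to the shared h5pyd dataset handle
x = da.from_array(dset, chunks=(2000, 2000), lock=True)
print(x.mean().compute())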

mrocklin commented 6 years ago

cc @martindurant @jjhelmus

jreadey commented 6 years ago

Increasing the chunk size fixed the connection pool problems. It looks like with the original chunk size, dask was sending thousands of HTTP requests to the server, which overwhelmed the HTTP connection pool.

The data still isn't displaying correctly though. I tried lock=True; it made the code run slower, but the data was still messed up.

I'll see if I can get a trace of the http requests.

jreadey commented 6 years ago

Sorry, still haven't had a chance to try out Dask yet. @mrocklin - there's a beta for HSDS that you can join if you would like to experiment with Dask & HSDS. See: https://www.hdfgroup.org/solutions/hdf-cloud.

mrocklin commented 6 years ago

I would be surprised to see Dask send thousands of concurrent connections. By default we only run as many tasks as there are logical cores on the machine. I recommend trying your service with multiple threads, perhaps using standard-library tools like concurrent.futures or multiprocessing.pool.ThreadPool, and seeing how it works. You might also try setting dask to run in single-threaded mode:

import dask
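# use the synchronous (single-threaded) scheduler for all computations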
dask.set_options(get=dask.local.get_sync)

Just to set expectations, all Dask is doing here is running computations like x[:1000, :1000] and x[1000:2000, :1000] in multiple threads. We're pretty low-tech when it comes to data ingestion. I recommend stress testing concurrent access from a single process.
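A hedged sketch of that kind of single-process stress test, mimicking what the threaded scheduler does (the domain, endpoint, dataset name, and slab size are assumptions, not part of this thread):

import h5pyd
from concurrent.futures import ThreadPoolExecutor

f = h5pyd.File("/nrel/wtk-us.h5", "r", endpoint="http://127.0.0.1:5101")  # hypothetical domain/endpoint
dset = f["windspeed_80m"]  # hypothetical dataset name

def read_slab(i, size=1000):
    # each call reads one disjoint slab, roughly what a single dask task does
    return dset[i * size:(i + 1) * size, :size].sum()

# issue many slab reads concurrently from one process
with ThreadPoolExecutor(max_workers=8) as pool:
    print(list(pool.map(read_slab, range(16))))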

jreadey commented 6 years ago

Ok - I'll try out your suggestions. My plan is to devote some time in 2018Q1 to stress testing HSDS, so this course of action will fit in nicely with that.

mrocklin commented 6 years ago

As an FYI I'll be giving a talk about cloud-deployed Dask/XArray workloads at AMS on January 8th. If you make progress by then it would be interesting to discuss this as an option. https://ams.confex.com/ams/98Annual/webprogram/Paper337859.html

Although to be clear, we're not just talking about a single machine reading in this case. We're talking about several machines in the cloud reading the same dataset simultaneously.

jreadey commented 6 years ago

I'll see if I can cook something up. Would it be possible for you to send me a draft of your presentation?

mrocklin commented 6 years ago

Once I have such a draft, sure. I'm unlikely to have anything solid before the actual presentation though. I'll be talking about Dask, XArray, and HPC/Cloud. Some topics of interest are in this GitHub repository: https://github.com/pangeo-data/pangeo/issues