pangeo-data / pangeo

Pangeo website + discussion of general issues related to the project.
http://pangeo.io
701 stars 189 forks source link

workflow for moving data to cloud #48

Closed rabernat closed 6 years ago

rabernat commented 6 years ago

I am currently transferring a pretty large dataset (~11 TB) from a local server to gcs. Here is an abridged version basic workflow:

# open dataset (about 80 x 133 GB netCDF files)
ds = xr.open_mfsdataset('*.nc', chunks={'time': 1, 'depth':1}) 

# configure gfs
import gcsfs
config_json = '~/.config/gcloud/legacy_credentials/ryan.abernathey@gmail.com/adc.json'
fs = gcsfs.GCSFileSystem(project='pangeo-181919', token=config_json)
bucket = 'pangeo-data-private/path/to/data'
gcsmap = gcsfs.mapping.GCSMap(bucket, gcs=fs, check=True, create=True)

# set recommended compression
import zarr
compressor = zarr.Blosc(cname='zstd', clevel=5, shuffle=zarr.Blosc.AUTOSHUFFLE)
encoding = {v: {'compressor': compressor} for v in ds.data_vars}

# store
ds.to_zarr(store=gcsmap, mode='w', encoding=encoding)

Each chunk in the dataset has 2700 x 3600 elements (about 75 MB), and there are 292000 total chunks in the dataset.

I am doing this through dask.distributed using a single, multi-threaded worker (24 threads). I am watching the progress through the dashboard.

Once I call to_zarr, it takes a long time before anything happens (about 1 hour). I can't figure out what dask is doing during this time. At some point the client errors with the following exception: tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7fe371f58a58> exception was never retrieved. Nevertheless, the computation eventually hits the scheduler, and I can watch its progress.

image

I can see that there are over 1 million tasks. Most of the time is being spent in tasks called open_dataset-concatenate and store-concatenate. There are 315360 of each task, and each takes about ~20s. Doing the math, at this rate it will take a couple of days to upload the data, this is slower than scp by a factor of 2-5.

I'm not sure if it's possible to do better. Just raising this issue to start a discussion.

A command line utility to import netcdf directly to gcs/zarr would be a very useful tool to have.

rabernat commented 6 years ago

Here is an alternative, more "cloudy" way this might work:

In this case, we would end up with many zarr stores, just like we have many netcdf files. We would need an open_multizarr function in xarray to simplify automatic concatenation of such stores.

mrocklin commented 6 years ago

Can you get a sense for what the bottleneck is? I/O? Compression?

rabernat commented 6 years ago

My first response would be "neither," since neither the system CPU (5%) nor outbound IP traffic (~20,000 kilobit/s) is anywhere close to saturated.

Reading the data from disk could also be a bottleneck, especially if each of these 24 threads is accessing a different, random chunk of the data. From the timing of the tasks above, reading and writing seem to be similar. But maybe I am not measuring correctly.

mrocklin commented 6 years ago

If you're using the dask.distributed scheduler (which given the images above, you probably are) I recommend looking at the "Profile" tab.

mrocklin commented 6 years ago

You could also consider changing the threads/processes mixture using the n_workers= and threads_per_worker= keywords to Client or LocalCluster

rabernat commented 6 years ago

I don't have a "Profile" tab. I guess my distributed version is out of date. I do have a "System" tab.

Thanks for the suggestions about profiling. I will try to do some more systematic profiling. For now I just wanted to get the transfer started, and I am reluctant to interrupt it.

mrocklin commented 6 years ago

Yeah, you might consider upgrading at some point. Doc page on the profiler: http://distributed.readthedocs.io/en/latest/diagnosing-performance.html#statistical-profiling

rabernat commented 6 years ago

Yes, that definitely would have been good to have!

rabernat commented 6 years ago

Looks like I am on track to do about 100 GB in one day. At this rate, it will take 100 days to upload the dataset.

@jhamman - did you have similar performance with the Newmann Met ensemble?

mrocklin commented 6 years ago

Heh, I recommend profiling and looking at using more processes.

rabernat commented 6 years ago

This server does not have more processes available. (I am not on a cluster here, just our own data server.) I will interrupt the transfer and set up more careful profiling.

-Ryan

On Tue, Dec 12, 2017 at 9:35 AM, Matthew Rocklin notifications@github.com wrote:

Heh, I recommend profiling and looking at using more processes.

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/pangeo-data/pangeo/issues/48#issuecomment-351069042, or mute the thread https://github.com/notifications/unsubscribe-auth/ABJFJtl4Fz7ULLq2hfnZGodmlZx7KAtjks5s_o9HgaJpZM4Q98RA .

mrocklin commented 6 years ago

I meant that you might consider using more processes and fewer threads per process

client = Client(n_workers=4, threads_per_worker=4)
jhamman commented 6 years ago

@jhamman - did you have similar performance with the Newmann Met ensemble?

Yes. I'm not sure I ever got past the setup / serialization step.

jhamman commented 6 years ago

@rabernat and @mrocklin - I'm using https://github.com/dask/gcsfs/pull/49 and the xarray/master branch to move data to GCP now. Initialization was much faster. I'll report back if/when this completes.

rabernat commented 6 years ago

I'm glad that the initialization bottleneck seems to be solved!

I still think we have a lot of work ahead figuring out how to tune chunks / compression / n_procs / n_threads to efficiently move data into the cloud using this method.

rabernat commented 6 years ago

FYI, I have been playing around with to_zarr using the default DirectoryStore. I am still getting long initialization wait times. So that suggests gcsfs is not necessarily the culprit.

mrocklin commented 6 years ago

@rabernat in your situation I might try dumping a tiny xarray dataset to zarr and profile the operation, seeing which parts of the process take up the most time. I generally use the %snakeviz magic for this

pip install snakeviz
%load_ext snakeviz
%snakeviz ds.to_zarr(...)
mrocklin commented 6 years ago

@jhamman checking in, were you able to upload anything to GCS or is Geyser still down?

jhamman commented 6 years ago

Reporting back after doing a bit of profiling on a ~21Mb dataset. For the tests I'm reporting now, I persisted the dataset into memory prior to writing to the zarr store. I also compared writing to a local store on a SSD. I have attached the results from running the to_zarr method with %prun. These files can be read using snakeviz via the following syntax:

snakeviz filename.prof

Profiles.zip

As a teaser, here is a snapshot from snakeviz.

profile

The outermost gray ring is method 'acquire' of '_thread.lock' objects.

Finally, here is the notebook I used to generate these tests.

mrocklin commented 6 years ago

_thread.lock.acquire is a sign that this is using the multithreaded scheduler, which is difficult to profile. Can you try a second time with dask.set_options(get=dask.get)?

mrocklin commented 6 years ago

Ah, I see that you're using a client. We might want to avoid this when profiling. Although the dask.set_options call should override that regardless

jhamman commented 6 years ago

@mrocklin - See attached profiles using dask.set_options(get=dask.get) instead of client.

Profiles.zip

mrocklin commented 6 years ago

@martindurant may want to see this. There is a lot of time spent in operations like _list_bucket.

Also interesting is that most of the time is spent in SSL_read rather than writing.

@jhamman I might suggest rechunking your data differently to have fewer larger chunks, and then see how that affects bandwidth. I suspect that we are mostly bound here by administrative checkins with GCS and not by compressing or sending bytes.

jhamman commented 6 years ago

I tried another chunk configuration (5 chunks per dataarray) and the throughput went from 380s to 405s (slower). I'm also curious about the SSL_read calls. Is it possible we're doing something silly where we initialize the arrays on GCS then read them back, before writing to them again?

mrocklin commented 6 years ago

Yeah you're right, it looks like we load in all of the data to the local process.

AbstractWritableDataStore.set_variables calls

         if vn not in self.variables:

Where self.variables is a property that calls load

@property
def variables(self):
    # Because encoding/decoding might happen which may require both the
    # attributes and the variables, and because a store may be updated
    # we need to load both the attributes and variables
    # anytime either one is requested.
    variables, _ = self.load()
    return variables

This appears to be about 120s of your 400s

mrocklin commented 6 years ago

Another issue seems to be excessive metadata/attrs collection. For example we seem to be creating around 100 Zarr arrays. In each case we seem to spend around 60ms getting metadata, resulting in around 60s of lost time.

Two questions:

  1. @jhamman should we be making 100 zarr arrays here? Does that sound right given the process you're doing?
  2. @alimanfoo given that we know the metadata locally perhaps we can hand this to the Array constructor to avoid touching the store mapping unnecessarily?
mrocklin commented 6 years ago

We seem to spend a long time dealing with attrs. Each of these can take some time.

More broadly, every time we touch the GCS mapping from far away there is a non-trivial cost. It appears that XArray-on-Zarr touches the mapping frequently. I wonder if there is some way to fill out all of our metadata locally and then push that up to GCS in a single go.

mrocklin commented 6 years ago

@martindurant do you have any thoughts on long-running connections and gcsfs? Is this feasible to avoid the thousand small SSL handshakes we're doing here?

jhamman commented 6 years ago

Ouch. The variables property issue seems like a bug and one that I'm pretty sure I introduced. I'll look into it more tomorrow and see if we can come up with a fix on the xarray side for that point.

The change on the xarray side was here: https://github.com/pydata/xarray/pull/1609/files#diff-e7faa2e88465688f603e8f1f6d4db821R226

rabernat commented 6 years ago

This is pretty fascinating. Some of the backend optimizations required to improve this could potentially be combined with pydata/xarray#1087. Fetching attributes lazily sounds like a low hanging fruit.

jhamman commented 6 years ago

I have implemented a somewhat unsatisfactory fix for the xarray issue. I'm testing it now and will try to get it completed today.

jhamman commented 6 years ago

@jhamman should we be making 100 zarr arrays here? Does that sound right given the process you're doing?

I would expect the number to be closer to 38 arrays (7variables x 5 chunks + 3 coordinates).

Below are some updated profiles using the changes in https://github.com/pydata/xarray/pull/1799.

Profiles_after_xr_fix.zip

mrocklin commented 6 years ago

Now it looks like the biggest issue is in attribute handling and preparing the variables prior to sending data?

jhamman commented 6 years ago

I'm still confused/concerned about the amount of time we spend with the SSL handshakes.

mrocklin commented 6 years ago

SSL handshakes are expensive, especially if you are far away from the other destination. There are several network roundtrips to do the full handshake. Currently gcsfs does this handshake every time we touch any piece of data. I hope that we can reduce this with long-running connections or Sessions. @martindurant would know more though, I suspect that he has thought about this before.

mrocklin commented 6 years ago

@jhamman you should also try merging in https://github.com/dask/gcsfs/pull/22 . I suspect that your times will go down significantly

alimanfoo commented 6 years ago

Just looking at the code in Zarr for reading attributes, currently every time an attribute is accessed the '.zattrs' key is retrieved from the store. I think it would be sensible to add an option to Zarr to cache the attributes, turned on by default.

On Fri, Dec 22, 2017 at 1:16 AM, Matthew Rocklin notifications@github.com wrote:

@jhamman https://github.com/jhamman you should also try merging in dask/gcsfs#22 https://github.com/dask/gcsfs/pull/22 . I suspect that your times will go down significantly

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/pangeo-data/pangeo/issues/48#issuecomment-353497997, or mute the thread https://github.com/notifications/unsubscribe-auth/AAq8QpnD_PCPAC1s73DbUOq1MnfT9yUhks5tCwLogaJpZM4Q98RA .

-- Alistair Miles Head of Epidemiological Informatics Centre for Genomics and Global Health http://cggh.org Big Data Institute Building Old Road Campus Roosevelt Drive Oxford OX3 7LF United Kingdom Phone: +44 (0)1865 743596 Email: alimanfoo@googlemail.com Web: http://a http://purl.org/net/alimanlimanfoo.github.io/ Twitter: https://twitter.com/alimanfoo

jhamman commented 6 years ago

@mrocklin and @alimanfoo - I just tried my little test case again with dask/gcsfs#22, dask/gcsfs#49, and https://github.com/alimanfoo/zarr/pull/220. I've had to move to a new machine so the tests are not going to be a perfect match but it seems like we've cut down the number of SSL_read calls from 345228 to 6910. This seems to be yielding about a 5x speedup. That said, it still takes about 60 seconds to push 20 Mb so there is probably still room for improvement.

Snakeviz ready profile is attached.

program_to_zarr_gcsfs.prof.zip

alimanfoo commented 6 years ago

Glad things are going in the right direction. I'll take a look at the profile data in the new year, see if anything else can be done on the zarr side.

alimanfoo commented 6 years ago

Btw don't know if this is relevant, but if you're setting multiple attributes on a single array of group, you can save some communication by calling o.attrs.update(...) rather than making multiple calls to o.attrs['foo'] = 'bar' etc. Probably some ways to shave off even more communication beyond this too.

jhamman commented 6 years ago

Thanks @alimanfoo. I just opened a xarray PR (https://github.com/pydata/xarray/pull/1800) that uses update rather than set_item. This has reduced the write time from 60 seconds to ~35 seconds. An updated profile is attached for those who are interested.

program_to_zarr_gcsfs.prof 2.zip

alimanfoo commented 6 years ago

Great, glad that was useful.

martindurant commented 6 years ago

Does https://github.com/dask/gcsfs/pull/55 impact the performance? Previously, there was no Session invoked, I think now some connections should be reusable.

jhamman commented 6 years ago

@martindurant - yes, that moved the mark from ~35 seconds to ~25 seconds thanks to a sharp decrease in the number of ssl read calls (1885 vs 6910):

         640185 function calls (639148 primitive calls) in 24.996 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1885   22.844    0.012   22.844    0.012 {built-in method _openssl.SSL_read}
      770    0.349    0.000    0.349    0.000 {built-in method _openssl.SSL_write}

This is using zarr-master, https://github.com/pydata/xarray/pull/1800, and the gcsfs combination of https://github.com/dask/gcsfs/pull/22, https://github.com/dask/gcsfs/pull/49, and https://github.com/dask/gcsfs/pull/55.

An updated profile is attached: program_to_zarr_gcsfs.prof.zip

jhamman commented 6 years ago

Closed by a combination of pydata/xarray#1800, dask/gcsfs#22, dask/gcsfs#49, dask/gcsfs#55, and zarr-developers/zarr#220.

mrocklin commented 6 years ago

This could probably still use some documentation to help other groups (like @rabernat 's) push relevant data to the cloud

rabernat commented 6 years ago

I am eager to try this out. I guess I just have to update my xarray, gcsfs, and zarr to latest master? Or are there other steps that need to be documented?

martindurant commented 6 years ago

gcsfs was just released, so a normal conda update should do for that one.

mrocklin commented 6 years ago

Thank you for keeping up with work on gcsfs @martindurant . It was very useful to have your time on this.

alimanfoo commented 6 years ago

Zarr 2.2 not released yet (hoping to get to that soon) so latest master.

On Wed, Jan 24, 2018 at 3:49 PM, Matthew Rocklin notifications@github.com wrote:

Thank you for keeping up with work on gcsfs @martindurant https://github.com/martindurant . It was very useful to have your time on this.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/pangeo-data/pangeo/issues/48#issuecomment-360178023, or mute the thread https://github.com/notifications/unsubscribe-auth/AAq8Qibc5pGEsTh0NFmiE4gysjPCYB9gks5tN1EUgaJpZM4Q98RA .

-- Alistair Miles Head of Epidemiological Informatics Centre for Genomics and Global Health http://cggh.org Big Data Institute Building Old Road Campus Roosevelt Drive Oxford OX3 7LF United Kingdom Phone: +44 (0)1865 743596 Email: alimanfoo@googlemail.com Web: http://a http://purl.org/net/alimanlimanfoo.github.io/ Twitter: https://twitter.com/alimanfoo