pangeo-data / pangeo

Pangeo website + discussion of general issues related to the project.
http://pangeo.io

Transform Xarray data for deep learning frameworks #567

Closed mrocklin closed 4 years ago

mrocklin commented 5 years ago

People use Xarray with deep learning frameworks like TensorFlow and PyTorch. There is some frustration in moving data from the way it was stored for archive to how the scientist and deep learning framework want to consume it. Typically people transform their data based on both of the following needs:

  1. science needs, like what a sample should look like in their situation (a time series, a spatial patch, relevant variables, ...)
  2. algorithmic needs, like shuffling data around for the framework

Handling these needs with Xarray today seems to be a source of creativity, and of frustration. It seems like providing best practices here would be valuable. I'd like to start a conversation around it.

mrocklin commented 5 years ago

Disclaimer: I've never actually done this process, I've just listened to people. I'm probably about to say a few dumb things.

So it seems like in deep learning workflows people always do some pre-processing/shuffling of data and then dump it into a form that is easily consumable, possibly in a random-access way, by a deep learning framework. Presumably we should do the same.

This is nice because it provides a point of separation between what we'll need to accomplish in Xarray, and what we can hand off to the deep learning framework. My guess is that some previous frustration arises from trying to cross this boundary with the same process.

So, how about we split this process into two separate parts:

  1. Transform from the input Xarray data layout (presumably defined by externally-provided NetCDF files) into some easily-consumable-form by deep learning frameworks.
  2. Pull samples from that easily-consumable-form in a way that makes deep learning algorithms converge quickly (presumably this requires some sort of random access pattern)

Easily consumable layouts

Deep learning frameworks seem to want to consume a sequence of samples. Scientists tend to have a good understanding of what a sample looks like in their case. Here are some examples.

  1. A time series of temperature at a given point
  2. A 9x9 spatial patch at one time point and all altitudes across a set of five variables
  3. A pair of patches above, one after the other in time
  4. ...

Outside of the structure of these samples, no locality is expected. As a result, I can imagine wanting to reduce our data to an Xarray Dataset that looked something like the following ...

<xarray.Dataset>
Dimensions:  (sample: 100000, x: 9, y: 9, altitude: 27)
Coordinates:
  * sample ...
  * x ...
  * y ...
Data variables:
    foo      (sample, x, y, altitude) float64 ...
    bar      (sample, x, altitude) int64 ...
    baz      (sample) float64 ...

(I'm going with the 9x9 case)

So even though the original dataset had a much larger lat/lon extent, we're intentionally removing that and replacing with dummy coordinates x, y. The altitude coordinate was used across the entire sample, so we've kept it.

If this Dataset is backed by Dask then we enforce that no dimension other than sample can be chunked.

So once we're done then presumably we could do something like the following to pull out a batch

ds.isel(sample=slice(10000, 20000))
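As a minimal sketch of the layout described above, the following uses plain NumPy rather than Xarray (an assumption made to keep the example self-contained) to cut a gridded field into non-overlapping 9x9 patches along a new leading sample axis. The function name `extract_patches` and the toy grid size are hypothetical.

```python
import numpy as np


def extract_patches(field, patch=9):
    """Cut a (lat, lon, altitude) array into non-overlapping patch x patch tiles.

    Returns an array of shape (sample, x, y, altitude).
    """
    nlat, nlon, nalt = field.shape
    ny, nx = nlat // patch, nlon // patch
    # drop the ragged edge so the grid divides evenly into tiles
    trimmed = field[: ny * patch, : nx * patch]
    # split each horizontal axis into (tile index, offset within tile)
    tiles = trimmed.reshape(ny, patch, nx, patch, nalt)
    # bring the two tile indices together, then merge them into "sample"
    tiles = tiles.transpose(0, 2, 1, 3, 4)
    return tiles.reshape(ny * nx, patch, patch, nalt)


# toy stand-in for a much larger lat/lon/altitude field
field = np.arange(36 * 45 * 27, dtype="float64").reshape(36, 45, 27)
samples = extract_patches(field)
batch = samples[10:20]
```

Slicing the leading axis, as in `samples[10:20]`, then plays the role of the `isel` call on the sample dimension.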

Shuffling

At this point if we want to shuffle the data prior to storage we've reduced the problem to a dask array problem. This problem isn't yet solved, but at least we've removed Xarray complexities from the equation.

Storage

So how do we store this in a form that is easy to hand off to a deep learning framework?

Well, we could dump this thing back down to NetCDF or Zarr or whatnot, it would effectively be a few arrays on disk, one array per variable. Pulling out a batch would be as many file accesses as there are variables in the form that we send to the deep learning framework. (is this common, are there typically many variables, or is there usually just one at this point?)

mrocklin commented 5 years ago

The comment above is just my own thinking, I encourage people to respond to the header comment independently with their own ideas. Other folks are likely more informed here than I am.

mrocklin commented 5 years ago

cc @nbren12 @RPrudden @jhamman @shoyer (please cc others that might be appropriate)

nbren12 commented 5 years ago

Adding @djgagne @raspstephan.

nbren12 commented 5 years ago

I think this is a great breakdown of the challenges, Matt.

Re, "Easily consumable layouts". It's usually easy to munge a small dataset into an xarray object with a sample dimension. It is harder for larger datasets where operations like reshape or sliding windows would tend to scramble different dask chunks. In the worst case scenario, the method of generating samples would require chunks with only 1 sample, which would not scale to millions of samples.
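To make the sliding-window case concrete, here is a small NumPy sketch (not Dask, which is an assumption made for brevity) in which every window of a time series becomes one sample. The series and window length are hypothetical. Consecutive samples overlap, which is why a window that straddles a chunk boundary would need data from both neighbouring chunks.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

# hypothetical temperature series at one grid point
temperature = np.arange(10, dtype="float64")

# every length-4 window becomes one sample; consecutive samples overlap,
# so neighbouring samples draw on the same region of the source array
windows = sliding_window_view(temperature, window_shape=4)
```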

mrocklin commented 5 years ago

Re, "Easily consumable layouts". It's usually easy to munge a small dataset into an xarray object with a sample dimension. It is harder for larger datasets where operations like reshape or sliding windows would tend to scramble different dask chunks. In the worst case scenario, the method of generating samples would require chunks with only 1 sample, which would not scale to millions of samples.

I agree that this can be computationally challenging. Dask Array's algorithms for reshaping and rechunking are decent today. It remains a non-trivial problem, but we'll do about as well as can be done, which I think is likely to be good enough.

I haven't given much thought to sliding windows, thanks for mentioning it.

In the worst case scenario, the method of generating samples would require chunks with only 1 sample, which would not scale to millions of samples.

I'm not sure that I agree here. I think that Dask's current reshaping and rechunking algorithms avoid this situation. Or, if they don't then I'm not currently too concerned (though this might be naive).

mrocklin commented 5 years ago

Looks like the reshape algorithm today has some limitations, which we'd need to write code to overcome:

taken from the reshape docstring

  1. It assumes that the array is stored in C-order
  2. It only allows for reshapings that collapse or merge dimensions like (1, 2, 3, 4) -> (1, 6, 4) or (64,) -> (4, 4, 4)

nbren12 commented 5 years ago

Thanks @mrocklin. As you mention above, the sample dimension loses any locality character, so this reshaping need not comply with np.reshape or have an easy reverse operation. It could reshape each chunk and then re-order the chunks for example. Is there a function in dask that does this?

shoyer commented 5 years ago

This tutorial may be a good reference on the recommended way to save data for TensorFlow (tf.Example protos in TFRecords): https://www.tensorflow.org/tutorials/load_data/tf_records

My usual workflow is to process my data in parallel (in xarray data-structures using Apache Beam) and then save it into TFRecords files. Then I just use TensorFlow APIs for loading/minimal processing of data at run-time.

A simple variation would be to swap out Beam for Dask. For maximum usability, you could auto generate a "schema" for the dataset in some form, so users don't need to write their own boilerplate for building up dicts of tf.FixedLenFeature.

It's absolutely possible to efficiently load data stored in other formats (e.g., netCDF or Zarr stored with pre-shuffled examples), but you need to work a bit harder to ensure that your computation isn't limited by IO.

nbren12 commented 5 years ago

Thanks for that link @shoyer. It seems the main advantage with TFRecords is that an "example" can contain multiple variables, which are serialized together. This seems a little more natural than having each variable stored separately as would probably be done in xarray + zarr. OTOH, an xarray+zarr workflow might be more familiar to pangeo people, and potentially more easily extensible to PyTorch, etc.

To answer your question @mrocklin, for my use case there is more than one variable.

mrocklin commented 5 years ago

How prevalent is TFRecords among users of systems other than TensorFlow? Has it become a cross-framework standard?

Thanks @mrocklin. As you mention above, the sample dimension loses any locality character, so this reshaping need not comply with np.reshape or have an easy reverse operation. It could reshape each chunk and then re-order the chunks for example.

Yup, that's a good point and could work easily.

Is there a function in dask that does this?

Not really, no, but it wouldn't be hard to build. Neither would a proper reshape function (at least in the scope that we care about it).

To be clear, I'm not looking for "what can a user do with the implementation today"; I'm looking for "what is the right way given a modest amount of core development effort". Getting reshape to work properly in this way, or building out a chunk reshape operation, or whatnot isn't hard. I'm inclined to build out tooling to make this a better-than-hacky experience.

My usual workflow is to process my data in parallel (in xarray data-structures using Apache Beam) and then save it into TFRecords files. Then I just use TensorFlow APIs for loading/minimal processing of data at run-time.

Is this the common case? I suspect that your data and training are both chunked by time, so there isn't much of a need to do anything that crosses the natural chunking of the data. How typical is this? My guess is "pretty typical, with some exceptions", but I'm curious to know more.

It's absolutely possible to efficiently load data stored in other formats (e.g., netCDF or Zarr stored with pre-shuffled examples), but you need to work a bit harder to ensure that your computation isn't limited by IO.

I'm curious, do you pre-shuffle your data before writing, or is there some tensorflow pipeline thing that is shuffling it while reading? I'm curious if we'll have to handle this or if we can leave it to the frameworks.

shoyer commented 5 years ago

How prevalent is TFRecords among users of systems other than TensorFlow? Has it become a cross-framework standard?

I don't know, you'd have to look into what pytorch recommends, for example.

Is this the common case? I suspect that your data and training are both chunked by time, so there isn't much of a need to do anything that crosses the natural chunking of the data. How typical is this? My guess is "pretty typical, with some exceptions", but I'm curious to know more.

The only "unnatural" chunking I need to do is randomly shuffling my input examples.

I'm curious, do you pre-shuffle your data before writing, or is there some tensorflow pipeline thing that is shuffling it while reading? I'm curious if we'll have to handle this or if we can leave it to the frameworks.

I actually do both:

I don't think you can avoid the shuffle before writing. In native form your data probably has some highly pathological ordering, e.g., your weather data is ordered by time.

Shuffling at read time may not be absolutely essential but it's quite easy/cheap to do. It's a pretty typical use case.
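A read-time shuffle of this kind is often done with a bounded buffer. The sketch below is a pure-Python illustration of that idea, similar in spirit to a framework's shuffle buffer but not any framework's actual implementation; `shuffle_buffer` and its parameters are hypothetical names.

```python
import random


def shuffle_buffer(items, buffer_size, seed=None):
    """Yield items in a locally shuffled order using a bounded buffer."""
    rng = random.Random(seed)
    buffer = []
    for item in items:
        buffer.append(item)
        if len(buffer) > buffer_size:
            # swap a random element to the end and emit it
            i = rng.randrange(len(buffer))
            buffer[i], buffer[-1] = buffer[-1], buffer[i]
            yield buffer.pop()
    # drain what is left once the input is exhausted
    rng.shuffle(buffer)
    yield from buffer


shuffled = list(shuffle_buffer(range(100), buffer_size=10, seed=0))
```

Because an item can only move within reach of the buffer, this mixes data locally. It complements a shuffle before writing rather than replacing it.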

nbren12 commented 5 years ago

you'd have to look into what pytorch recommends

AFAIK, PyTorch has no recommended general storage format. I believe their built-in image datasets are stored in folders of .png files. I've seen some projects use LMDB in a similar way to TFRecords, though.

shoyer commented 5 years ago

It seems the main advantage with TFRecords is that an "example" can contain multiple variables, which are serialized together. This seems a little more natural than having each variable stored separately as would probably be done in xarray + zarr.

At the very least, it is important to be able to easily pull out an aligned batch of "examples" across multiple variables.

I don't think it's essential to serialize them together but certainly you need to be able to pull them together efficiently, e.g., with zarr you'd want to be able to quickly pull out arbitrary element i from all arrays along the "sample" dimension.

It probably does make the synchronization story a little less error prone, though, because all the data is in one place already. This could potentially matter for performance, e.g., to maximize disk throughput.
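Pulling an aligned batch from separately stored variables can be sketched as follows, with NumPy arrays standing in for zarr arrays (an assumption). The helper name `take_aligned` and the variable names are hypothetical; the point is only that the same sample indices are applied to every array.

```python
import numpy as np


def take_aligned(variables, indices):
    """Select the same sample indices from every variable."""
    lengths = {name: len(values) for name, values in variables.items()}
    if len(set(lengths.values())) != 1:
        raise ValueError(f"sample dimensions differ: {lengths}")
    return {name: values[indices] for name, values in variables.items()}


# two variables that share a leading "sample" axis of length 6
variables = {
    "foo": np.arange(6 * 2 * 2).reshape(6, 2, 2),
    "baz": np.arange(6) * 10.0,
}
batch = take_aligned(variables, np.array([4, 1]))
```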

OTOH, an xarray+zarr workflow might be more familiar to pangeo people, and potentially more easily extensible to PyTorch, etc.

Keep in mind that for medium/big data (bigger than fits into memory), you are pretty much always going to want a second shuffled copy of your data for machine learning:

  1. First, import raw data into a columnar format like zarr for analytics/preprocessing.
  2. Export shuffled examples for your deep learning model.

The output of step 2 doesn't really need to be consumed by anything other than the deep learning framework, beyond easy access for human debugging purposes.

darothen commented 5 years ago

My usual workflow is to process my data in parallel (in xarray data-structures using Apache Beam) and then save it into TFRecords files. Then I just use TensorFlow APIs for loading/minimal processing of data at run-time.

@shoyer do you have any examples? I'm spending a lot of time at the moment working on ETL pipelines with Google Cloud Dataflow and would be curious to see your approach here and any "best practices".

rabernat commented 5 years ago

Pinging @anirban89 to get his input here. He has a workflow that I think is pretty similar to the "9x9 spatial patch" (except it's a 3x3 patch).

djgagne commented 5 years ago

While it would be nice to be able to train ml models quickly using data directly from model grids, there is a huge IO speedup from processing and saving data to an intermediate ML-friendly format. We should recommend that approach for most use cases.

For large sets of multivariate 2D and 3D data patches, the ordering of the array dimensions may also impact the training time. Tensorflow and Keras by default put the variable/channel dimension last, but concatenating different variable arrays this way is approximately twice as slow as putting the variable dimension second and concatenating them along that axis. PyTorch puts the variable dimension first by default. I have been using between 5 and 15 separate variables for a given problem, so getting the ordering right could be a big time savings up front.
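The two layouts mentioned here can be illustrated with NumPy. This sketch only shows the shapes involved and makes no claim about relative speed; the variable names and sizes are hypothetical.

```python
import numpy as np

# hypothetical variables, each with shape (sample, y, x)
names = ["temperature", "humidity", "pressure"]
fields = {
    name: np.full((4, 9, 9), i, dtype="float32") for i, name in enumerate(names)
}

# variable/channel dimension last: (sample, y, x, variable)
channels_last = np.stack([fields[n] for n in names], axis=-1)
# variable/channel dimension second: (sample, variable, y, x)
channels_first = np.stack([fields[n] for n in names], axis=1)

# converting between the two is an axis move plus, if contiguity matters, one copy
converted = np.ascontiguousarray(np.moveaxis(channels_last, -1, 1))
```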

I have been using csv and netCDF for my intermediate data file formats but am curious about TFRecord. Does it have any inherent advantages over other binary formats like netCDF or HDF5?

Regarding metadata for patches, I do try to store the time, lon, lat, rows, and columns of each patch in coordinate arrays in the processed netCDF files. Being able to put the patches back onto a sparse grid is helpful for analysis and debugging purposes.
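As a rough sketch of why the stored rows and columns are useful, the following places patches back onto an otherwise empty grid. It assumes that `rows` and `cols` hold the top-left grid index of each patch, which is one possible convention and not necessarily the one used above; `patches_to_grid` is a hypothetical helper.

```python
import numpy as np


def patches_to_grid(patches, rows, cols, grid_shape):
    """Place (sample, y, x) patches on a NaN-filled grid.

    rows and cols give the top-left grid index of each patch.
    """
    grid = np.full(grid_shape, np.nan)
    size_y, size_x = patches.shape[1:]
    for patch, row, col in zip(patches, rows, cols):
        grid[row:row + size_y, col:col + size_x] = patch
    return grid


# two 3x3 patches with distinct fill values
patches = np.stack([np.full((3, 3), 1.0), np.full((3, 3), 2.0)])
grid = patches_to_grid(patches, rows=[0, 4], cols=[0, 5], grid_shape=(8, 8))
```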

shoyer commented 5 years ago

@shoyer do you have any examples? I'm spending a lot of time at the moment working on ETL pipelines with Google Cloud Dataflow and would be curious to see your approach here and any "best practices".

Unfortunately this project isn't fully public yet, but I can share what this looks like at a high level. Here's the snippet where we make our Beam pipeline:

# Copyright 2019 Google LLC.
# SPDX-License-Identifier: Apache-2.0

import apache_beam as beam

def main(_):
  ...
  seeds = [i + initialization_seed_offset for i in range(num_seeds)]

  def build_pipeline(root):
    """Builds a pipeline that generates and saves tfrecords and metadata."""
    generate_pipeline = (
        root
        | beam.Create(seeds)
        | 'random_state' >> beam.Map(random_state)
        | 'integrate' >> beam.FlatMap(builder.integrate)
        | 'postprocess' >> beam.Map(builder.postprocess))

    save_pipeline = (  # pylint: disable=unused-variable
        generate_pipeline
        | beam.Reshuffle()
        | beam.Map(builder.convert_to_tf_example)
        | beam.io.tfrecordio.WriteToTFRecord(
            records_path, num_shards=num_shards))

    statistics_pipeline = (  # pylint: disable=unused-variable
        generate_pipeline
        | 'items' >> beam.FlatMap(lambda state: state.items())
        | 'calculate_statistics' >> beam.CombinePerKey(
            beamlib.MeanVarianceCombineFn())
        | 'combine_statistics' >> beam.combiners.ToDict()
        | 'save_metadata' >> beam.Map(
            builder.save_metadata,
            records_path,
            metadata_path,
            num_shards=num_shards,
            extra_fields=extra_metadata_fields))

  runner.run(build_pipeline)  # apache_beam.runners.PipelineRunner instance

You can find more complete examples in one of my open sourced projects but that dataset is actually small enough that we load it entirely into memory.

TensorFlow Transform is probably another good option for this sort of pattern, though I haven't used it personally.

One tip I do highly recommend if you're actually running TensorFlow functions inside a Beam pipeline (or Dask, for that matter) is to use TensorFlow's eager mode. Otherwise debugging gets very painful, and you end up writing your own systems for caching TensorFlow graphs.

shoyer commented 5 years ago

For large sets of multivariate 2D and 3D data patches, the ordering of the array dimensions may also impact the training time. Tensorflow and Keras by default put the variable/channel dimension last, but concatenating different variable arrays this way is approximately twice as slow as putting the variable dimension second and concatenating them along that axis. PyTorch puts the variable dimension first by default. I have been using between 5 and 15 separate variables for a given problem, so getting the ordering right could be a big time savings up front.

To be clear, this matters for maximizing matrix-multiplication throughput (depending on details of your hardware, such as whether you're running on GPU or CPU), but doesn't make much of a difference for preprocessing. The cost of one extra copy when loading data is not going to make a huge difference.

I have been using csv and netCDF for my intermediate data file formats but am curious about TFRecord. Does it have any inherent advantages over other binary formats like netCDF or HDF5?

Advantages:

Disadvantages:

shoyer commented 5 years ago

Thinking about this a little more, netCDF3 files with unlimited dimensions would actually be a pretty reasonable fit for our needs. The records ("examples" for our deep learning pipeline) are all laid out at the end of the file, in the exact nested form that we want:

     recs         = [record ...]  // The data for all record variables are
                                  // stored interleaved at the end of the
                                  // file.
     record       = [varslab ...] // Each record consists of the n-th slab
                                  // from each record variable, for example
                                  // x[n,...], y[n,...], z[n,...] where the
                                  // first index is the record number, which
                                  // is the unlimited dimension index.
     varslab      = [values ...]  // One record of data for a variable, a
                                  // block of values all of the same type as
                                  // the variable in row-major order (last
                                  // index varying fastest).
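The interleaved record layout can be mimicked in memory with a NumPy structured array. This is only an illustration of the idea, not a netCDF3 reader or writer; the field names and sizes are hypothetical. Each record holds the n-th slab of every variable contiguously, so record n starts at byte offset n times the record size.

```python
import numpy as np

# hypothetical record: one 3-value temperature slab plus one integer flag
record = np.dtype([("temperature", "<f4", (3,)), ("flag", "<i4")])

n = 4
# column-oriented inputs: one array per variable
column_temp = np.arange(n * 3, dtype="<f4").reshape(n, 3)
column_flag = np.arange(n, dtype="<i4")

# row-oriented copy: variables interleaved record by record
rows = np.empty(n, dtype=record)
rows["temperature"] = column_temp
rows["flag"] = column_flag
raw = rows.tobytes()


def read_record(buffer, index):
    """Read one record directly from its byte offset."""
    offset = index * record.itemsize
    return np.frombuffer(buffer, dtype=record, count=1, offset=offset)[0]


third = read_record(raw, 2)
```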

Downsides:

You'd have to write the efficient code to read it, but at least in theory I bet you could do quite respectably by tossing a netCDF3 file(s) (the 64-bit offset version) with randomly shuffled records along the unlimited dimension up on object storage like AWS S3 or GCS.

jsignell commented 5 years ago

NetCDF3 files seem like a reasonable solution and have the benefit of already existing. Would the original coordinates be preserved?

mrocklin commented 5 years ago

I would like to ask some more questions about shuffling.

So once we get all of our samples in a nice linear Xarray, how would we like them shuffled?

Ideally we would shuffle them entirely, so that any sample has an equal probability of being in any location. This is totally doable, though somewhat computationally intense. I'm curious if there are other lesser shufflings that would work as well, or that are common in practice. For example, it's quite easy to ...

  1. Shuffle data within a single chunk. Like if our data is chunked by day it's easy to shuffle things spatially and also by time within a day.
  2. Shuffle chunks. Like if our data is chunked by day then we can move all the days around

Both of those forms of "shuffling" are trivial, assuming that there isn't more complexity upstream from how we reshaped our data. My guess is that they aren't sufficient, and that people want full, entirely random, shuffling, but I thought I'd check first to see what was done in practice.
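The two cheap forms of shuffling listed above can be sketched with NumPy, using a list of array slices as a stand-in for Dask chunks (an assumption). The name `cheap_shuffle` is hypothetical. Note that samples never leave the chunk they started in, which is exactly the limitation in question.

```python
import numpy as np


def cheap_shuffle(samples, chunk_size, seed=0):
    """Shuffle within each chunk, then shuffle the order of the chunks."""
    rng = np.random.default_rng(seed)
    chunks = [samples[i:i + chunk_size] for i in range(0, len(samples), chunk_size)]
    # 1. shuffle the samples inside every chunk independently
    chunks = [chunk[rng.permutation(len(chunk))] for chunk in chunks]
    # 2. shuffle the order in which whole chunks appear
    order = rng.permutation(len(chunks))
    return np.concatenate([chunks[i] for i in order])


samples = np.arange(12)
shuffled = cheap_shuffle(samples, chunk_size=4)
```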

shoyer commented 5 years ago

Another downside of netCDF3 is that you can't really control the chunking. You either get size 1 chunks or no chunking at all.

So instead of @mrocklin's Dimensions: (sample: 100000, x: 9, y: 9, altitude: 27) you might need something more like Dimensions: (sample: 1000, batch: 100, x: 9, y: 9, altitude: 27) to ensure that overhead of parsing an example can be amortized.

You have to do the same sort of thing with TFRecords, but hypothetically there could be a more flexible "chunks" parameter in a system that looks more like zarr.
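Adding the extra batch dimension is a plain reshape once the samples are already shuffled. The sketch below uses NumPy with small hypothetical sizes; `add_batch_dim` is an illustrative helper, and it drops any trailing samples that do not fill a whole batch.

```python
import numpy as np


def add_batch_dim(samples, batch_size):
    """Reshape (sample, ...) into (sample, batch, ...), dropping the remainder."""
    n_batches = len(samples) // batch_size
    trimmed = samples[: n_batches * batch_size]
    return trimmed.reshape(n_batches, batch_size, *samples.shape[1:])


samples = np.arange(250 * 2 * 2).reshape(250, 2, 2)
batched = add_batch_dim(samples, batch_size=100)
```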

Would the original coordinates be preserved?

Yes

RPrudden commented 5 years ago

I'm currently working with a zarr dataset wrapped by a PyTorch dataloader class. Since each example is fairly large (100x100x12 up to about 1000x1000x12) shuffling hasn't really been an issue so far.

It seems the main advantage with TFRecords is that an "example" can contain multiple variables, which are serialized together. This seems a little more natural than having each variable stored separately as would probably be done in xarray + zarr.

It should be possible to interleave different variables into a single zarr, at least a dask array style zarr. What I don't know is whether there's any clean way to keep track of the different variables. We haven't run into this yet, since we're working with a single variable.

It's absolutely possible to efficiently load data stored in other formats (e.g., netCDF or Zarr stored with pre-shuffled examples), but you need to work a bit harder to ensure that your computation isn't limited by IO.

@shoyer I'd be interested to hear more of your thoughts on this. What is it about the design of TFRecords that makes IO simpler, and what tricks are needed to get similar performance from zarr and netCDF?

nbren12 commented 5 years ago

My guess is that they aren't sufficient, and that people want full, entirely random, shuffling

@mrocklin In practice, the shuffling could be restricted to the dimensions where the data is significantly non-stationary. For instance, some Earth science data varies a lot from north to south, but less in longitude. Unfortunately, shuffling within spatially contiguous chunks is probably not sufficient.

shoyer commented 5 years ago

@shoyer I'd be interested to hear more of your thoughts on this. What is it about the design of TFRecords that makes IO simpler, and what tricks are needed to get similar performance from zarr and netCDF?

The only real advantage is that TFRecord is row rather than column oriented:

shoyer commented 5 years ago

I think something close to a full shuffle is necessary. It isn't worth the risk of partially shuffling data to save a bit of compute when you're about to spend a huge amount of compute training a deep learning model.

nbren12 commented 5 years ago

The only real advantage is that TFRecord is row rather than column oriented:

I think row vs column orientation is a really helpful way to look at this. Thanks @shoyer.

So which should pangeo try to support? Column orientation fits in better with the current dask+zarr+xarray paradigm but is less flexible. For example, all samples would need to have the same dimensions. For row orientation to work, we would need a performant way to turn an xarray object into a list of serialized data.

shoyer commented 5 years ago

Column orientation fits in better with the current dask+zarr+xarray paradigm but is less flexible. For example, all samples would need to have the same dimensions.

I don't think that is quite true. As long as all examples have a "sample" dimension with the same chunks (or can be broadcast to have it) you should be OK.

The record oriented aspect makes parallel IO a little simpler but may not be truly necessary. I would not be surprised if we can get excellent performance reading from a properly prepared zarr dataset.

It would be a good experiment to try hooking up xarray+zarr to input for tensorflow.data.Dataset (or the pytorch equivalent). Be sure to turn off dask, though -- tensorflow has its own parallelization for IO. This would start with a generator function that looks something like:

import xarray

def generate_chunks(zarr_path, offset, chunk_size):
    # for max performance, "chunk_size" should match the zarr chunk size
    # auto_chunk=False turns off dask (newer xarray versions spell this chunks=None)
    ds = xarray.open_zarr(zarr_path, auto_chunk=False)
    while True:
        # yield numpy arrays for one batch of data
        batch_ds = ds.isel(sample=slice(offset, offset + chunk_size))
        yield {k: v.data for k, v in batch_ds.items()}
        # move on to the next batch, wrapping around at the end of the dataset
        offset = (offset + chunk_size) % ds.dims['sample']

Then you would read data in parallel (starting at multiple offsets) using tf.data.Dataset.from_generator and tf.contrib.data.parallel_interleave, e.g., as described in the dataset performance guide. Something like:

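import tensorflow as tf
# one dataset element per starting offset
offsets = tf.data.Dataset.from_tensor_slices([0, 1000, 2000, 3000, ...])
dataset = offsets.apply(tf.contrib.data.parallel_interleave(
    # each reader runs generate_chunks from its own offset (zarr_path, chunk_size assumed defined)
    lambda offset: tf.data.Dataset.from_generator(
        generate_chunks, ..., args=(zarr_path, offset, chunk_size)),
    cycle_length=FLAGS.num_parallel_readers))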

You'd probably want another utility function for autogenerating the shapes and dtypes needed in from_generator, but hopefully you get the idea.
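One possible shape for such a utility is sketched below. It inspects a single batch (a dict of NumPy arrays, as yielded by the generator) and derives per-variable dtypes and shapes. The name `infer_schema` is hypothetical, and converting the NumPy dtypes to TensorFlow dtypes (for example with `tf.as_dtype`) is left out, so treat this as an assumption about how the pieces would be wired together.

```python
import numpy as np


def infer_schema(batch):
    """Return (dtypes, shapes) for a dict of NumPy arrays.

    The leading "sample" axis is reported as None because the number of
    samples per batch can vary.
    """
    dtypes = {name: array.dtype for name, array in batch.items()}
    shapes = {name: (None,) + array.shape[1:] for name, array in batch.items()}
    return dtypes, shapes


# hypothetical batch with one 4-D variable and one scalar-per-sample variable
batch = {
    "foo": np.zeros((100, 9, 9, 27), dtype="float64"),
    "baz": np.zeros((100,), dtype="float64"),
}
dtypes, shapes = infer_schema(batch)
```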

nbren12 commented 5 years ago

Okay. Thanks for that code @shoyer. I guess auto_chunk=False is the key, and then parallelizing across the offsets. For PyTorch, we could probably just rely upon dask for parallelism, and just rechunk to the appropriate batch size.

If this solves the storage issue, then the hard part remains the shuffling and the methods for defining the "samples".

nbren12 commented 5 years ago

It actually seems like the pytables API would work well for this. This API basically provides a high-level row-based Table abstraction on top of hdf5, and makes it easy to iterate over records. To me, this looks very similar to tfrecords. In both the user defines a schema, and then dumps/reads the data sequentially. I am surprised that there aren't many examples online using it for deep learning.

stale[bot] commented 5 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] commented 5 years ago

This issue has been automatically closed because it had not seen recent activity. The issue can always be reopened at a later date.

mrocklin commented 5 years ago

This is still live, it's just likely to take a while. I can also move it elsewhere if people prefer.

stale[bot] commented 5 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

mrocklin commented 5 years ago

Keepalive

stale[bot] commented 5 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] commented 4 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] commented 4 years ago

This issue has been automatically closed because it had not seen recent activity. The issue can always be reopened at a later date.

skeller88 commented 4 years ago

Column orientation fits in better with the current dask+zarr+xarray paradigm but is less flexible. For example, all samples would need to have the same dimensions.

I don't think that is quite true. As long as all examples have a "sample" dimension with the same chunks (or can be broadcast to have it) you should be OK.

The record oriented aspect makes parallel IO a little simpler but may not be truly necessary. I would not be surprised if we can get excellent performance reading from a properly prepared zarr dataset.

It would be a good experiment to try hooking up xarray+zarr to input for tensorflow.data.Dataset (or the pytorch equivalent). Be sure to turn off dask, though -- tensorflow has its own parallelization for IO. This would start with a generator function that looks something like:

import xarray

def generate_chunks(zarr_path, offset, chunk_size):
    # for max performance, "chunk_size" should match the zarr chunk size
    # auto_chunk=False turns off dask (newer xarray versions spell this chunks=None)
    ds = xarray.open_zarr(zarr_path, auto_chunk=False)
    while True:
        # yield numpy arrays for one batch of data
        batch_ds = ds.isel(sample=slice(offset, offset + chunk_size))
        yield {k: v.data for k, v in batch_ds.items()}
        # move on to the next batch, wrapping around at the end of the dataset
        offset = (offset + chunk_size) % ds.dims['sample']

Then you would read data in parallel (starting at multiple offsets) using tf.data.Dataset.from_generator and tf.contrib.data.parallel_interleave, e.g., as described in the dataset performance guide. Something like:

import tensorflow as tf
# one dataset element per starting offset
offsets = tf.data.Dataset.from_tensor_slices([0, 1000, 2000, 3000, ...])
dataset = offsets.apply(tf.contrib.data.parallel_interleave(
    # each reader runs generate_chunks from its own offset (zarr_path, chunk_size assumed defined)
    lambda offset: tf.data.Dataset.from_generator(
        generate_chunks, ..., args=(zarr_path, offset, chunk_size)),
    cycle_length=FLAGS.num_parallel_readers))

You'd probably want another utility function for autogenerating the shapes and dtypes needed in from_generator, but hopefully you get the idea.

@shoyer how are you handling the shuffling of the data at the start of each epoch?

@shoyer thanks for sharing this example. Did you or @nbren12 figure out how to handle shuffling with a dataset that's too large to fit in memory?

skeller88 commented 4 years ago

This 2-pass shuffle algo could be a starting point for shuffling improvements: https://blog.janestreet.com/how-to-shuffle-a-big-dataset/?utm_source=share
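A minimal sketch of a two-pass shuffle along those lines, as I read the linked post: the first pass deals every item to a randomly chosen pile, and the second pass shuffles each pile in memory and emits the piles one after another. In-memory lists stand in for the on-disk pile files here, and `two_pass_shuffle` and `num_piles` are hypothetical names.

```python
import random


def two_pass_shuffle(items, num_piles, seed=None):
    """Shuffle a stream that is too large to hold in memory all at once."""
    rng = random.Random(seed)
    # pass 1: stream through the data once, dealing each item to a random pile
    piles = [[] for _ in range(num_piles)]
    for item in items:
        piles[rng.randrange(num_piles)].append(item)
    # pass 2: each pile is small enough to shuffle in memory; emit piles in turn
    for pile in piles:
        rng.shuffle(pile)
        yield from pile


shuffled = list(two_pass_shuffle(range(1000), num_piles=8, seed=1))
```

With real data each pile would be a file on disk, and `num_piles` would be chosen so that a single pile fits comfortably in RAM.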

nbren12 commented 4 years ago

That seems like an interesting approach ~, but still requires random access to the data for the first pass~. @raspstephan and I have both had good success with a limited form of shuffling, where you load a big chunk of data into RAM and shuffle it, but don't shuffle between chunks. This is quite fast, and the hope is that there is enough heterogeneity inside the RAM-sized chunks that a full shuffling of the dataset is not really necessary.

Edit: deleted incorrect statement.

raybellwaves commented 2 years ago

@nbren12's blog is of use here: https://www.noahbrenowitz.com/post/loading_netcdfs/