zarr-developers / zarr-python

An implementation of chunked, compressed, N-dimensional arrays for Python.
https://zarr.readthedocs.io
MIT License
1.47k stars 274 forks source link

Best practices for zarr and GCS streaming applications #595

Open skgbanga opened 4 years ago

skgbanga commented 4 years ago

Hello,

We are exploring zarr as a potential file format for our application. Our application is a streaming application which generates rows of data, which are continuously being appended to a 2D matrix.

I couldn't find 'best guidelines' when it comes to streaming and zarr and gcs. (for that matter, any cloud storage). Please point me in the right direction if there already exists something like this.

To evaluate zarr, I wrote a small script (Kudos on good docs! I was able to write this small app in very little time). Note that this is NOT optimized at all. The point of this issue/post is to figure out the best practices for such an application.

import os
import shutil
import time
import argparse
import zarr
import numpy as np
import gcsfs

TEST_PROJECT = "..."
TEST_BUCKET = "..."

TEST_GOOGLE_SERVICE_ACCOUNT_INFO = {}

n = 100
xs = 2
chunk_size = 10

def timer(fn):
    def wrapper(*args, **kwargs):
        start = time.time()
        fn(*args, **kwargs)
        dur = time.time() - start
        return dur

    return wrapper

@timer
def iterate(store):
    z = zarr.create(store=store, shape=(chunk_size, xs), chunks=(chunk_size, None), dtype="float")

    for i in range(n):
        row = np.arange(xs, dtype="float")
        z[i, :] = row

        if (i + 1) % chunk_size == 0:  # time to add a new chunk
            a, b = z.shape
            z.resize(a + chunk_size, b)

    z.resize(n, xs)

def in_memory():
    return iterate(None)

def disc():
    shutil.rmtree('data/example.zarr')
    store = zarr.DirectoryStore("data/example.zarr")
    return iterate(store)

def google_cloud():
    gcs = gcsfs.GCSFileSystem(TEST_PROJECT, token=TEST_GOOGLE_SERVICE_ACCOUNT_INFO)
    root = os.path.join(TEST_BUCKET, "sandeep/example.zarr")
    for f in gcs.find(root):
        gcs.rm(f)

    store = gcsfs.GCSMap(root, gcs=gcs, check=False)
    return iterate(store)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--memory", action="store_true")
    group.add_argument("--disc", action="store_true")
    group.add_argument("--gcs", action="store_true")
    args = parser.parse_args()

    if args.memory:
        dur = in_memory()
    elif args.disc:
        dur = disc()
    elif args.gcs:
        dur = google_cloud()
    else:
        raise ValueError("Please specify an option")

    print(f"Time taken {dur:.6f}")

Results:

$ ./foo.py --memory
Time taken 0.018762
$ ./foo.py --disc
Time taken 0.070137
$ ./foo.py --gcs
Time taken 54.315994

Above is 100 * 2 matrix, so 200 floats.

As you can see, this naive method of appending rows to zarr is clearly not the right way to do. (for reference, if I manually upload example.zarr to gcloud using gsutil it takes ~1.6secs). My guess is that everytime I do z[i, :] = row, it does a gcs write and that is destroying the performance.

So my major question is:

PS: A quickly look at strace ./foo.py --gcs showed a lot this:

futex(0x7f1edf5db4a4, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 29, {1598977280, 926829000}, ffffffff) = 0
futex(0x7f1edf5db460, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x7f1edf5db4a0, FUTEX_WAKE_PRIVATE, 1) = 1
futex(0x7f1edf5db4a4, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 31, {1598977280, 926990000}, ffffffff) = -1 EAGAIN (Resource temporarily unavailable)
futex(0x7f1edf5db460, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x7f1edf5db4a0, FUTEX_WAKE_PRIVATE, 1) = 1
futex(0x7f1edf5db4a4, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 33, {1598977280, 927165000}, ffffffff) = -1 EAGAIN (Resource temporarily unavailable)
futex(0x7f1edf5db460, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x7f1edf5db4a4, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 35, {1598977280, 927344000}, ffffffff) = 0
futex(0x7f1edf5db460, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x7f1edf5db4a4, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 37, {1598977280, 927526000}, ffffffff) = 0
futex(0x7f1edf5db460, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x7f1edf5db4a4, FUTEX_WAKE_OP_PRIVATE, 1, 1, 0x7f1edf5db4a0, {FUTEX_OP_SET, 0, FUTEX_OP_CMP_GT, 1}) = 1
futex(0x7f1edf5db460, FUTEX_WAKE_PRIVATE, 1) = 1
futex(0xf4c7a0, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 0, {1598977290, 923842000}, ffffffff) = 0
futex(0x7f1edf5db4a4, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 41, {1598977281, 292908000}, ffffffff) = -1 EAGAIN (Resource temporarily unavailable)
futex(0x7f1edf5db460, FUTEX_WAKE_PRIVATE, 1) = 0
sendto(5, "\0", 1, 0, NULL, 0)          = 1

I know zarr supports parallel writes to archive. Are these futex calls because of those?

rabernat commented 4 years ago

Thanks for the great question @skgbanga. I will try to provide some answers, but first I just wanted to tag this as closely related to https://github.com/pangeo-forge/pangeo-forge/issues/11.

rabernat commented 4 years ago

After reviewing your gcs code, I think I understand a the main issue which is making it slow.

z[i, :] = row

You are doing a write operation for each row. But zarr does not currently support partial chunk reads / writes (see #521 and #584; this is work in progress). So for each of these __setitem__ calls, Zarr must

Your data is tiny, so throughput is not an issue, but latency is. Each of these i/o ops will occur ~200 ms of latency, (+/- depending on how far you are from your GCS region). 0.2 s 2 100 = 40 s. So we are in the right ballpark.

Instead, I would recommend batching your writes together to cover a full chunk.

We are also working on adding some async features (e.g. #536), which should help with these latency issues. However, I don't think this would help you, since your writes need to be synchronous.

Edit: if you batch your writes together, you should be able to reduce the time by a factor of chunk_size, i.e. 5 s.

skgbanga commented 4 years ago

@rabernat Thanks for the quick reply!

Each of these i/o ops will occur ~200 ms of latency, (+/- depending on how far you are from your GCS region).

So my initial numbers were from a machine which accessed google infrastructure via internet. I have now moved to a cloud host which can access GCS via google's internal 10G(?) network. To show the difference:

internet> ping storage.googleapis.com
PING storage.googleapis.com (172.217.11.48) 56(84) bytes of data.
64 bytes from lga25s61-in-f16.1e100.net (172.217.11.48): icmp_seq=1 ttl=117 time=3.78 ms
64 bytes from lga25s61-in-f16.1e100.net (172.217.11.48): icmp_seq=2 ttl=117 time=3.04 ms
...
host> ping storage.googleapis.com
PING storage.googleapis.com (74.125.124.128) 56(84) bytes of data.
64 bytes from 74.125.124.128 (74.125.124.128): icmp_seq=1 ttl=115 time=0.976 ms
64 bytes from 74.125.124.128 (74.125.124.128): icmp_seq=2 ttl=115 time=0.996 ms
...

So a ~3x difference in ping times. (ping is not a great metric, but gives some idea).

On this host, my original numbers transform to:

$ ./foo.py --mem
Time taken 0.023775
$ ./foo.py --disc
Time taken 0.137749
$ ./foo.py --gcs
Time taken 21.994125

Much better than original 54 secs. Now on to your suggestion about this line:

z[i, :] = row

I tried doing what you suggested:

@timer
def iterate(store):
    z = zarr.create(store=store, shape=(chunk_size, xs), chunks=(chunk_size, None), dtype="float")

    rows = []
    num_chunks = 1 
    for i in range(n):
        row = np.arange(xs, dtype="float")
        rows.append(row)

        if (i + 1) % chunk_size == 0:
            start = (num_chunks - 1) * chunk_size
            end = num_chunks * chunk_size
            z[start:end, :] = np.array(rows)
            rows = []

            num_chunks += 1 
            z.resize(num_chunks * chunk_size, xs)

    assert rows is not None  # TODO handle this case
    z.resize(n, xs)

With the above code, the numbers are:

$ ./foo.py --mem
Time taken 0.008074
$ ./foo.py --disc
Time taken 0.016931
$ ./foo.py --gcs
Time taken 3.531046

So that's fantastic. Note that in the minuscule sample data above, we have 100 rows arranged in 10 chunks of 10 rows each. If I just have a single chunk of 100 rows (note that at this point, it is not really streaming), the time drops to 0.794 seconds.

So the biggest guideline when it comes to streaming/zarr/gcs seems to be to write data in chunks. I will say that this is slightly non intuitive since typically streaming backends also support some sort of caching before doing the 'flush', but it seems that zarr unconditionally forwards the call to setitem which calls gcs python api to do the actual write.

Thanks a lot for your help! I am going to test this with a real workload now, and reopen this issue if I hit more roadblocks. Also will be great if all the knowledge on this subject can be consolidated and put into a section in the documentation.

Cheers.

PS: I am also going to check the other issues you linked and see if I can leverage those.

rabernat commented 4 years ago

I'm curious if you can verify whether the data written to GCS matches your original source data exactly. pangeo-forge/pangeo-forge#11 describes a bug where a zarr appending workflow with GCS does not work as expected, but we haven't been able to track down the cause of that problem.

skgbanga commented 4 years ago

@rabernat I can verify that the data is exactly equal between the two arrays. But note that I am building a chunk locally, and then writing exactly that chunk to gcs. (https://github.com/pangeo-forge/pangeo-forge/issues/11's example is doing a general append so is probably exposing more edge cases?)

While we are at the subject of equality, I find the following behavior quite unintuitive:

>>> d1, d2
(<zarr.core.Array (100, 2) float64>, <zarr.core.Array (100, 2) float64>)

>>> d1 == d2
False

>>> d1.store, d2.store
(<zarr.storage.DirectoryStore object at 0x7fdd4363a828>, <fsspec.mapping.FSMap object at 0x7fdcffb04748>)                                                                    

>>> d1.hexdigest() == d2.hexdigest()                                                                                                                                         
True

Equality operator for Array is defined as:

      def __eq__(self, other):
          return (
              isinstance(other, Array) and
              self.store == other.store and
              self.read_only == other.read_only and
              self.path == other.path and
              not self._is_view
              # N.B., no need to compare other properties, should be covered by
              # store comparison
          )

I am coming from a C++ background, and this seems like comparing two vectors (https://en.cppreference.com/w/cpp/container/vector) based on allocators and not the actual data.

I think two array's should compare equal iff they have the same data irrespective of where that data is coming from.