google / tensorstore

Library for reading and writing large multi-dimensional arrays.
https://google.github.io/tensorstore/

Copying from one TS to another (on GCS) painfully slow, uses a lot of host memory? (zarr3) #202

Open dlwh opened 1 week ago

dlwh commented 1 week ago

Using the zarr3 driver, writing 4GB of data directly from numpy to a TS (and reading it back) takes a few seconds, but copying from one TS to another is painfully slow (<100MB/minute). It also uses a lot of RAM. This reproducer uses 8GB of RAM (even if I create the source array in another process), but my non-minimized real code was using more like 30GB.

Using Ray in this script.

import time

import numpy as np
import tensorstore as ts
import asyncio
import os

URI = "file:///tmp/issue_202/"

# Constants
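DATA_SIZE = 1024**3  # number of int32 elements (4 GiB of data)
DEFAULT_CHUNK_SIZE = 256 * 1024  # inner (read) chunk: 256Ki elements = 1 MiB of int32
DEFAULT_WRITE_CHUNK_SIZE = DEFAULT_CHUNK_SIZE * 512  # shard (write chunk): 128Mi elements = 512 MiB of int32

# Build the zarr3 spec for the given kvstore URI (file:// here; the original run used gs://)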
def _get_spec(uri, shape):
    return {
        "driver": "zarr3",
        "kvstore": uri,
        "metadata": {
            "shape": shape,
            "chunk_grid": {
                "name": "regular",
                "configuration": {"chunk_shape": [DEFAULT_WRITE_CHUNK_SIZE, *shape[1:]]},
            },
            "codecs": [
                {
                    "name": "sharding_indexed",
                    "configuration": {
                        "chunk_shape": [DEFAULT_CHUNK_SIZE, *shape[1:]],
                        "codecs": [{"name": "blosc", "configuration": {"clevel": 5}}],
                    },
                }
            ],
        },
    }

# Open (or create) a TensorStore at uri; mode is passed through to ts.open as keyword flags
def _ts_open_sync(uri: str, dtype: ts.dtype, shape, *, mode):
    print("Opening TensorStore with uri:", uri, "and dtype:", dtype)
    spec = _get_spec(uri, shape)
    return ts.open(spec, dtype=dtype, shape=shape, **mode).result()

# Main function to set up the source and destination arrays and copy data
async def main():
    ts.experimental_update_verbose_logging('gcs_http=1,file=1')
    dtype = ts.int32
    true_shape = [DATA_SIZE]  # 1D array with 4GB of int32 data
    fake_shape = [DATA_SIZE * 64]  # 1D array with 256GB of int32 data. not really there

    # Create and fill the source TensorStore array with random data
    source_ts = _ts_open_sync(URI + "/source", dtype, fake_shape, mode={"create": True, "open": True})
    random_data = np.random.randint(0, 100, size=true_shape, dtype=np.int32)
    time_in = time.time()
    source_ts[0:DATA_SIZE].write(random_data).result()
    time_out = time.time()
    print(f"Random data written to source array: {time_out - time_in:.2f} seconds")
    del source_ts
    # verify the data is the same
    time_in = time.time()
    random_offset = 1376090583
    source_ts = _ts_open_sync(URI + "/source", dtype, fake_shape, mode={"create": False, "open": True})
    assert np.array_equal(source_ts[0:DATA_SIZE].read().result(), random_data)
    time_out = time.time()
    print(f"Data read from source array: {time_out - time_in:.2f} seconds")
    del random_data

    # Create the destination TensorStore array
    dest_ts = _ts_open_sync(URI + "/dest", dtype, fake_shape, mode={"create": True, "open": True})

    time_in = time.time()
    dest_ts[random_offset:random_offset + DATA_SIZE].write(source_ts[0:DATA_SIZE]).result()
    time_out = time.time()
    print(f"Data copied to destination array: {time_out - time_in:.2f} seconds")

    print("Data copy complete.")
    print(ts.experimental_collect_matching_metrics("/tensorstore"))
    print(dest_ts.kvstore.list().result())

# Run the asyncio loop
if __name__ == "__main__":
    asyncio.run(main())

"""
# for testing, we put this in a ray.remote task

import ray

@ray.remote(memory=40 * 1024**3)
def copy_data():
    asyncio.run(main())

if __name__ == "__main__":
    ray.get(copy_data.remote())
"""
dlwh commented 1 week ago

Forgot to include the timings:

(copy_data pid=18749, ip=10.130.1.204) Opening TensorStore with path: gs://marin-us-central2/scratch/dlwh/dummy/source and dtype: dtype("int32")
(copy_data pid=18749, ip=10.130.1.204) Random data written to source array: 4.21 seconds
(copy_data pid=18749, ip=10.130.1.204) Opening TensorStore with path: gs://marin-us-central2/scratch/dlwh/dummy/source and dtype: dtype("int32")
(copy_data pid=18749, ip=10.130.1.204) Data read from source array: 3.12 seconds
(copy_data pid=18749, ip=10.130.1.204) Opening TensorStore with path: gs://marin-us-central2/scratch/dlwh/dummy/dest and dtype: dtype("int32")
(copy_data pid=18749, ip=10.130.1.204) Data copied to destination array: 573.34 seconds
(copy_data pid=18749, ip=10.130.1.204) Data copy complete.
laramiel commented 1 week ago

At the end of your main you could print out the metrics to see better what's happening. In theory you are reading/writing only 32 chunks x 3; metrics can verify that:

    print(ts.experimental_collect_matching_metrics("/tensorstore"))
    print(dest_ts.kvstore.list().result())
dlwh commented 1 week ago

Having trouble making sense of this but here it is. It does seem like it's trying to write the entire large domain?

{'name': '/tensorstore/kvstore/gcs/bytes_written', 'values': [{'value': 269044916789}]},

Though the data actually in the storage seems right:

15:27 $ gsutil du -sh  gs://marin-us-central2/scratch/dlwh/dummy/source gs://marin-us-central2/scratch/dlwh/dummy/dest
1.01 GiB     gs://marin-us-central2/scratch/dlwh/dummy/source
1.01 GiB     gs://marin-us-central2/scratch/dlwh/dummy/dest
(copy_data pid=67652, ip=10.130.1.105) [{'name': '/tensorstore/cache/chunk_cache/reads', 'values': [{'value': 8192}]}, {'name': '/tensorstore/cache/chunk_cache/writes', 'values': [{'value': 12288}]}, {'name': '/tensorstore/cache/hit_count', 'values': [{'value': 20458}]}, {'name': '/tensorstore/cache/kvs_cache_read', 'values': [{'category': 'changed', 'value': 16377}, {'category': 'unchanged', 'value': 3458}]}, {'name': '/tensorstore/cache/miss_count', 'values': [{'value': 16441}]}, {'name': '/tensorstore/futures/force_callbacks', 'values': [{'value': 24627}]}, {'name': '/tensorstore/futures/live', 'values': [{'max_value': 8314, 'value': 3}]}, {'name': '/tensorstore/futures/not_needed_callbacks', 'values': [{'value': 8227}]}, {'name': '/tensorstore/futures/ready_callbacks', 'values': [{'value': 60654}]}, {'name': '/tensorstore/http/active', 'values': [{'max_value': 3, 'value': 0}]}, {'name': '/tensorstore/http/first_byte_latency_us', 'values': [{'0': 0, '1': 0, '10': 0, '11': 0, '12': 1, '13': 1, '14': 0, '15': 1789, '16': 721, '17': 59, '18': 155, '19': 559, '2': 0, '20': 989, '21': 1471, '22': 1274, '23': 35, '24': 3, '3': 0, '4': 0, '5': 0, '6': 0, '7': 0, '8': 0, '9': 0, 'count': 7057, 'mean': 499165.90803457727, 'sum_of_squared_deviation': 2049888924412408.8}]}, {'name': '/tensorstore/http/http_poll_time_ns', 'values': [{'0': 0, '1': 0, '10': 0, '11': 180731163, '12': 16238068, '13': 274627, '14': 188188, '15': 52087, '16': 83632, '17': 57480, '18': 54817, '19': 46867, '2': 0, '20': 51191, '21': 56045, '22': 35985, '23': 15845, '24': 5424, '25': 4116, '26': 2661, '27': 2001, '28': 865, '29': 1022, '3': 0, '30': 43, '4': 0, '5': 0, '6': 0, '7': 0, '8': 0, '9': 0, 'count': 197902127, 'mean': 4470.500754197975, 'sum_of_squared_deviation': 5.592358619223809e+19}]}, {'name': '/tensorstore/http/request_bytes', 'values': [{'0': 0, '1': 2542, '10': 0, '11': 2, '12': 0, '13': 0, '14': 0, '15': 0, '16': 0, '17': 0, '18': 0, '19': 0, '2': 0, '20': 9, '21': 18, '22': 36, 
'23': 108, '24': 172, '25': 348, '26': 678, '27': 1140, '28': 1947, '29': 57, '3': 0, '4': 0, '5': 0, '6': 0, '7': 0, '8': 0, '9': 0, 'count': 7057, 'mean': 39823988.613858506, 'sum_of_squared_deviation': 1.324975700204374e+19}]}, {'name': '/tensorstore/http/request_completed', 'values': [{'value': 7057}]}, {'name': '/tensorstore/http/request_header_bytes', 'values': [{'0': 0, '1': 0, '10': 0, '11': 0, '12': 7055, '2': 0, '3': 0, '4': 0, '5': 0, '6': 2, '7': 0, '8': 0, '9': 0, 'count': 7057, 'mean': 1085.4534504747032, 'sum_of_squared_deviation': 8532320.958480725}]}, {'name': '/tensorstore/http/request_started', 'values': [{'value': 7057}]}, {'name': '/tensorstore/http/response_bytes', 'values': [{'0': 0, '1': 2512, '10': 1, '11': 4516, '12': 1, '13': 0, '14': 0, '15': 0, '16': 0, '17': 0, '18': 0, '19': 0, '2': 0, '20': 0, '21': 0, '22': 0, '23': 0, '24': 0, '25': 0, '26': 0, '27': 0, '28': 0, '29': 16, '3': 0, '4': 0, '5': 0, '6': 0, '7': 9, '8': 2, '9': 0, 'count': 7057, 'mean': 308577.43743800704, 'sum_of_squared_deviation': 2.946738680057479e+17}]}, {'name': '/tensorstore/http/response_codes', 'values': [{'code': '304', 'value': 2512}, {'code': '404', 'value': 11}, {'code': '200', 'value': 4133}, {'code': '429', 'value': 401}]}, {'name': '/tensorstore/http/total_time_ms', 'values': [{'0': 0, '1': 0, '10': 979, '11': 1455, '12': 1334, '13': 44, '14': 3, '2': 1, '3': 1, '4': 0, '5': 1639, '6': 865, '7': 49, '8': 149, '9': 538, 'count': 7057, 'mean': 502.6154173161408, 'sum_of_squared_deviation': 2076383752.242594}]}, {'name': '/tensorstore/internal/riegeli/contiguous_bytes', 'values': [{'value': 12863930368}]}, {'name': '/tensorstore/internal/riegeli/noncontiguous_bytes', 'values': [{'value': 131072}]}, {'name': '/tensorstore/internal/thread/schedule_at/insert_histogram_ms', 'values': [{'0': 0, '1': 0, '10': 0, '11': 13, '12': 275, '13': 91, '14': 22, '2': 0, '3': 0, '4': 0, '5': 0, '6': 0, '7': 0, '8': 0, '9': 0, 'count': 401, 'mean': 1904.2793017456365, 
'sum_of_squared_deviation': 279602660.7182045}]}, {'name': '/tensorstore/internal/thread/schedule_at/next_event', 'values': [{'value': 'infinite-future'}]}, {'name': '/tensorstore/internal/thread/schedule_at/queued_ops', 'values': [{'max_value': 9, 'value': 0}]}, {'name': '/tensorstore/kvstore/gcs/batch_read', 'values': [{'value': 2540}]}, {'name': '/tensorstore/kvstore/gcs/bytes_read', 'values': [{'value': 2173830427}]}, {'name': '/tensorstore/kvstore/gcs/bytes_written', 'values': [{'value': 269044916789}]}, {'name': '/tensorstore/kvstore/gcs/read', 'values': [{'value': 2540}]}, {'name': '/tensorstore/kvstore/gcs/read_latency_ms', 'values': [{'0': 0, '1': 0, '10': 0, '11': 4, '12': 7, '13': 5, '2': 0, '3': 0, '4': 0, '5': 1596, '6': 907, '7': 19, '8': 2, '9': 0, 'count': 2540, 'mean': 25.904724409448864, 'sum_of_squared_deviation': 50942226.94330701}]}, {'name': '/tensorstore/kvstore/gcs/retries', 'values': [{'value': 401}]}, {'name': '/tensorstore/kvstore/gcs/write', 'values': [{'value': 4114}]}, {'name': '/tensorstore/kvstore/gcs/write_latency_ms', 'values': [{'0': 0, '1': 0, '10': 834, '11': 1352, '12': 1322, '13': 39, '14': 3, '2': 0, '3': 0, '4': 0, '5': 0, '6': 0, '7': 30, '8': 110, '9': 424, 'count': 4114, 'mean': 809.4769081186172, 'sum_of_squared_deviation': 1034330296.3062742}]}, {'name': '/tensorstore/thread_pool/active', 'values': [{'max_value': 241, 'value': 47}]}, {'name': '/tensorstore/thread_pool/max_delay_ns', 'values': [{'max_value': 901113174}]}, {'name': '/tensorstore/thread_pool/started', 'values': [{'value': 241}]}, {'name': '/tensorstore/thread_pool/steal_count', 'values': [{'value': 8127.0}]}, {'name': '/tensorstore/thread_pool/task_providers', 'values': [{'max_value': 1, 'value': 0}]}, {'name': '/tensorstore/thread_pool/total_queue_time_ns', 'values': [{'value': 2618724761824.0}]}, {'name': '/tensorstore/thread_pool/work_time_ns', 'values': [{'value': 146707689823.0}]}]
(copy_data pid=67652, ip=10.130.1.105) [b'c/10', b'c/11', b'c/12', b'c/13', b'c/14', b'c/15', b'c/16', b'c/17', b'c/18', b'zarr.json']
(copy_data pid=67652, ip=10.130.1.105) Data copy complete.
laramiel commented 1 week ago

It does look like there are more reads and writes than I would expect from what you're trying to do.

    {'name': '/tensorstore/kvstore/gcs/read', 'values': [{'value': 2540}]}, 
    {'name': '/tensorstore/kvstore/gcs/read_latency_ms', 'values': [{'0': 0, '1': 0, '10': 0, '11': 4, '12': 7, '13': 5, '2': 0, '3': 0, '4': 0, '5': 1596, '6': 907, '7': 19, '8': 2, '9': 0, 'count': 2540, 'mean': 25.904724409448864, 'sum_of_squared_deviation': 50942226.94330701}]}, 

    {'name': '/tensorstore/kvstore/gcs/retries', 'values': [{'value': 401}]}, 

    {'name': '/tensorstore/kvstore/gcs/write', 'values': [{'value': 4114}]}, 
    {'name': '/tensorstore/kvstore/gcs/write_latency_ms', 'values': [{'0': 0, '1': 0, '10': 834, '11': 1352, '12': 1322, '13': 39, '14': 3, '2': 0, '3': 0, '4': 0, '5': 0, '6': 0, '7': 30, '8': 110, '9': 424, 'count': 4114, 'mean': 809.4769081186172, 'sum_of_squared_deviation': 1034330296.3062742}]}, 

That's ~4000 writes with a mean latency of ~800ms and ~2500 reads with a mean latency of ~25ms; the ~400 retries indicate that we probably hit GCS throttling.

And there are only ~8 or so chunks with data, so that's a bit much.
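To put those counters in perspective, here is a rough back-of-the-envelope calculation. It only uses figures quoted in this thread (the metrics dump and the `gsutil du` output); the variable names are illustrative, and the summed latency is cumulative across concurrent requests, so it is not wall-clock time.

```python
# Figures copied from the metrics dump and the gsutil output in this thread.
BYTES_WRITTEN = 269_044_916_789          # /tensorstore/kvstore/gcs/bytes_written
WRITES = 4114                            # /tensorstore/kvstore/gcs/write
READS = 2540                             # /tensorstore/kvstore/gcs/read
MEAN_WRITE_LATENCY_MS = 809.4769081186172
MEAN_READ_LATENCY_MS = 25.904724409448864
HTTP_REQUESTS = 7057                     # /tensorstore/http/request_completed
HTTP_429 = 401                           # response code 429 (Too Many Requests)
STORED_DEST_GIB = 1.01                   # gsutil du -sh for .../dummy/dest

GIB = 1024**3
MIB = 1024**2

# Bytes sent to GCS relative to the bytes that ended up stored.
amplification = BYTES_WRITTEN / (STORED_DEST_GIB * GIB)

# Average payload per write request.
mib_per_write = BYTES_WRITTEN / WRITES / MIB

# Cumulative request latency (requests overlap, so this exceeds wall time).
total_write_latency_s = WRITES * MEAN_WRITE_LATENCY_MS / 1000
total_read_latency_s = READS * MEAN_READ_LATENCY_MS / 1000

# Share of HTTP requests that were throttled.
throttled_fraction = HTTP_429 / HTTP_REQUESTS

print(f"write amplification: ~{amplification:.0f}x")
print(f"average write size:  ~{mib_per_write:.0f} MiB")
print(f"summed write latency: ~{total_write_latency_s:.0f} s")
print(f"summed read latency:  ~{total_read_latency_s:.0f} s")
print(f"throttled requests:   ~{throttled_fraction:.1%}")
```

The average write of roughly 62 MiB is far larger than a single 1 MiB inner chunk, which suggests that whole shards are being rewritten repeatedly rather than individual chunks being appended.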

You could try increasing the GCS logging, which would tell us more about where that is happening: ts.experimental_update_verbose_logging('gcs=1')

dlwh commented 1 week ago

Hrm... that didn't change anything at all in terms of logging except I get the message that it's turned on.

(copy_data pid=17864, ip=10.130.0.97) I1017 09:09:34.012781   17864 verbose_flag.cc:135] --tensorstore_verbose_logging=gcs=1
laramiel commented 1 week ago

Sorry, the gcs logging is backend dependent. I should have asked you to use gcs_http=1

I can reproduce the multi-write behavior even with the file driver. Will keep digging. I've edited the reproducer script above.

laramiel commented 1 week ago

To clarify what is happening here:

This statement is copying the source tensorstore range to the destination tensorstore.

dest_ts[random_offset:random_offset + DATA_SIZE].write(source_ts[0:DATA_SIZE]).result()

The copy is driven by the source tensorstore: each source chunk is read and then combined into the destination. This is a standard scatter/gather process, and there are two things to improve here:

The copy is happening at the read granularity, causing multiple writes. This is less than ideal and we should improve both of those behaviors. Arguably the write amplification is more costly than reads, so we should invert the process and then add additional scheduling improvements from there.
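The effect of copying at read granularity can be illustrated with a toy model. This is only a sketch under a stated assumption, namely that every source inner chunk triggers one write of each destination shard it overlaps; it is not TensorStore's actual scheduler. The constants come from the reproducer, and the helper names are hypothetical.

```python
# Constants from the reproducer script (all sizes are in int32 elements).
DATA_SIZE = 1024**3
READ_CHUNK = 256 * 1024            # inner chunk of the source
WRITE_SHARD = READ_CHUNK * 512     # shard (outer chunk) of the destination
OFFSET = 1376090583                # random_offset used for the destination


def shards_touched(start, stop, shard=WRITE_SHARD):
    """Indices of destination shards overlapping the half-open range [start, stop)."""
    return range(start // shard, (stop - 1) // shard + 1)


def count_shard_writes(data_size=DATA_SIZE, chunk=READ_CHUNK, offset=OFFSET):
    """Model assumption: one shard write per (source chunk, overlapped shard) pair."""
    per_chunk_writes = 0
    distinct = set()
    for src_start in range(0, data_size, chunk):
        src_stop = min(src_start + chunk, data_size)
        touched = shards_touched(offset + src_start, offset + src_stop)
        per_chunk_writes += len(touched)
        distinct.update(touched)
    return per_chunk_writes, sorted(distinct)


per_chunk_writes, distinct_shards = count_shard_writes()
print(per_chunk_writes)   # 4104 shard writes when driven chunk by chunk
print(distinct_shards)    # [10, 11, 12, 13, 14, 15, 16, 17, 18]
```

Under this model the copy issues 4104 shard writes for only 9 distinct shards. That is close to the 4114 writes reported in the metrics, and the shard indices match the `c/10` through `c/18` keys listed in the destination, which is consistent with (though not proof of) each shard being rewritten roughly 450 times.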

However, there is a fairly simple change that will improve performance for smaller copies: use transactions. When transactions are used, the writes are deferred until the transaction commits. If you change the line to:

    with ts.Transaction() as txn:
        dest_ts.with_transaction(txn)[random_offset:random_offset + DATA_SIZE].write(source_ts[0:DATA_SIZE]).result()

Then in my testing the copy happens a lot faster.

dlwh commented 5 days ago

Thanks for the fast response, and apologies for the slow turnaround on my part! We got bit by an ongoing bug in the GCE metadata server that became a fire since it broke JAX initialization and TensorStore's logins, and I've been working to mitigate. I will try this today. Thank you!