zarr-developers / zarr-python

An implementation of chunked, compressed, N-dimensional arrays for Python.
https://zarr.readthedocs.io
MIT License

[V3] Support for batched Store API in v3 #1806

Open akshaysubr opened 4 months ago

akshaysubr commented 4 months ago

The Store API in v3 supports fetching a single key at a time or partial values of multiple keys.

```python
from abc import ABC, abstractmethod
from typing import List, Optional, Tuple


class Store(ABC):
    @abstractmethod
    async def get(
        self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None
    ) -> Optional[bytes]:
        ...

    @abstractmethod
    async def get_partial_values(
        self, key_ranges: List[Tuple[str, Tuple[int, int]]]
    ) -> List[bytes]:
        ...
```

The new BatchedCodecPipeline, for instance, currently fetches data for all requested chunks concurrently by using an asyncio thread pool with each task calling the get method of the Store:

```python
chunk_bytes_batch = await concurrent_map(
    [(byte_getter,) for byte_getter, _, _, _ in batch_info],
    lambda byte_getter: byte_getter.get(),
    runtime_configuration.concurrency,
)
```
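To make the pattern above concrete, here is a minimal sketch of a concurrency-limited map built on `asyncio.Semaphore`. It is not zarr-python's `concurrent_map`; the names `limited_concurrent_map` and `fake_get` are hypothetical, and `fake_get` merely stands in for `byte_getter.get()`.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, Tuple, TypeVar

T = TypeVar("T")


async def limited_concurrent_map(
    items: Iterable[Tuple],
    func: Callable[..., Awaitable[T]],
    limit: int,
) -> List[T]:
    """Call func(*item) for every item with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run(item: Tuple) -> T:
        async with semaphore:
            return await func(*item)

    # gather returns results in the same order as the inputs
    return list(await asyncio.gather(*(run(item) for item in items)))


async def fake_get(key: str) -> bytes:
    # hypothetical stand-in for byte_getter.get(); the sleep mimics I/O latency
    await asyncio.sleep(0.01)
    return key.encode()


results = asyncio.run(
    limited_concurrent_map([("c/0",), ("c/1",), ("c/2",)], fake_get, limit=2)
)
print(results)
```

Every chunk still becomes its own asyncio task here, which is exactly the per-task overhead the issue is concerned about.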

Since there have been some concerns about the scalability of asyncio for a large number of tasks, would it make sense to move this batched fetch into the Store itself? This would allow another Store implementation to use a more performant asynchronous framework for the batched fetch, say in C++ or Rust, while still looking like a single asyncio task to zarr-python.

This is a feature that currently exists in v2 through the getitems Store API which is used to enable GPU Direct Storage in Zarr through kvikIO.

A similar feature has also been added (provisionally) to v3 codecs, which support an encode_batch and a decode_batch with a default implementation that ships tasks off to an asyncio thread pool but allows a codec to override that in favor of another approach if needed.

rabernat commented 4 months ago

Do queues have a role to play here?

To me it makes sense that the store and the codec pipeline would communicate via a queue. The store puts compressed chunks into a queue. The codec pipeline pulls items out and processes them. Either one could do this via single items or in batches. If the queue is full, the store could stop fetching chunks until there is room (backpressure).
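As a rough illustration of the backpressure idea, the sketch below connects a producer and a consumer through a bounded `asyncio.Queue`. The names `store_producer` and `codec_consumer` are hypothetical, `zlib` stands in for a real codec, and a `None` sentinel is assumed as the end-of-stream signal.

```python
import asyncio
import zlib


async def store_producer(chunks, q):
    for key, compressed in chunks:
        await asyncio.sleep(0)  # stand-in for I/O
        # put() suspends while the queue is full, which is the backpressure
        await q.put((key, compressed))
    await q.put(None)  # sentinel: no more chunks


async def codec_consumer(q, out):
    while True:
        item = await q.get()
        if item is None:
            break
        key, compressed = item
        out[key] = zlib.decompress(compressed)


async def main():
    chunks = [(f"c/{i}", zlib.compress(bytes([i]) * 8)) for i in range(5)]
    q = asyncio.Queue(maxsize=2)  # at most two undecoded chunks are buffered
    out = {}
    await asyncio.gather(store_producer(chunks, q), codec_consumer(q, out))
    return out


decoded = asyncio.run(main())
print(sorted(decoded))
```

With `maxsize=2` the producer never runs more than two chunks ahead of the consumer, however many chunks are requested.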

JackKelly commented 4 months ago

I'd love to see a batched Store API in Zarr-Python!

(Just to connect the dots: Here's an informal proposal I wrote for a Store.get_items method for Zarr-Python... which includes links to @akshaysubr's previous discussions about this topic!)

> the store and the codec pipeline would communicate via a queue

Yes! I agree! (Ideally a multi-producer multi-consumer queue.) For reference, here's my current planned design for the light-speed-io Rust crate that I'm currently chipping away at. The relevant part for this discussion is the crossbeam::channel which sends chunks from "Layer 1 (the IO layer)" to "Layer 2: Parallel computation on chunks" :slightly_smiling_face:

My ultimate aim is to allow Zarr-Python (and others) to offload IO (and decompression?) to Rust, if users want that.

jhamman commented 4 months ago

I've also been thinking that a producer/consumer queue is the way forward here but I'm not clear at all where in the stack these queues would go. IMO, this is the part of the equation that is missing for us to take action here.

rabernat commented 4 months ago

If this queue was an object which could be addressed by non-python code, then that would really open the door to the sort of interoperability we are looking for.

Python std library has two Queue objects:

Both of these can be limited on the number of items. But I think we want to limit on the total number of bytes stored instead.
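A byte-based limit can be sketched in pure Python with a `threading.Condition`. The class `ByteBoundedQueue` below is hypothetical and only illustrates the idea; it is not part of the standard library or zarr-python.

```python
import threading
from collections import deque


class ByteBoundedQueue:
    """FIFO queue whose capacity is a byte budget rather than an item count."""

    def __init__(self, max_bytes: int):
        self._max_bytes = max_bytes
        self._used = 0
        self._items = deque()
        self._cond = threading.Condition()

    def put(self, item: bytes) -> None:
        with self._cond:
            # An oversized item is admitted when the queue is empty,
            # otherwise it could never be enqueued.
            while self._items and self._used + len(item) > self._max_bytes:
                self._cond.wait()
            self._items.append(item)
            self._used += len(item)
            self._cond.notify_all()

    def get(self) -> bytes:
        with self._cond:
            while not self._items:
                self._cond.wait()
            item = self._items.popleft()
            self._used -= len(item)
            self._cond.notify_all()
            return item

    @property
    def used_bytes(self) -> int:
        with self._cond:
            return self._used


q = ByteBoundedQueue(max_bytes=10)
received = []
consumer = threading.Thread(
    target=lambda: received.extend(q.get() for _ in range(4))
)
consumer.start()
for chunk in [b"aaaa", b"bbbb", b"cccc", b"dddd"]:
    q.put(chunk)  # blocks whenever the chunk would push the total past 10 bytes
consumer.join()
print(received)
```

The same bookkeeping would apply to a queue implemented in a lower-level language; only the locking primitives would differ.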

We could also implement our own purpose-built Queue in a low-level language and talk to it from Python.

rabernat commented 4 months ago

Here's an example of sending items to a python Queue from an async task and then processing those items from a thread. This is basically what we want to do between store and codec pipeline.

```python
import threading
import queue

import asyncio
import time

SLEEP = 0.1

async def generate_data(item, q):
    # non-blocking sleep--represents time spent on I/O
    await asyncio.sleep(SLEEP)
    print(f"putting item {item}")
    q.put(item)

def process_item(q):
    item = q.get()
    # blocking sleep--represents CPU-bound task
    time.sleep(SLEEP)
    print(f"processed {item}")
    q.task_done()

def worker():
    while True:
        process_item(q)

async def generate_items():
    for n in range(10):
        await generate_data(n, q)

q = queue.Queue()
threading.Thread(target=worker, daemon=True).start()

# top-level await works in IPython/Jupyter; in a script, use
# asyncio.run(generate_items()) followed by q.join()
await generate_items()
```

JackKelly commented 4 months ago

On the question of sharing a queue between Python and Rust: I like the idea. But I've searched the PyO3 GitHub organisation for "queue" and haven't found anything relevant :disappointed:

For my main use-case, where I'd like to load and decompress on the order of 1 million chunks per second (on a single machine), I'd probably want to keep the chunk-by-chunk loading & processing in Rust-land. I'd guess that a Python queue would struggle to handle 1 million items per second, whilst still leaving enough CPU cycles for decompression and other bits and bobs (but I haven't tried and I'd love to be proved wrong!).

What I'm imagining - whilst being very ignorant of the current architecture of Zarr-Python v3 - is something like this:

The user would, in a single function call, ask Zarr-Python to load some subset of a huge array. Zarr-Python would calculate the set of chunks that need to be loaded[^1]. Then Zarr-Python would, in a single async function call, ask the Store to "get and decompress these million chunks, and let me know when you're done". It'd be entirely up to the Store to figure out how best to parallelise the work, and how to pass data from IO to decompression.

Could that work? (I'm sure there's some glaring problem with that overly simple sketch!) I'm guessing that doesn't fit at all with the way that Zarr v3 splits up IO and codecs?

[^1]: Although it's possible that Python would struggle to compute which million chunks we want, in the time available. So we may also want to compute the set of chunks in Rust-land. But then we've got a full Zarr implementation in Rust!

rabernat commented 4 months ago

In the current architecture, the Store doesn't handle decompression. That's a separate thing (the Codec Pipeline).

JackKelly commented 4 months ago

ah, yes, of course... sorry.... :flushed:... I need to re-read the Zarr v3 spec soon :slightly_smiling_face: !

On the topic of sharing a queue between Rust and Python... In a recent comment, a core maintainer of the PyO3 library says:

> You could wrap the receiver end of the [Rust std::sync::mpsc::]channel in a #[pyclass] to expose the receiving end to Python. You have the choice of using blocking or async to wait for messages.

rabernat commented 4 months ago

> I need to re-read the Zarr v3 spec soon 🙂 !

This is not really covered by the spec. It's about the software architecture of Zarr Python (which is exactly what this issue is for discussing).

rabernat commented 4 months ago

> I'd guess that a Python queue would struggle to handle 1 million items per second

Here's a quick experiment

```python
import threading
import queue
import asyncio
import time
from itertools import batched  # requires Python 3.12+


class Timer:

    def __init__(self):
        self._log = [("start", time.perf_counter())]

    def log(self, message):
        self._log.append((message, time.perf_counter()))

    def __repr__(self):
        out = f"<Timer started at {self._log[0][1]}>\n"
        for n in range(1, len(self._log)):
            message = self._log[n][0]
            diff = self._log[n][1] - self._log[n-1][1]
            out += f" - {message}, {diff}\n"
        return out


# don't add any overhead
SLEEP = 0

async def enqueue_data(batch, q):
    # non-blocking sleep--represents time spent on I/O
    await asyncio.sleep(SLEEP)
    for item in batch:
        q.put(item)

def process_item(worker_id, q, batch_size=1):
    # not sure if batching really optimizes anything here since
    for _ in range(batch_size):
        item = q.get()
        # blocking sleep--represents CPU-bound task
        time.sleep(5*SLEEP)
        q.task_done()

def worker(worker_id, q, batch_size=1):
    while True:
        process_item(worker_id, q, batch_size=batch_size)

async def generate_items(NITEMS, q, batch_size=1):
    for batch in batched(range(NITEMS), batch_size):
        await enqueue_data(batch, q)

NTHREADS = 32
NITEMS = 1_000_000

timer = Timer()
q = queue.Queue()
timer.log("create queue")

threads = [
    threading.Thread(
        target=worker, args=(worker_id, q,), kwargs={"batch_size": 100}, daemon=True
    )
    for worker_id in range(NTHREADS)
]
for thread in threads:
    thread.start()
timer.log("create threads")

# top-level await: run in IPython/Jupyter
await generate_items(NITEMS, q, batch_size=100)
timer.log(f"generate {NITEMS} items")
q.join()
timer.log("finish processing")
print(timer)
```
<Timer started at 108872.175694578>
 - create queue, 4.448898835107684e-05
 - create threads, 0.003170869007590227
 - generate 1000000 items, 0.6771369689959101
 - finish processing, 10.761253934004344

So I was able to enqueue 1_000_000 items in less than a second (working in batches of 100), but it took me 10 seconds to process them (32 core machine). This example took 10 minutes to cook up, so probably lots of room for improvement.

akshaysubr commented 4 months ago

This discussion about how to orchestrate a producer-consumer workflow between chunk readers (Store) and decompressors (Codecs) is really interesting. But I worry that it is taking away from the more fundamental question of the required Store API. Even if we use a queue to orchestrate these tasks, the current Store API only allows the store to put one item at a time into the queue rather than a batch of items at a time into the queue.

I would propose that we think about implementation of points raised in this issue in two steps:

  1. Add a batched API to the Store base class that defaults to just doing concurrent loads from multiple threads (similar to the default decode_batch in the Codec API). This way, a Store implementation can decide whether single-chunk fetching is the natural way to do things, or whether it makes more sense to treat the batched case as the default, with single-chunk fetches being a specialization of that.
  2. The concurrency policy is essentially encoded into the CodecPipeline. This is where I think the mapping of these tasks to a specific system architecture should be encoded. We would want to have different approaches reading from a local drive or an object store and similarly for computing on the CPU vs GPU. There are two codec pipelines currently in #1670: A BatchedCodecPipeline and an InterleavedCodecPipeline depending on what is better for a specific hardware architecture. We can certainly add another DynamicCodecPipeline that uses queues.
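The first step above could look roughly like the following sketch. It is an assumption-laden illustration, not a proposed zarr-python API: the class names and the method name `get_batch` are hypothetical, `get` is simplified to drop `byte_range`, and the default fans out with asyncio tasks rather than threads.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Dict, List, Optional


class BatchedStore(ABC):
    @abstractmethod
    async def get(self, key: str) -> Optional[bytes]:
        ...

    async def get_batch(self, keys: List[str]) -> List[Optional[bytes]]:
        # Default: fan out to the single-key get(); a store may override this.
        return list(await asyncio.gather(*(self.get(key) for key in keys)))


class MemoryStore(BatchedStore):
    """Only implements get(); inherits the default get_batch()."""

    def __init__(self, data: Dict[str, bytes]):
        self._data = dict(data)

    async def get(self, key: str) -> Optional[bytes]:
        return self._data.get(key)


class BulkMemoryStore(MemoryStore):
    """Overrides get_batch(), as a store with a native bulk read might."""

    async def get_batch(self, keys: List[str]) -> List[Optional[bytes]]:
        # Stand-in for a single call into a C++/Rust backend.
        return [self._data.get(key) for key in keys]


data = {"c/0": b"a", "c/1": b"b"}
keys = ["c/0", "c/1", "c/missing"]
default_result = asyncio.run(MemoryStore(data).get_batch(keys))
bulk_result = asyncio.run(BulkMemoryStore(data).get_batch(keys))
print(default_result, bulk_result)
```

Callers such as a codec pipeline would only ever call `get_batch`, so swapping in a store with a faster bulk path would not require changes on their side.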

> "get and decompress these million chunks, and let me know when you're done"

@JackKelly This should be possible by implementing a custom CodecPipeline, potentially in Rust, but that is broader in scope than the Store. The scope of light-speed-io seems to be aligned with the scope of CodecPipeline. Though, I don't quite understand how light-speed-io would interface with existing Codecs. Would those be Python calls into the Codec API from Rust?

JackKelly commented 4 months ago

> The scope of light-speed-io seems to be aligned with the scope of CodecPipeline. Though, I don't quite understand how light-speed-io would interface with existing Codecs. Would those be python calls into the Codec API from Rust?

Before I can answer properly, I need to get myself up-to-speed on the design of the CodecPipeline :slightly_smiling_face:! Is #1670 the best place to read the latest info about the CodecPipeline? Is there an architecture diagram showing how CodecPipeline fits into zarr-python? (no worries if not! I appreciate that everyone is short on time!)

JackKelly commented 4 months ago

> I don't quite understand how light-speed-io would interface with existing Codecs. Would those be python calls into the Codec API from Rust?

My plan is for light-speed-io to call codec libraries written in C/C++ or Rust. For example, there's a Rust wrapper for c-blosc2. Although I'll probably start with more "mainstream" codecs like lz4.

Whilst it's technically possible for Rust to call Python code (via PyO3), I'd worry that would kill any performance benefits! (Although I haven't benchmarked it!)

normanrz commented 4 months ago

> > The scope of light-speed-io seems to be aligned with the scope of CodecPipeline. Though, I don't quite understand how light-speed-io would interface with existing Codecs. Would those be python calls into the Codec API from Rust?
>
> Before I can answer properly, I need to get myself up-to-speed on the design of the CodecPipeline 🙂! Is #1670 the best place to read the latest info about the CodecPipeline? Is there an architecture diagram showing how CodecPipeline fits into zarr-python? (no worries if not! I appreciate that everyone is short on time!)

The PR is the best place for the moment. Basically, the CodecPipelines get a list of chunks to fetch with the corresponding slices, fetch the data from the store, decode the chunks and assemble the chunk arrays into 1 output array.
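The fetch, decode, assemble flow described above can be sketched in a few lines for a 1-D byte array. This is only an illustration under simplifying assumptions: a plain dict plays the store, `zlib` plays the codec, and `read_selection` and the request tuples are hypothetical, not the CodecPipeline interface from the PR.

```python
import asyncio
import zlib


async def fetch(store, key):
    await asyncio.sleep(0)  # stand-in for I/O
    return store[key]


async def read_selection(store, chunk_requests, out_len):
    """chunk_requests holds (key, slice within chunk, slice within output)."""
    out = bytearray(out_len)
    # 1. fetch all requested chunks concurrently
    fetched = await asyncio.gather(
        *(fetch(store, key) for key, _, _ in chunk_requests)
    )
    for (_, chunk_sel, out_sel), compressed in zip(chunk_requests, fetched):
        # 2. decode each chunk
        chunk = zlib.decompress(compressed)
        # 3. copy the requested part into the single output array
        out[out_sel] = chunk[chunk_sel]
    return bytes(out)


store = {
    "c/0": zlib.compress(b"abcd"),
    "c/1": zlib.compress(b"efgh"),
    "c/2": zlib.compress(b"ijkl"),
}
# Read elements 2..9 of the 12-element array, which spans all three chunks.
requests = [
    ("c/0", slice(2, 4), slice(0, 2)),
    ("c/1", slice(0, 4), slice(2, 6)),
    ("c/2", slice(0, 2), slice(6, 8)),
]
result = asyncio.run(read_selection(store, requests, out_len=8))
print(result)
```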

Currently, there are 2 implementations: batched and interleaved. I want to combine both into one.

It would not be a big change for the codec pipeline to accept a queue. Probably changes the Store API considerably, though.

JackKelly commented 4 months ago

Sounds great! Thank you for the explanation.

I'll try to enable my little light-speed-io Rust crate to provide CodecPipeline-like functionality (fetching data from IO, decoding, assembling chunks into 1 output array) and a Python API. The ultimate aim could be that light-speed-io could provide a drop-in CodecPipeline for zarr-python. (Although light-speed-io will be limited to local storage on Linux for a little while!) Does that sound viable and vaguely useful?

Although... I should stop mumbling about my pet project and instead help to pull the focus of this thread back to @akshaysubr's original questions about a batched Store API!....

normanrz commented 4 months ago

> The ultimate aim could be that light-speed-io could provide a drop-in CodecPipeline for zarr-python. (Although light-speed-io will be limited to local storage on Linux for a little while!) Does that sound viable and vaguely useful?

Yes, that should be possible. At this point, I wouldn't consider the CodecPipeline a stable API, though. It will probably change quite a bit until the 3.0 release and maybe even afterwards.

JackKelly commented 4 months ago

No worries, sounds good.

When you do start the process of stabilising the CodecPipeline API, please could you tag me in the discussion? I'll probably just passively listen to the discussion, so I can have the best chance of moulding light-speed-io to play nicely with zarr-python v3 :slightly_smiling_face:

jhamman commented 3 weeks ago

This thread got off topic. @akshaysubr - can you help us get back on track? What I don't understand about your request is why get_partial_values isn't a sufficient drop-in replacement for getitems. Despite its name, get_partial_values works just fine when requesting full chunks.

akshaysubr commented 2 weeks ago

@jhamman You're right, get_partial_values should work just fine in theory, but there are a couple of issues:

  1. The current batched codec pipeline doesn't use get_partial_values and dispatches the get method to a bunch of threads.
  2. Calling the method get_partial_values is somewhat unintuitive from a naming perspective. It might also make the code a bit harder to understand if get_partial_values is called on full chunks across the codebase.

There's also the question of whether get should be the base implementation that getitems/get_partial_values build on top of, or the other way around. It would seem that the other way around might be better for performance?
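To illustrate the "other way around" option, here is a minimal sketch in which the batched method is the abstract primitive and `get` is a thin specialization of it. The names `BatchFirstStore`, `get_many` and `DictStore` are hypothetical, and byte ranges are left out for brevity.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Dict, List, Optional


class BatchFirstStore(ABC):
    @abstractmethod
    async def get_many(self, keys: List[str]) -> List[Optional[bytes]]:
        ...

    async def get(self, key: str) -> Optional[bytes]:
        # A single-key fetch is just a batch of one.
        (value,) = await self.get_many([key])
        return value


class DictStore(BatchFirstStore):
    def __init__(self, data: Dict[str, bytes]):
        self._data = dict(data)
        self.batch_calls = 0  # counts calls into the batched primitive

    async def get_many(self, keys: List[str]) -> List[Optional[bytes]]:
        self.batch_calls += 1
        return [self._data.get(key) for key in keys]


async def main():
    store = DictStore({"c/0": b"x", "c/1": b"y"})
    single = await store.get("c/0")
    many = await store.get_many(["c/0", "c/1", "c/missing"])
    return single, many, store.batch_calls


single, many, batch_calls = asyncio.run(main())
print(single, many, batch_calls)
```

In this layout a store pays one call into its backend per batch, whereas building the batch on top of `get` pays one call per key unless the store overrides the default.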