Have been wondering the same thing. Thanks for this nice write-up and for performing this experiment.
This is supported in zarr.js for the same reasons you describe here. Here's the relevant PR and issue, which also explain why it was a good idea (with timings for less pathological cases, where the speedup can still be large).
Pinging @manzt, who implemented the above functionality. The JS implementation follows Python pretty closely here, so a similar implementation approach may be possible.
Thanks for pinging me!
That is a crazy speedup, amazing.
I'd be very grateful if someone could give me an intuition for why it's so much faster. Is it that multiple http requests can basically be opened concurrently? If so, is there an upper bound on the number of requests that can be active concurrently, and would we need to manage that ourselves?
I'm not that familiar with Python's asyncio, but I'm assuming that there is overlap. Calling Web APIs (like `fetch`) returns a Promise rather than a value, so requests are non-blocking and can be made concurrently.

In the example above, `futures` is a list of Python Futures (similar to promises in JavaScript). These are awaitable objects that represent the eventual result of an async operation. Using the `session`, multiple requests are fired off, and awaiting `asyncio.gather(*futures)` waits for the last future to be resolved.
```python
async with aiohttp.ClientSession() as session:
    tic = time.time()
    futures = [get_chunk_http_async(n, session) for n in range(za.shape[0] // za.chunks[0])]
    all_data = await asyncio.gather(*futures)
    print(f"{time.time() - tic} seconds")
```
To handle concurrency, in zarr.js we use a dependency called `p-queue` to limit the number of concurrent requests (to 10 by default). This way we can dynamically add to the queue while iterating through the projections. Hopefully this looks somewhat similar to zarr.
```js
const queue = new PQueue({ concurrency: concurrencyLimit });
for (const proj of indexer.iter()) {
  const chunkValue = this.getChunkValue(proj, indexer, value, selectionShape); // returns promise
  queue.add(() => this.chunkSetItem(proj.chunkCoords, proj.chunkSelection, chunkValue)); // returns promise that queue resolves
}
// guarantees that all work on queue has finished
await queue.onIdle();
```
I think `asyncio.Queue` exists, but that's about my limit of knowledge.
One concern I have here is that the API for accessing a slice in an array in zarr.js is naturally awaitable since everything in javascript is async. We need to wrap our requests in async functions and await the responses.
This in zarr-python,

```python
arr = za[:]
```

is the same as this in zarr.js,

```js
const arr = await za.get(null);
```

regardless of the store type. Ideally you'd hide all the async fetching from the end user and have the same API independent of whether the array is on disk or over HTTP. Someone smarter than me probably knows if this is possible.
To Alistair's point, if this is just about requests, I wonder if simply a `Store` leveraging `asyncio` internally would be sufficient to give you the same benefits. Not to say it wouldn't be interesting to add `asyncio` to all of Zarr. Just curious what is needed to get the performance boost.
> I wonder if simply a `Store` leveraging `asyncio` internally would be sufficient to give you the same benefits
This is what I had in mind. I think the main performance benefit is when reading / writing many smallish chunks concurrently. We definitely don't want to force zarr users to use async.
Maybe a store could declare itself to use async for certain operations, and then zarr would know to use asyncio when performing loops with I/O operations. Otherwise it would default to the current, synchronous behavior.
I think there are cases where asyncio would hurt us. I did a few tests with files and found that, in some cases, asyncio was slower. I assume this is because issuing serial reads is more efficient than concurrent reads for some disks.
I wonder if this would be a good opportunity to add a batch retrieval method to the store API. E.g., something like `store.multi_get(keys) -> iterable of values`. This would be an optional method; if not implemented, zarr would fall back to the current behaviour of calling `__getitem__` multiple times. If it was implemented, then the store internally could choose to implement it via asyncio if appropriate, or not. I.e., the zarr core module would not need to be aware of async; it would be entirely internal to the store implementation.

Also, stores could leverage this in other ways. E.g., if a store was wrapping a database, it could batch together all the key requests into a single transaction. Or, e.g., if the store was wrapping a cloud service that provided some API for batching multiple requests into a single HTTP call, it could use that. Just a thought.
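For illustration only, a minimal sketch of that shape of API, assuming a hypothetical `StoreMixin` with a default `multi_get` that concrete stores could override, plus the caller-side fallback that would keep the zarr core module unaware of how the store implements it:

```python
class StoreMixin:
    """Sketch only: the default multi_get preserves today's behaviour of one
    __getitem__ call per key; a concrete store could override it to batch
    (asyncio, one DB transaction, a single bulk cloud request, ...)."""

    def multi_get(self, keys):
        return [self[key] for key in keys]


# Caller side (e.g. inside zarr core): use multi_get when available,
# otherwise fall back to the current per-key loop.
def read_chunks(store, keys):
    if hasattr(store, "multi_get"):
        return store.multi_get(keys)
    return [store[key] for key in keys]
```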
Yeah, this came up in https://github.com/zarr-developers/zarr-python/issues/384. Agreed, this would be a good solution to this problem (amongst others).
@alimanfoo would you envision the stores' API being synchronous, and just using asyncio under the hood for multi-gets? I've found asyncio quite difficult to work with because it "poisons" the entire stack above it, forcing you to pepper async/await annotations throughout the pipeline. You can get the event loop and resync inside a function call, but it's not pretty.
> @alimanfoo would you envision the stores' API being synchronous, and just using asyncio under the hood for multi-gets?

Yes, that was my thought.
If we just want to parallelise network requests, we don't need to go full asyncio, right? We could do thread-based parallelism. Not fully convinced we need to buy into the whole asyncio package just to parallelise a few network requests.
That's a good point. We can probably live with something as simple as `ThreadPoolExecutor`'s `map`.
TBH mapping with async, threads, or otherwise could just be specified as part of the API of this function (`multigetitem`? or other appropriate name).
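As a rough sketch of the thread-based option (the `fetch_keys` helper is hypothetical, and it assumes the store is safe to call from multiple threads):

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_keys(store, keys, max_workers=10):
    # Fetch many chunk keys with a small thread pool; no asyncio needed,
    # but the store's __getitem__ must be thread-safe.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(store.__getitem__, keys))
```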
Async and parallel are not the same thing. From my point of view, I use dask to handle actual parallel I/O. The example I posted here is one where I don't actually want dask involved. It's a tiny amount of data. When using xarray with zarr, the dimension coordinates are always read eagerly and without dask. That's the use case that motivated this issue.
Furthermore, I would worry about any internal multithreading within zarr conflicting with dask's own parallelism. This would have to be handled carefully.
I'm very far from expert in async programming, but my sense is that Ryan makes an important point here. There may be an important difference between how asyncio behaves and how a simple multithreaded program behaves. Throw in the GIL and it gets more complicated. And being mindful of dask is important: when using dask, we don't want a situation where the user runs a parallel program with 8 dask workers for the 8 cores on their machine, but each dask worker runs a zarr read which tries to use 8 threads; this leads to terrible thrashing of I/O and probably other resources. Although of course not everyone is using dask.
FWIW I would suggest these are all implementation choices that can be made by a store class. E.g., you could have a store that implements asyncio. Or you could have a store class which implements multithreaded I/O (although not recommended for use with dask). I would suggest we don't expose any such parameters in the multiget() method, rather it's a choice of which store class to use (or maybe parameters to the store class constructor).
I ran across this thread because I was having some very slow write performance for small data using gcsfs's mapper object. I cobbled together this mutable mapping class in a few hours today that seems to get a 4x improvement compared to gcsfs. Right now, it only supports asynchronous writes by flushing a cache whenever it exceeds a specified size. It would be pretty straightforward to write a `multiget` method. Is `multiget` supported by zarr?
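Very roughly, the buffering idea looks like this (a simplified sketch, not the actual class linked above; it uses threads for the flush where the real one uses async uploads, and the names `BufferedWrites`/`flush` are placeholders):

```python
from concurrent.futures import ThreadPoolExecutor


class BufferedWrites:
    """Sketch: buffer small writes in memory, then flush them concurrently
    once the buffer grows past a threshold."""

    def __init__(self, base_store, max_buffer_bytes=16_000_000, max_workers=8):
        self.base = base_store  # any dict-like store, e.g. a gcsfs mapper
        self.max_buffer_bytes = max_buffer_bytes
        self.max_workers = max_workers
        self._buffer = {}
        self._buffered = 0

    def __setitem__(self, key, value):
        self._buffer[key] = value
        self._buffered += len(value)
        if self._buffered >= self.max_buffer_bytes:
            self.flush()

    def __getitem__(self, key):
        if key in self._buffer:
            return self._buffer[key]
        return self.base[key]

    def flush(self):
        # Upload all buffered values concurrently; threads stand in here
        # for the async uploads used in the original experiment.
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            list(pool.map(lambda kv: self.base.__setitem__(*kv), self._buffer.items()))
        self._buffer.clear()
        self._buffered = 0
```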
@nbren12 - thanks for sharing! I would personally encourage us to be trying to make upstream performance improvements to gcsfs. My limited experience with these cloud filesystem-spec implementations suggests that they do a lot of unnecessary checking / caching (e.g. https://github.com/dask/s3fs/issues/285). Are your 4x performance enhancements due to async, or something else?
> It would be pretty straightforward to write a `multiget` method. Is `multiget` supported by zarr?
I believe this is what is being worked on in https://github.com/zarr-developers/zarr-python/pull/534.
I'm definitely open to contributing this to gcsfs, but just wanted to put it out in the wild, since I personally just need the mapping capability, not the "fs". I believe the performance enhancement is due to async, based on examining the debug-level output for the example code in the README, which shows the HTTP requests made for each call. fsspec does about 10 HTTP operations, each taking about 0.5 s to complete, in sequence. OTOH, the async code I wrote seems to complete these same requests...well...asynchronously. I didn't notice any gratuitous list operations like #534, and the directory only had about 2 items in it to begin with.
Here's an example of this output: https://gist.github.com/nbren12/10bed8494d067c3ff6c880c368561602. Part of the problem is that gcsfs seems to be using resumable uploads (requiring 3 HTTP requests for each chunk), but I still suspect async is speeding things up dramatically. The chunk-uploading only takes 0.3-1 seconds with async, but 13 seconds w/o.
Very interesting, thanks for sharing. Pinging @martindurant for the gcsfs perspective.
Thanks @nbren12 for sharing. There isn't support for `multiget` internally within zarr yet; it's just an idea at the moment, but FWIW I think it's worth pursuing. It will require some re-engineering of zarr internals though, likely a substantial piece of work.
Note that fsspec would, in general, love async support for all backends where this is feasible (perhaps not for a few like FTP). However, it is also true that zarr/xarray is used a lot with Dask, so there is some form of parallelism in operation already - for those cases, async may not be helpful.
Quick comment for the resumable upload in gcsfs: s3fs has implemented a shortcut to defer creation of the resumable upload and use a one-call version when the total file size is small enough (<5MB). The method for the single-call version exists in GCSFile, but the shortcut to execute it is not in place. It could be copied from s3fs.
Yah it would be great to have generic async support in fsspec. I expect async will have a lot less overhead than a dask worker for small data, but don't have the benchmarking to back this up.
Async and dask are also not incompatible. We can imagine a use case where we have many zarr chunks for each dask chunk. Using async in zarr would speed up the load time for each dask chunk.
Yah, I will just echo Ryan. Basically this is similar to my workflow: dask distributed for coarse-grained parallelism, and then async for faster I/O.
Dask just indexes into the array with normal `__getitem__`, right? I struggle a little with the concepts of asyncio, but my understanding is that within `__getitem__`, you can just run a bunch of async chunk-fetch calls, blocking until you have all of them, and then assemble them synchronously, so that the async is entirely under the hood: dask doesn't need to know. We can have lots more async tasks than CPUs because we're just waiting for IO.
Alternatively, it looks like dask can work with asyncio, so you could feasibly use the same event loop to prevent just forking and forking and forking. Then it's up to the executor to decide how many jobs to run concurrently; zarr using asyncio just defines the dependency graph of the operation, not how it's executed.
If possible, exposing an async Array API would be great, and seems to be doable with `asyncio.Future`s. This async array stuff (which does all the nuts and bolts of converting indices into chunk indexes, and the returned chunks into a single array) would be exposed to users, and used under the hood, but the current Array would stay synchronous. E.g.
```python
# has a member called `aio` with an async API
array = Array(...)

# - get chunk idxs
# - do `await sync_to_async(store.__getitem__)(chunk_key)` for each
# - assemble chunks into output array and return
out = await array.aio[:]

# - basically just calls async_to_sync(self.aio.__getitem__)(key)
out2 = array[:]
```
There may be some overhead in doing this, of course.
> Dask just indexes into the array with normal `__getitem__`, right? I struggle a little with the concepts of asyncio, but my understanding is that within `__getitem__`, you can just run a bunch of async block-fetch calls, blocking until you have all of them, and then assemble them synchronously, so that the async is entirely under the hood: dask doesn't need to know. We can have lots more async tasks than CPUs because we're just waiting for IO.
This captures my understanding very clearly.
> If possible, exposing an async Array API would be great, and seems to be doable with `asyncio.Future`s.
FWIW I'd suggest leaving that to a second stage of work.
For the moment, I'd suggest focusing on a stage one which discusses how to modify the store API to expose a `multiget` method, which store classes can decide to implement using async under the hood to accelerate retrieval of multiple chunks.
Far from expert here though, happy to hear other views.
> FWIW I'd suggest leaving that to a second stage of work.
Agreed, it's not part of an MVP, but it's worth keeping in mind as it may inform how the rest is implemented.
Obviously, `multiget` requires the backend to be async too, which may be easy to do for the HTTP case(s), but I definitely would like some help applying this to fsspec backends, many of which are also HTTP-based.
For context and clarity, does `multiget` mean a method (presumably on the backend store) that would receive a collection / iterable of keys to fetch, and would decide how best to fetch them?
> For context and clarity, does `multiget` mean a method (presumably on the backend store) that would receive a collection / iterable of keys to fetch, and would decide how best to fetch them?
That's how I'm interpreting it.
Right now, fetching multiple keys always happens inside a loop: https://github.com/zarr-developers/zarr-python/blob/994f2449b84be544c9dfac3e23a15be3f5478b71/zarr/core.py#L1023-L1028
> Obviously, `multiget` requires the backend to be async too
Does it? Again, I'm not entirely firm on the concepts of asyncio, but e.g. asgiref has a `sync_to_async` decorator which converts synchronous calls into awaitable coroutines. Whether or not you get any benefit out of that in this context is another question. The backend can be synchronous, and should be usable like this so long as it's thread-safe (just like the current concurrency model using a ThreadPoolExecutor).
I meant "for it to be useful" :) Running a loop of blocking function calls as if they were async means that it will take just as long (or maybe slightly longer) and not necessarily in order.
> Running a loop of blocking function calls as if they were async means that it will take just as long
That seems to be true for all `asyncio`, though. As far as I can tell, nothing is ever done concurrently unless you manually schedule it with `asyncio.create_task`: coroutines hand off control before the awaitable is scheduled, and then block for just as long as they were going to anyway, as soon as you get back to them. Only one thing is executing at a time; it just looks around before doing anything which might block for a while to make sure nothing else needs doing first.

Scheduling `sync_to_async` functions immediately with `asyncio.create_task` seems to work for a very simple `time.sleep`-based example, at least.
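Something like the following is what I mean (a toy sketch; `blocking_read` is a made-up stand-in for a synchronous store call, and `thread_sensitive=False` is needed so the wrapped calls can actually overlap in separate threads):

```python
import asyncio
import time

from asgiref.sync import sync_to_async  # pip install asgiref


def blocking_read(key):
    time.sleep(1)  # pretend this is slow, synchronous I/O
    return key.encode()


async def main():
    aread = sync_to_async(blocking_read, thread_sensitive=False)
    tic = time.time()
    tasks = [asyncio.create_task(aread(f"chunk/{i}")) for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(len(results), f"{time.time() - tic:.2f}s")  # ~1 s rather than ~5 s


asyncio.run(main())
```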
Yes, it's always true, but the point is to propagate the async/await mechanism down the stack to the actual IO (file, HTTP, or SSH access, etc., in this case).
Sorry, I'm clearly misunderstanding. Somewhere, some synchronous code is being run, whether that's in zarr, or in the Python implementation of the store backend, or closer to the metal (file system, ssh, etc.). That is synchronous, and everything above it is synchronous until something decides to do some parallelisation. Then everything above that is asynchronous but serial until tasks are manually scheduled, at which point they become asynchronous but concurrent, until something above it decides to do `asyncio.run()`, which brings it back into sync.

Asking every possible zarr backend to use `asyncio` seems out of scope. What's the difference, then, between an asyncio-native backend, and a synchronous backend with an asyncio wrapper which creates tasks to allow concurrency? In both cases, there's sync code happening somewhere below us in the stack. The important thing for us is that chunks are fetched concurrently, which means we have to be the ones creating tasks anyway - I'm not sure what we lose by being the ones to wrap the sync code in coroutines as well.
> Somewhere, some synchronous code is being run.
This is not true: if the S3 implementation uses aiobotocore, or the HTTP implementation uses aiohttp, then coroutines do not run and do not take any time until there is data for them to process. That is how you get concurrency without parallelism (i.e., cooperative multitasking). So it's not useful to wrap sync code just for the purpose of running it on an executor/event loop, unless you also want to run in threads - which is a whole other conversation.
I think it's totally acceptable for zarr to make use of multiget of a backend, which will be internally async if the backend happens to support this. That's why I keep coming back to making fsspec async where possible, since I see that as the route to implementing many zarr stores with the minimum code.
> The important thing for us is that chunks are fetched concurrently, which means we have to be the ones creating tasks anyway
This isn't necessarily what's important. In my case (described in #547) the store gets significant benefit without necessarily being async. What I'd like to do is have a `multiget` method that lets me recognize that I'm about to receive 1000 requests for contiguous byte ranges and fetch them all in a single HTTP/S3 request rather than individual requests. No async is required for the simplest cases, and spinning up 1000 threads would be faster than what currently happens but potentially problematic for whomever is hosting the data.
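As a rough illustration of what that `multiget` could do in the contiguous case (everything here is hypothetical: the helper name, the `(offset, length)` representation, and the assumption that all chunks live as byte ranges within one object on a server that honours Range requests):

```python
import requests


def multiget_contiguous(url, ranges):
    """Fetch many chunks stored as byte ranges within one object using a
    single HTTP Range request, then slice the chunks out locally.

    `ranges` is an iterable of (offset, length) pairs.
    """
    ranges = list(ranges)
    start = min(offset for offset, _ in ranges)
    stop = max(offset + length for offset, length in ranges)
    resp = requests.get(url, headers={"Range": f"bytes={start}-{stop - 1}"})
    resp.raise_for_status()
    blob = resp.content
    return [blob[offset - start:offset - start + length] for offset, length in ranges]
```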
How do you put 1000 S3 requests into a single request?
Sorry, just read your issue. I am not sure how typical the situation is, since the more usual scenario is one zarr key per remote file - but I am not against allowing arbitrary logic to happen within multiget, and there's no need to implement async - still a nice-to-have, though!
Yes just to be clear, I was thinking of adding a `store.multiget` method to the store interface, which accepts an iterable of keys and returns an iterable of (key, bytes) pairs. This method would be called from normal (synchronous) Python code within the zarr core module, and would replace any code that currently loops through a set of keys performing one `store.__getitem__` per key.
The potential benefit is that a store implementation could use the knowledge that multiple retrieval requests are being made to provide various types of optimisation. E.g., a store implementation could use asyncio. Or, e.g., a store implementation could use multiple threads. Or, e.g., a store implementation backed by a database could execute all requests in a single transaction. Or, e.g., a cloud store implementation could use a more complex mapping of chunks to underlying cloud objects, with multiple chunks stored inside a single object, and issue byte range requests where possible (see also #384). I.e., lots of possibilities open up.
(Or maybe `store.getitems` would be a better name, closer to `store.__getitem__` for single key/value retrieval?)
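For concreteness, a minimal sketch of what a store might do with that: the `HTTPStore` class, its URL layout, and the use of `asyncio.run` inside `getitems` are illustrative assumptions, not a proposal for actual zarr internals.

```python
import asyncio

import aiohttp


class HTTPStore:
    """Sketch: synchronous getitems interface, asyncio kept internal."""

    def __init__(self, base_url):
        self.base_url = base_url.rstrip("/")

    async def _fetch(self, session, key):
        async with session.get(f"{self.base_url}/{key}") as resp:
            resp.raise_for_status()
            return key, await resp.read()

    async def _fetch_all(self, keys):
        async with aiohttp.ClientSession() as session:
            return await asyncio.gather(*(self._fetch(session, k) for k in keys))

    def getitems(self, keys):
        # Looks like a plain function call to zarr core; the concurrency
        # (and the event loop) never leaks out of the store.
        return asyncio.run(self._fetch_all(list(keys)))
```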
I personally prefer `getitems` over `multiget`.
Or maybe just `__getitem__` with a collection of keys? After all, keys are just strings in our case.
FWIW I'd rather not overload `__getitem__`. Easier and clearer to have two separate methods IMHO.
> Somewhere, some synchronous code is being run. Whether that's in zarr, or in the python implementation of the store backend, or closer to the metal (file system, ssh etc.)
@clbarnes I was caught up on this too. It feels like the buck has to stop somewhere...and it does. Libraries like aiohttp/asyncio ultimately make asynchronous OS calls; the OS implements the asynchronous access to TCP/UDP sockets for network traffic. See this stackoverflow answer. OTOH, most OSs don't expose asynchronous access to files, so I think async will only help HTTP or other network-based storage backends.
Is there a need for a `multiset` in addition to the `multiget` method? It is possible to handle that case using caching built into the storage class, but the explicit control could be useful.
Yes. It would be good to support both. This can come up when setting data that overlaps multiple chunks.
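For symmetry, the write-side method could mirror `getitems`. A rough sketch (the name `setitems` and the default loop are just placeholders):

```python
def setitems(self, items):
    """items: iterable of (key, bytes) pairs.

    Default behaviour is one __setitem__ per pair; a store could override
    this to upload concurrently or batch writes into a single transaction.
    """
    for key, value in items:
        self[key] = value
```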
I think there are some places where zarr would benefit immensely from some async capabilities when reading and writing data. I will try to illustrate this with the simplest example I can.
Let's consider a zarr array stored in a public S3 bucket, which we can read with fsspec's `HTTPFileSystem` interface (no special S3 API needed, just regular HTTP calls). Note that this is a highly sub-optimal choice of chunks: the 1D array of shape (6443,) is stored in chunks of only (5,) items, resulting in over 1000 tiny chunks. Reading this data takes forever, over 5 minutes.
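Roughly, the read in question looks something like this sketch (the URL below is a placeholder, not the actual bucket):

```python
import time

import fsspec
import zarr

# Placeholder URL standing in for the public bucket in question.
url = "https://storage.googleapis.com/example-bucket/array.zarr"

za = zarr.open_array(fsspec.get_mapper(url), mode="r")

tic = time.time()
data = za[:]  # one synchronous HTTP request per chunk
print(f"{time.time() - tic} seconds")
```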
I believe fsspec is introducing some major overhead by not reusing a connection pool. But regardless, zarr is iterating synchronously over each chunk to load the data:
https://github.com/zarr-developers/zarr-python/blob/994f2449b84be544c9dfac3e23a15be3f5478b71/zarr/core.py#L1023-L1028
As a lower bound on how fast this approach could be, we can bypass zarr and fsspec and just fetch all the chunks with requests:
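A sketch of that comparison, reusing `url` and `za` from the sketch above (the chunk-key layout and the `get_chunk_http` helper are assumptions):

```python
import time

import requests

session = requests.Session()  # reuse a single connection pool


def get_chunk_http(n, session):
    # For a 1-D array, chunk keys are just "0", "1", ...
    r = session.get(f"{url}/{n}")
    r.raise_for_status()
    return r.content


tic = time.time()
all_data = [get_chunk_http(n, session) for n in range(za.shape[0] // za.chunks[0])]
print(f"{time.time() - tic} seconds")
```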
As expected, reusing a connection pool sped things up, but it still takes 100 s to read the array.
Finally, we can try the same thing with `asyncio`:
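A sketch of the async version (wrapped in a `main()` coroutine here; `get_chunk_http_async` mirrors the synchronous helper above and is likewise an assumption):

```python
import asyncio
import time

import aiohttp


async def get_chunk_http_async(n, session):
    async with session.get(f"{url}/{n}") as resp:
        resp.raise_for_status()
        return await resp.read()


async def main():
    async with aiohttp.ClientSession() as session:
        tic = time.time()
        futures = [get_chunk_http_async(n, session) for n in range(za.shape[0] // za.chunks[0])]
        all_data = await asyncio.gather(*futures)
        print(f"{time.time() - tic} seconds")


asyncio.run(main())
```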
This is a MAJOR speedup!
I am aware that using dask could possibly help me here. But I don't have big data here, and I don't want to use dask. I want zarr to support asyncio natively.
I am quite new to async programming and have no idea how hard / complicated it would be to do this. But based on this experiment, I am quite sure there are major performance benefits to be had, particularly when using zarr with remote storage protocols.
Thoughts?
cc @cgentemann