kylebarron / parquet-wasm

Rust-based WebAssembly bindings to read and write Apache Parquet data
https://kylebarron.dev/parquet-wasm/
Apache License 2.0
483 stars 19 forks

Request batching #392

Closed: kylebarron closed this issue 2 months ago

kylebarron commented 7 months ago

Currently the async requests are really slow because IIUC they do all fetches sequentially. See here for a painful example. If the data isn't cached, it'll slowly fetch in:

https://github.com/kylebarron/parquet-wasm/assets/15164633/6b7eaa1b-8c02-4baf-8731-65be7863de09

You should take a closer look at the individual request ranges, but I think it's fetching every column independently. There are 7 record batches and 7 columns. Each field other than geometry is pretty small.

[image: network panel showing the individual range requests and their sizes]

And indeed most of the fetches are in the low-KB range, apart from some 1.5-2 MB fetches, which are probably for the geometry column.

It looks like the way to fix this is to implement a custom AsyncFileReader that overrides get_byte_ranges to combine nearby range requests.
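A minimal sketch of that approach, assuming a hypothetical `WasmHttpReader` (not this repo's code) and the `AsyncFileReader` signatures as documented in the parquet crate around this version (usize byte ranges): merge ranges whose gaps fall under a threshold, issue one request per merged range, then slice the original ranges back out of the merged buffers.

```rust
use std::{ops::Range, sync::Arc};

use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::errors::Result;
use parquet::file::metadata::ParquetMetaData;

struct WasmHttpReader {
    url: String, // plus an HTTP client, cached content length / footer, etc.
}

impl WasmHttpReader {
    // hypothetical helper: one `Range: bytes=start-(end-1)` request
    async fn fetch_range(&mut self, range: Range<usize>) -> Result<Bytes> {
        todo!("fetch {range:?} from {}", self.url)
    }
}

impl AsyncFileReader for WasmHttpReader {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
        async move { self.fetch_range(range).await }.boxed()
    }

    // Override the sequential default: merge nearby ranges, fetch each merged
    // range once, then slice the originally requested pieces back out.
    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        const MAX_GAP: usize = 1024 * 1024; // merge ranges separated by < 1 MiB (arbitrary)
        async move {
            // assumes `ranges` is sorted by start offset
            let mut merged: Vec<Range<usize>> = Vec::new();
            for r in &ranges {
                match merged.last_mut() {
                    Some(last) if r.start <= last.end + MAX_GAP => last.end = last.end.max(r.end),
                    _ => merged.push(r.clone()),
                }
            }
            let mut buffers = Vec::with_capacity(merged.len());
            for m in &merged {
                buffers.push((m.clone(), self.fetch_range(m.clone()).await?));
            }
            let out: Vec<Bytes> = ranges
                .iter()
                .map(|r| {
                    let (m, buf) = buffers
                        .iter()
                        .find(|(m, _)| m.start <= r.start && r.end <= m.end)
                        .expect("every range is covered by a merged range");
                    buf.slice(r.start - m.start..r.end - m.start)
                })
                .collect();
            Ok(out)
        }
        .boxed()
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
        async move { todo!("fetch and decode the Parquet footer") }.boxed()
    }
}
```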

https://github.com/pka/http-range-client may also be helpful. cc @H-Plus-Time in case you have interest/insight

kylebarron commented 7 months ago

Ok, yeah, so the default implementation of get_byte_ranges just calls self.get_bytes(range).await? sequentially, once per range.
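Roughly, that default amounts to this (paraphrased, not the crate's exact code): each fetch only starts after the previous one finishes.

```rust
use std::ops::Range;

use bytes::Bytes;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::errors::Result;

// what the sequential default boils down to: one awaited request per range
async fn sequential_get_byte_ranges<R: AsyncFileReader>(
    reader: &mut R,
    ranges: Vec<Range<usize>>,
) -> Result<Vec<Bytes>> {
    let mut out = Vec::with_capacity(ranges.len());
    for range in ranges {
        out.push(reader.get_bytes(range).await?); // the next fetch waits on this one
    }
    Ok(out)
}
```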

From georust discord:

There are two different kinds of merging - one for index traversal and one for feature traversal - and they have different strategies reflecting their different layouts.

Here's the index request merging: https://github.com/flatgeobuf/flatgeobuf/pull/93 And here's the feature request merging: https://github.com/flatgeobuf/flatgeobuf/pull/319

Though you should probably refer to the latest version of that same code since some bugs have been fixed.

It also looks like it might be good to try using the ObjectStore API directly.

Also, object_store exposes this publicly: https://docs.rs/object_store/latest/object_store/fn.coalesce_ranges.html
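A sketch of leaning on that helper instead of hand-rolling the merge logic, assuming a hypothetical `fetch_range` function (the helper's signature here follows the object_store docs of this era, with usize ranges; it merges ranges whose gap is below `coalesce` and returns one Bytes per original range):

```rust
use std::ops::Range;

use bytes::Bytes;
use object_store::coalesce_ranges;
use parquet::errors::{ParquetError, Result};

async fn get_byte_ranges_coalesced(url: String, ranges: Vec<Range<usize>>) -> Result<Vec<Bytes>> {
    coalesce_ranges(
        &ranges,
        |range| fetch_range(url.clone(), range), // one HTTP request per *merged* range
        1024 * 1024, // merge ranges separated by less than 1 MiB (arbitrary)
    )
    .await
    .map_err(|e| ParquetError::External(Box::new(e)))
}

// hypothetical single-range fetch, e.g. via the browser's fetch() API
async fn fetch_range(url: String, range: Range<usize>) -> object_store::Result<Bytes> {
    let _ = (url, range);
    Err(object_store::Error::NotImplemented)
}
```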

H-Plus-Time commented 7 months ago

Yeah, I think if it were possible to amp up the coalescing to multiple row groups (on an average connection, the UScounties.parquet file would be great to serve with 1 meta/HEAD + 2x 3-4-row-group requests), the streaming workflow would make total sense. I've played around with one builder per row group plus StreamExt::buffered to reasonable effect (~30% reduction vs one sequential request per row group). With just that and a sufficiently large file (12 MB is small enough that Cloudflare's ~300 ms/request lag is noticeable), you could probably saturate your network link, especially if the row group sizes are bigger - swapping to 2 row groups for the same UScounties.parquet file dropped the execution-time penalty from 40% to 14%.
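A rough sketch of that "one builder per row group + buffered" pattern (illustrative names and the Clone bound on the reader are assumptions, not this repo's code):

```rust
use arrow_array::RecordBatch;
use futures::{stream, StreamExt, TryStreamExt};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::errors::Result;

// One ParquetRecordBatchStream per row group, with at most `concurrency`
// row groups in flight at a time; `buffered` preserves the row-group order.
async fn read_row_groups_concurrently<R>(
    reader: R,
    num_row_groups: usize,
    concurrency: usize,
) -> Result<Vec<RecordBatch>>
where
    R: AsyncFileReader + Clone + Unpin + Send + 'static,
{
    let per_row_group: Vec<Vec<RecordBatch>> = stream::iter(0..num_row_groups)
        .map(|rg| {
            let reader = reader.clone();
            async move {
                let stream = ParquetRecordBatchStreamBuilder::new(reader)
                    .await?
                    .with_row_groups(vec![rg]) // this builder only reads row group `rg`
                    .build()?;
                stream.try_collect::<Vec<_>>().await
            }
        })
        .buffered(concurrency)
        .try_collect()
        .await?;
    Ok(per_row_group.into_iter().flatten().collect())
}
```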

H-Plus-Time commented 7 months ago

Ah :|, it really is a caching problem - with caching configured properly (I just used an 'all parquet files are eligible for caching' rule in Cloudflare and left it at that), the per-row-group version (the content of #393) is IO-bound (it saturates my network link), and the per-column, per-row-group version is moderately slower (~20%).

kylebarron commented 7 months ago

possible to amp up the coalescing to multiple row_groups

That should be possible too. Under the hood we can pass a vector of row group indexes.

I'm not sure I want to set the default to fetching multiple row groups per request (though you could implement some heuristics based on the row group byte size, which you learn from the file metadata).
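Purely as an illustration of that heuristic (the target size and function name are made up): walk the footer metadata and greedily bundle contiguous row groups until a target request size is reached.

```rust
use parquet::file::metadata::ParquetMetaData;

// Greedily group contiguous row groups until each batch covers at least
// `target_bytes` of compressed data; each batch then becomes one fetch.
fn plan_row_group_batches(meta: &ParquetMetaData, target_bytes: i64) -> Vec<Vec<usize>> {
    let mut batches: Vec<Vec<usize>> = Vec::new();
    let mut current: Vec<usize> = Vec::new();
    let mut current_bytes = 0i64;
    for (idx, rg) in meta.row_groups().iter().enumerate() {
        current.push(idx);
        current_bytes += rg.compressed_size(); // on-disk size of this row group
        if current_bytes >= target_bytes {
            batches.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}

// e.g. plan_row_group_batches(&metadata, 8 * 1024 * 1024), then spin up one
// reader per batch of row group indexes.
```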

kylebarron commented 7 months ago

In the existing code, we also create a new client per request, IIUC, whereas the new content in #393 reuses the same client, and maybe that helps with connection keep-alive?

H-Plus-Time commented 7 months ago

possible to amp up the coalescing to multiple row_groups

That should be possible too. Under the hood we can pass a vector of row group indexes.

I'm not sure I want to set the default to fetching multiple row groups per request (though you could implement some heuristics based on the row group byte size, which you learn from the file metadata).

Yeah, it looks like it's particularly difficult to do via the parquet streaming interface (even when passing a vector of row groups, I still end up with serialized row-group requests); tinkering with the AsyncFileReader seems to be how ParquetObjectReader and DataFusion's physical plan manage to do it.

Yeah, the moment we expose things like with_row_filter and with_projection, meddling with multi-row-group requests would need to be skipped (the intra-row-group coalescing still makes sense).

I think I'd be comfortable with a desired_request_size parameter in the stream interface, and bulk coalescing of contiguous row groups (minimum of 1 row group per request, maximum of 2x desired_request_size per request). That would open the door for an easy usability win in the form of row group filtering, and a very clearly marked "skip messing with the requests if any of these conditions are met" point.

kylebarron commented 7 months ago

Unless/until we can send concurrent requests, it seems like merging requests is always going to be the better option. For now I moved away from something like desired_request_size in favor of coalesce_size: the maximum number of bytes allowed between two request ranges for them to be merged and sent as one request (e.g. with a 1 MB coalesce_size, two ranges separated by a 100 KB gap are fetched as a single request, over-reading the gap).

H-Plus-Time commented 7 months ago

Yes, merged intra-row-group requests sent concurrently appear to be optimal.

Alright, results from the week of experiments:

The sweet spot for concurrency on HTTP/1.1 endpoints is ~4 (accounting for the two briefly used up by the HEAD and GET), but I'd stress that the concurrent partial-read story for HTTP/1.1 is never going to be a pleasant one in browser environments (because of the connection cap and lack of multiplexing, mixed with the tendency for object stores to have large fixed latencies).

H-Plus-Time commented 7 months ago

It also looks like Cloudflare freezes the protocol of R2 custom domains at creation - anything before ~April this year is stuck on HTTP/1.1 (a lot more recent than I'd assumed), and up to somewhere around October they're all HTTP/2 (apparently R2 is under extremely active development; a custom domain created today is set to HTTP/3). The only way to change it is to remove the domain in R2 settings and re-add it (https://community.cloudflare.com/t/how-can-i-enable-http-2-on-my-r2-bucket/501520).

kylebarron commented 7 months ago

Thanks!! This is really awesome stuff!

  • It is in fact possible to use ParquetObjectStore if you're willing to implement your own (almost entirely stubbed) impl ObjectStore for FoobarObjectStore

Do you have a preference for using a stubbed ObjectStore vs a manual implementation? I'm not sure which would be more maintainable.

It seems like a lot of work to upstream full wasm support into object-store, which is why I was leaning towards a fully custom implementation.

  • Concurrent requests ended up being straightforward once I admitted defeat trying to get ParquetRecordBatchStream to behave concurrently, and opted for one per row group:

That looks amazing!

Oh I see, you create a different reader for each record batch index. That seems fine to me.

  • the above kicks off $n$ requests immediately, and likely boosts the peak memory usage by a bit (decompressed row groups, mind, so concurrency above 8 with beefy row groups would probably be... a bad idea).

I don't fully understand all of how stream processing works in JS, but if you have a stream reader on the JS side, as soon as a batch is returned, the memory should be freed in Wasm?

Does this maintain ordering? If you want record batches [5, 2, 4], will it ensure they're always emitted in that order? If the first record batch is for some reason way larger than the others, will the others buffer in memory?

kylebarron commented 7 months ago

Would concurrency also work from the JS side? If it created $n$ different promises for $n$ different row groups and then called Promise.all(), would that still fetch them all at the same time?

H-Plus-Time commented 7 months ago

Maybe object_store? The extension points would probably be quite handy when getting to things like partitioned datasets (well, for WebDAV at least). I suppose the biggest upshot would be that, eventually, the first-party request code largely goes away.

I could certainly see circa Q2 2024 wasm-compatible object stores for:

I'll put up a PR for it though, mainly to demonstrate what was involved.

Oh I see, you create a different reader for each record batch index. That seems fine to me.

Each row group, yep.

Yeah, I think they'll only be very brief spikes, and it would be very hard to push it into OOM territory; just an observation really.

Does this maintain ordering?

Yep, ordered both within row groups and between them - there is actually a StreamExt::buffer_unordered function that would help in a situation like that (highly variable row group sizes), but tbh those use cases are so rare I wouldn't bother allowing it.

Would concurrency also work from the JS side

Yep, if the target server/browser/network conditions allow it (I've noticed HTTP/2 connections sometimes delaying requests, though the browser always schedules them immediately).

kylebarron commented 7 months ago

Yeah, I think they'll only be very brief spikes, and it would be very hard to push it into OOM territory; just an observation really

One consideration here is that I think Wasm memory gets resized upwards but not downwards. So if the peak memory usage is brief but high, you'll be left with a very large but mostly unused ArrayBuffer afterwards, right?

Maybe object_store

One other thing to keep in mind with the object store APIs is that they don't have a way to make a suffix range request, because apparently Azure doesn't support that 😭 https://github.com/apache/arrow-rs/issues/4611

This means that if you use object store, you always first need a HEAD request to get the content length, right? Ref #272

In my prototype in https://github.com/kylebarron/parquet-wasm/pull/393 I'm able to avoid any HEAD request for the length because I fetch the metadata manually.
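The general shape of that is sketched below (an illustration, not necessarily what #393 does; `fetch_suffix` is a hypothetical helper that sends a `Range: bytes=-N` request): read the 8-byte footer from a suffix fetch, then decode the metadata out of the same buffer, only fetching more if the prefetch was too small.

```rust
use bytes::Bytes;
use parquet::errors::{ParquetError, Result};
use parquet::file::footer::decode_metadata;
use parquet::file::metadata::ParquetMetaData;

const DEFAULT_PREFETCH: usize = 64 * 1024; // grab the last 64 KiB and hope the footer fits

async fn fetch_metadata(url: &str) -> Result<ParquetMetaData> {
    let tail: Bytes = fetch_suffix(url, DEFAULT_PREFETCH).await?;
    let n = tail.len();
    if n < 8 || &tail[n - 4..] != b"PAR1".as_slice() {
        return Err(ParquetError::General("not a parquet file".to_string()));
    }
    // the 8-byte footer is a 4-byte little-endian metadata length + the "PAR1" magic
    let metadata_len = u32::from_le_bytes(tail[n - 8..n - 4].try_into().unwrap()) as usize;
    let metadata_bytes = if metadata_len + 8 <= n {
        // the prefetch already covered the whole footer: no extra request needed
        tail.slice(n - 8 - metadata_len..n - 8)
    } else {
        // footer was bigger than the prefetch: one more suffix request for the rest
        fetch_suffix(url, metadata_len + 8).await?.slice(0..metadata_len)
    };
    decode_metadata(&metadata_bytes)
}

// hypothetical helper: GET `url` with a `Range: bytes=-{suffix_len}` header
async fn fetch_suffix(url: &str, suffix_len: usize) -> Result<Bytes> {
    let _ = (url, suffix_len);
    todo!("issue the suffix range request with the HTTP client of choice")
}
```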

kylebarron commented 7 months ago

I could certainly see circa Q2 2024 wasm-compatible object stores for:

That's a lot of different target systems... I'm somewhat inclined to only support HTTP to keep the scope down and maintenance minimal, and users can fetch signed URLs to support other object stores.

H-Plus-Time commented 7 months ago

Mm, good point, that makes it a very easy decision - custom implementation.

Going by the suffix thread, the maintainers are receptive; I'll queue the From changes up for Advent of Code (ahead of/concurrently with the wasm changes).

Fair re scope. HTTP, a ReadableStream output, and a File input are sufficient for users to handle the rest in JS land (that last one is largely to support Response and FileSystemAccessHandle, both of which have cheap file methods (it turns out that's all that's really needed to emulate AsyncRead + AsyncSeek) and integrate into ~3-4 core browser APIs).

kylebarron commented 4 months ago

  • It is in fact possible to use ParquetObjectStore if you're willing to implement your own (almost entirely stubbed) impl ObjectStore for FoobarObjectStore. All told, ~250-300 lines for an equally fault-tolerant equivalent to HttpObjectStore.

Do you still have this experiment?

H-Plus-Time commented 3 months ago

I do, yep - it's over at object-store-wasm. It still uses the 'collect the stream, then wrap it back up as a single-item stream' trick (up to date with object-store 0.9, though).
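Roughly, that trick amounts to something like this (illustrative only, not object-store-wasm's actual code): buffer the whole response body, then hand it back as a stream with exactly one chunk so the streaming-shaped interfaces upstream still work.

```rust
use bytes::Bytes;
use futures::{stream, Stream, TryStreamExt};

async fn buffer_as_single_item_stream<S, E>(
    body: S,
) -> Result<impl Stream<Item = Result<Bytes, E>>, E>
where
    S: Stream<Item = Result<Bytes, E>>,
{
    // drain the real network stream completely...
    let chunks: Vec<Bytes> = body.try_collect().await?;
    let mut all = Vec::with_capacity(chunks.iter().map(Bytes::len).sum());
    for chunk in chunks {
        all.extend_from_slice(&chunk);
    }
    // ...then expose it again as a stream that yields that one buffered chunk
    Ok(stream::once(async move { Ok::<Bytes, E>(Bytes::from(all)) }))
}
```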

Tinkering with ehttp atm (which seems to have markedly less mature header constants), I'm hoping that it'll make it a bit more tractable to do actual streaming (I suspect it's a moot point anyway wrt the parquet reader, but worth it for more general-purpose use elsewhere).

kylebarron commented 3 months ago

That repo appears to be private 🥲

H-Plus-Time commented 3 months ago

Whoops, forgot the protocol in the link 😬

kylebarron commented 3 months ago

That's cool! Are you able to add a license to the repo? I started an attempt to connect to object store here but hit a wall and stopped. At present, to get things working, I vendored an impl of AsyncFileReader, but I'm guessing that using ObjectStore would be more maintainable. I have some time available to get GeoParquet working for remote datasets in JS.

Also note https://github.com/JanKaul/object_store_s3_wasm if you haven't seen it.

H-Plus-Time commented 3 months ago

Yep, license added. Yeah, that was the main bit of prior art that convinced me to just re-implement the client parts. I'll be doing a bit of internal shuffling, but stuff like HttpStore should still be a top-level export.

kylebarron commented 2 months ago

I think this is all resolved in 0.6?