apache / arrow-rs

Official Rust implementation of Apache Arrow
https://arrow.apache.org/
Apache License 2.0

Add Avro Support #4886

Open tustvold opened 11 months ago

tustvold commented 11 months ago

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Avro is a widely used binary, row-oriented data encoding. It is very similar to protobuf, and has seen very wide adoption in the data ecosystem, especially for streaming workloads.

Describe the solution you'd like

A new arrow_avro crate will provide vectorised support for reading and writing avro data. The APIs should be designed in such a way as to work for the various different container formats for avro encoded data, including single object encoding, object container files and message even if first-class support is not provided for all these framing mechanisms.
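To make the framing distinction concrete, here is a minimal sketch (not the proposed `arrow_avro` API) that tells two of these framings apart from their leading bytes. The magic values come from the Avro specification: object container files start with `Obj\x01`, and single-object encoding starts with the marker `C3 01` followed by an 8-byte little-endian schema fingerprint. The names `Framing` and `detect_framing` are hypothetical.

```rust
/// Framing around Avro-encoded data, as far as it can be told from the header.
#[derive(Debug, PartialEq)]
enum Framing<'a> {
    /// Object container file: magic `Obj\x01`, then file metadata and data blocks.
    ObjectContainerFile { rest: &'a [u8] },
    /// Single-object encoding: marker `C3 01`, schema fingerprint, then the datum.
    SingleObject { fingerprint: u64, body: &'a [u8] },
    /// No recognised header; the caller has to supply the schema out of band.
    Unframed(&'a [u8]),
}

/// Inspect the first bytes of `buf` and classify the framing (hypothetical helper).
fn detect_framing(buf: &[u8]) -> Framing<'_> {
    if let Some(rest) = buf.strip_prefix(b"Obj\x01") {
        return Framing::ObjectContainerFile { rest };
    }
    if let Some(rest) = buf.strip_prefix(&[0xC3u8, 0x01]) {
        if rest.len() >= 8 {
            let (fp, body) = rest.split_at(8);
            let mut bytes = [0u8; 8];
            bytes.copy_from_slice(fp);
            // The fingerprint is stored little-endian.
            let fingerprint = u64::from_le_bytes(bytes);
            return Framing::SingleObject { fingerprint, body };
        }
    }
    Framing::Unframed(buf)
}
```

A decoder designed this way only needs the schema plus the bare datum bytes, so each framing can be handled by a thin layer on top of it.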

Describe alternatives you've considered

Additional context

DataFusion has some avro support, however, it is based on the row-based apache_avro crate and is therefore likely extremely sub-optimal.

FYI @Samrose-Ahmed @sarutak @devinjdangelo I intend to work on this, but any help with reviews / testing would be most welcome

alamb commented 11 months ago

I think having native Avro --> Arrow code will be good, and it will continue to encourage additional use of Arrow in the ecosystem.

bjchambers commented 11 months ago

This would be awesome!

Samrose-Ahmed commented 11 months ago

Awesome, happy to review any PR and help with tests.

alamb commented 10 months ago

I think this may have gotten bumped by other priorities. I believe @tustvold plans to wrap up whatever state this is currently in while he works on other things.

tustvold commented 10 months ago

I intend to keep working away at this in the background, but any help on reviews would be most appreciated

Jefffrey commented 10 months ago

FYI a duplicate issue also exists: https://github.com/apache/arrow-rs/issues/727

alamb commented 10 months ago

Today I learned that there is a version of the avro reader/writer in arrow2:

https://github.com/jorgecarleitao/arrow2/tree/3ddc6a10c6fbc2d0f85a9f66eeb46112abd07029/src/io/avro

Ten0 commented 8 months ago

Hello, I have recently rewritten a Rust avro crate from ~scratch due to performance and ergonomics issues with the apache-avro crate. (It achieves x10 perf compared to the apache-avro crate). This crate is soon hitting 1.0. I suspect part of that work could be reused to solve this issue, and I could probably help with this. Would you mind explaining in a bit more detail what you mean by "vectorized support for reading and writing avro data", and point me to where that would plug in the code? Thanks 🙂

alamb commented 8 months ago

> Would you mind explaining in a bit more detail what you mean by "vectorized support for reading and writing avro data", and point me to where that would plug in the code?

I think this means "reading avro records into Array directly".

Here is the way it is implemented in datafusion: https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/src/datasource/avro_to_arrow/reader.rs

There are more sophisticated ways of implementing this feature, for example the tape based methods of the JSON and CSV readers in this crate

What I would personally recommend doing is:

  1. Make a PR with a relatively basic (can be missing features) Avro Reader / writer and use that to work out the interface that is desired (@tustvold may already have this sitting on a branch somewhere)
  2. Implement a basic reader/writer, perhaps using apache-avro or perhaps another implementation, including broad test coverage
  3. Work on optimizing the implementation (using the existing coverage)

tustvold commented 8 months ago

> (@tustvold may already have this sitting on a branch somewhere)

I have most of the bits and pieces and hope to push something up in the coming week or two. The major missing thing at the moment is tests.

> Would you mind explaining in a bit more detail what you mean by "vectorized support for reading and writing avro data", and point me to where that would plug in the code

I had a brief look at https://github.com/Ten0/serde_avro_fast and I am not sure it necessarily would be a good fit for arrow-rs as it appears to rely on knowing the data schema at compile time, but I could be completely off base here

Ten0 commented 8 months ago

Heyy,

Thanks for the quick answers.

> it appears to rely on knowing the data schema at compile time

It doesn't 😊 It goes through serde, but that also works with unknown structure: for example you can convert any avro with dynamic schema to json (and vice versa) just by plugging them via serde_transcode. It seems that the main idea would be to have a custom implementation of DeserializeSeed for arrow records 🤔 (Most likely that would also enable plugging any serde format pretty easily afterwards as well since you could likely plug any deserializer (except maybe you'd want to read the schema in advance, but maybe it's not even necessary if you determine the structure as you receive the first records).)

tustvold commented 8 months ago

That's an interesting idea, typically the way to achieve performant decode is to decode the values for a column at a time, as this allows amortizing per-row overheads, reducing branch misses, etc... This would obviously not be possible with the serde model which is inherently value-oriented, but it is also possible that the nature of the avro encoding, which relies extensively on varint encoding, reduces the benefits of such a columnar approach.
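For readers unfamiliar with the varint encoding referred to here: Avro writes `int` and `long` values as zigzag-encoded, variable-length base-128 integers, so the width of a value is only known after reading it. The following is a small sketch of decoding one such value; `decode_long` is a hypothetical helper, not a function from arrow-rs or apache-avro.

```rust
/// Decode one Avro `long` from the front of `buf`.
///
/// Each byte carries 7 payload bits, least-significant group first; the high
/// bit says whether another byte follows. The assembled integer is zigzag
/// encoded so that small negative numbers also stay short.
///
/// Returns the value and the number of bytes consumed, or `None` if the
/// input ends early or runs past 10 bytes.
fn decode_long(buf: &[u8]) -> Option<(i64, usize)> {
    let mut raw: u64 = 0;
    for (i, &byte) in buf.iter().enumerate().take(10) {
        raw |= u64::from(byte & 0x7F) << (7 * i);
        if byte & 0x80 == 0 {
            // Undo zigzag: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
            let value = (raw >> 1) as i64 ^ -((raw & 1) as i64);
            return Some((value, i + 1));
        }
    }
    None
}
```

Because every byte has to be inspected to find the end of the value, there is a data-dependent branch per byte, which is one reason the usual columnar decoding tricks may pay off less for Avro than for fixed-width encodings.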

I'll try to get what I have polished up over the next few days, and we can compare benchmarks.

Ten0 commented 8 months ago

> typically the way to achieve performant decode is to decode the values for a column at a time, as this allows amortizing per-row overheads, reducing branch misses, etc... This would obviously not be possible with the serde model which is inherently value-oriented, but it is also possible that the nature of the avro encoding, which relies extensively on varint encoding, reduces the benefits of such a columnar approach.

Oh that's interesting! I would have imagined that we would have prepared all the vectors and be pushing to each of them as we read each field. What are you referring to with regards to per-row overheads? (I'd like to read documentation on this topic, I'm familiar with branch prediction but not this.)

That being said indeed with avro's encoding where you have to precisely deserialize each field of an object before you know where the next object starts, plus with the block encoding with the compression, it's very hard for me to imagine that reading several times to extract a single field each time would be the most performant approach. (But even if that was the case, that would look very close to driving the deserializer multiple times, just ignoring all the fields but one each time.)
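The "push to each vector as we read each field" approach can be sketched as follows for a hypothetical record schema `{id: long, name: string}`. It uses plain `Vec`s rather than Arrow builders so that it stays self-contained; `Cursor` and `decode_columns` are illustrative names only. It also shows why fields cannot be skipped cheaply: the start of `name` is only known once `id` has been decoded, and the start of the next record only once the string length has been read.

```rust
/// Read position inside a buffer of Avro-encoded records.
struct Cursor<'a> {
    buf: &'a [u8],
    pos: usize,
}

impl<'a> Cursor<'a> {
    /// Read one zigzag varint `long`, advancing the cursor.
    fn read_long(&mut self) -> Option<i64> {
        let mut raw = 0u64;
        for shift in (0..64).step_by(7) {
            let byte = *self.buf.get(self.pos)?;
            self.pos += 1;
            raw |= u64::from(byte & 0x7F) << shift;
            if byte & 0x80 == 0 {
                return Some((raw >> 1) as i64 ^ -((raw & 1) as i64));
            }
        }
        None
    }

    /// Read one `string`: a `long` byte length followed by UTF-8 bytes.
    fn read_string(&mut self) -> Option<&'a str> {
        let len = self.read_long()?;
        if len < 0 {
            return None;
        }
        let end = self.pos.checked_add(len as usize)?;
        let bytes = self.buf.get(self.pos..end)?;
        self.pos = end;
        std::str::from_utf8(bytes).ok()
    }
}

/// Decode `n` records of the assumed schema into one vector per column.
fn decode_columns(buf: &[u8], n: usize) -> Option<(Vec<i64>, Vec<String>)> {
    let mut cur = Cursor { buf, pos: 0 };
    let mut ids = Vec::with_capacity(n);
    let mut names = Vec::with_capacity(n);
    for _ in 0..n {
        // Fields must be read in schema order; each read tells us where the next starts.
        ids.push(cur.read_long()?);
        names.push(cur.read_string()?.to_owned());
    }
    Some((ids, names))
}
```

Here the field types are hard-coded; with a schema only known at runtime, each field read would additionally go through some form of type dispatch.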

> I'll try to get what I have polished up over the next few days, and we can compare benchmarks.

Wonderful! 😊

> Here is the way it is implemented in datafusion

https://github.com/apache/arrow-datafusion/blob/3f219bc929cfd418b0e3d3501f8eba1d5a2c87ae/datafusion/core/src/datasource/avro_to_arrow/reader.rs#L160C1-L168C2

So IIUC the interface we'd want is basically something that converts an arbitrary (Buf)Read into something that yields RecordBatches? If somebody confirms that I'm not on the wrong track here, I may have a go at implementing something for this based on Serde, on which we could then plug notably avro support via serde_avro_fast (and if that doesn't work with just serde, maybe get a pluggable part for arrow schema specification that could be populated by reading the avro schema). 🙂

Side note: how many RecordBatches? Why not just one? How does one choose this? Is this because datafusion wants to be able to process very large files by stream-processing the batches?

Side note 2: I will notably have a look at serde_arrow as well for that purpose - I'm not sure to what extent that implementation is optimal for this purpose currently but it seems to be under active development and it looks like fundamentally serde_avro_fast->serde_transcode->serde_arrow is precisely what I'd be looking for. If that is the case it looks like the implementation would be pretty simple 😄 (My first glance has me wonder why the implementation is so complex but then I don't know too much about constructing arrow values)

tustvold commented 8 months ago

> Is this because datafusion wants to be able to process very large files by stream-processing the batches

Yes, whilst this is more important for file formats like parquet that achieve much higher compression ratios than avro, having streaming iterators is pretty standard practice.
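As a rough illustration of the streaming pattern (a sketch over plain vectors, not the arrow-rs reader API), an adapter can pull rows from any iterator and hand them out in bounded chunks, so that memory use is governed by the batch size rather than the file size. `Batches` is a hypothetical name.

```rust
/// Wraps a row iterator and yields the rows in chunks of at most `batch_size`.
struct Batches<I> {
    rows: I,
    batch_size: usize,
}

impl<I: Iterator> Iterator for Batches<I> {
    type Item = Vec<I::Item>;

    fn next(&mut self) -> Option<Self::Item> {
        // Pull at most `batch_size` rows without consuming the underlying iterator.
        let batch: Vec<I::Item> = self.rows.by_ref().take(self.batch_size).collect();
        // An empty batch means the input is exhausted (or `batch_size` is 0).
        if batch.is_empty() {
            None
        } else {
            Some(batch)
        }
    }
}
```

A consumer can then process one batch, drop it, and ask for the next, which is what makes very large inputs tractable.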

> I will notably have a look at serde_arrow as well for that purpose - I'm not sure to what extent that implementation is optimal for this purpose currently

You might also be interested in https://docs.rs/arrow-json/50.0.0/arrow_json/reader/struct.Decoder.html#method.serialize

> My first glance has me wonder why the implementation is so complex but then I don't know too much about constructing arrow values

Converting between row-oriented and columnar formats is very fiddly, especially where they encode nullability differently 😅

> What are you referring to with regards to per-row overheads

The major downside of row-oriented approaches is that unless you know the types at compile time, or have a JIT, you are paying the overhead of type-dispatch for every field. The whole premise of vectorised execution is that by instead operating on columns, you can amortise this cost across thousands of values within a given column, as well as make it easier for the compiler to optimise the code.
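A toy sketch of the difference, assuming values are stored as 8 raw little-endian bytes and the schema is only known at runtime: in the row-oriented version the `match` on the field type runs for every value, whereas in the columnar version it runs once per column and the inner loop is specialised to a single type. `FieldType`, `sum_rows` and `sum_columns` are made-up names for illustration.

```rust
/// Runtime description of a field's type.
enum FieldType {
    Int64,
    Float64,
}

/// Row-at-a-time: type dispatch happens for every field of every row.
fn sum_rows(schema: &[FieldType], rows: &[Vec<[u8; 8]>]) -> Vec<f64> {
    let mut sums = vec![0.0; schema.len()];
    for row in rows {
        for ((ty, raw), sum) in schema.iter().zip(row).zip(&mut sums) {
            *sum += match ty {
                FieldType::Int64 => i64::from_le_bytes(*raw) as f64,
                FieldType::Float64 => f64::from_le_bytes(*raw),
            };
        }
    }
    sums
}

/// Column-at-a-time: dispatch once per column, then run a tight, single-type loop.
fn sum_columns(schema: &[FieldType], columns: &[Vec<[u8; 8]>]) -> Vec<f64> {
    schema
        .iter()
        .zip(columns)
        .map(|(ty, col)| match ty {
            FieldType::Int64 => col
                .iter()
                .map(|raw| i64::from_le_bytes(*raw) as f64)
                .sum::<f64>(),
            FieldType::Float64 => col.iter().map(|raw| f64::from_le_bytes(*raw)).sum::<f64>(),
        })
        .collect()
}
```

With thousands of values per column, the cost of the single `match` in `sum_columns` becomes negligible, and the inner loops are simple enough for the compiler to optimise aggressively.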

Ten0 commented 8 months ago

Thanks!

> My first glance has me wonder why the implementation is so complex

So I've asked and it turns out that it might indeed be sub-optimal 🫤 So that will probably have to be implemented before benchmarks with that pipeline will be relevant. The good news is that the author appears to be very reactive and open to this 😊

> You might also be interested in https://docs.rs/arrow-json/50.0.0/arrow_json/reader/struct.Decoder.html#method.serialize

The benchmarks here seem to suggest that this may be a less efficient approach than the one I drafted at https://github.com/chmp/serde_arrow/issues/118#issue-2085140859 but I will also have a look, maybe the performance loss comes from elsewhere 🤔

Ten0 commented 7 months ago

> I'll try to get what I have polished up over the next few days, and we can compare benchmarks.

Here's a quick POC for full-featured Avro to Arrow using serde_avro_fast, serde_arrow and serde_transcode: https://github.com/Ten0/arrow_serde_avro/blob/0ea1292064f877b210211c09d001e7b7db02fbdf/tests/simple.rs#L60-L61 https://github.com/Ten0/arrow_serde_avro/blob/0ea1292064f877b210211c09d001e7b7db02fbdf/src/lib.rs#L8

It fits in <150 lines total ATM and successfully loads avro object container files to arrow RecordBatch. (Schema conversion is pretty basic ATM but it is straightforward to add more.)

Performance of serde_arrow should be very close to zero-cost abstraction since https://github.com/chmp/serde_arrow/pull/120. There's just https://github.com/chmp/serde_arrow/pull/120#discussion_r1468664717, https://github.com/chmp/serde_arrow/pull/120#discussion_r1468667388 and https://github.com/chmp/serde_arrow/issues/92#issuecomment-1895467586 that are clear areas of potential performance improvements for this particular integration, but that's reasonably quick to implement.

I'll probably PR that before benchmarks (if @chmp hasn't done it before 🚀)

> You might also be interested in arrow_json::Decoder::serialize

So I've checked and it adds significant intermediate representations in the "tape" thing. It seems pretty clear that is indeed why it's so far behind in the benchmarks.

tustvold commented 7 months ago

Sounds promising, I'm afraid I'm not likely to have time to take a look for the next week or so, but I will do so when able.

I'm curious if you've tried comparing the performance of arrow-json vs serde_json + serde_transcode + serde_arrow. The reason I ask is the motivation for the tape is to amortise dispatch overheads (among other things) so I am curious if that is being outweighed by other factors

chmp commented 7 months ago

Just a quick comment: the performance of arrow_json improved drastically compared to arrow2_convert in my latest benchmarks. The previous benchmarks run with arrow_json=46, the most recent ones with arrow_json=50. Also the benchmarks are somewhat sensitive to the structure of the records (e.g., nested structs vs. flat structs). So the exact numbers may change.

tustvold commented 7 months ago

That's likely the result of https://github.com/apache/arrow-rs/pull/4861 which avoided needing to serialize numerics to strings. To be clear, the tape largely exists to serve the needs of the raw arrow JSON decoder. It was only later hooked up into serde because we could do so with little additional effort; only very basic effort has been made to optimise it for this use case.

I am, however, very interested if a combination of serde_json and serde_arrow is competitive with the raw JSON implementation, as that would open up some very interesting possibilities.

chmp commented 7 months ago

There are some limited tests for serde_json with serde_arrow and my own use case is generating arrow arrays from JSON (to then query them with datafusion). So at least the basics should work. I will add writing some benchmarks for this use case to my todo list.

Ten0 commented 7 months ago

First draft of the benchmark seems to show that the tape achieves ~ the same performance as serde_arrow when plugged into serde_json, but according to perf that would no longer be true once the BTreeMap lookups are removed, so it seems serde_arrow is faster than the tape as soon as serde is involved.

But more importantly, the specialized implementation of json to arrow that's done in arrow_json performs much better than going through serde, and according to perf that seems to be notably because it spends quite a lot of time validating that &strs are valid UTF-8 as serde imposes that this is done early, whereas the tailored json-to-arrow implementation builds the arrow array without doing any particular validation. That would also be true for a tailored arrow/avro implementation.

ATM it's using random inputs though, which basically contain only escaped strings, so that might be damaging the benchmark's relevance.
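To illustrate the UTF-8 point: serde hands out `&str`, so every string value has to be validated individually as it is decoded, whereas a columnar string layout (one contiguous byte buffer plus offsets) lets a tailored decoder append raw bytes and defer any checking. The sketch below shows one way deferred validation could work, with a single pass over the whole buffer plus a char-boundary check per offset. This is an assumption-laden illustration using a hypothetical `StringColumn` type, not a description of what arrow_json actually does.

```rust
/// Columnar string storage: all values concatenated, with `len + 1` offsets.
struct StringColumn {
    offsets: Vec<usize>,
    values: Vec<u8>,
}

impl StringColumn {
    fn new() -> Self {
        Self { offsets: vec![0], values: Vec::new() }
    }

    /// Append one value's bytes without validating them.
    fn push_bytes(&mut self, bytes: &[u8]) {
        self.values.extend_from_slice(bytes);
        self.offsets.push(self.values.len());
    }

    /// Bytes of the `i`-th value.
    fn get(&self, i: usize) -> &[u8] {
        &self.values[self.offsets[i]..self.offsets[i + 1]]
    }

    /// Validate the whole column at once: the buffer must be valid UTF-8 and
    /// every offset must fall on a character boundary, which together imply
    /// that each individual value is valid UTF-8.
    fn validate(&self) -> bool {
        match std::str::from_utf8(&self.values) {
            Ok(s) => self.offsets.iter().all(|&o| s.is_char_boundary(o)),
            Err(_) => false,
        }
    }
}
```

The char-boundary check matters: two values that split a multi-byte character between them concatenate into valid UTF-8 even though neither value is valid on its own.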