sunchao / parquet-rs

Apache Parquet implementation in Rust
Apache License 2.0

Parameterize reading of rows with a type parameter, checked against the schema, and used to specialize the deserialization of rows #205

Open alecmocatta opened 5 years ago

alecmocatta commented 5 years ago

Proposal

Parameterize reading (and potentially writing) of rows with a type parameter, which is checked against the file's schema, and used to specialize the deserialization (and potentially serialization) of rows.

Achieve this by adding a type parameter to get_row_iter() and RowIter for the user to specify the type of the returned rows. In cases where the type information is not known, a generic enum that can represent any valid type can be used, which would preserve the current dynamically-typed functionality. Type information is also leveraged to provide the projection.

What's currently done:

trait FileReader {
    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter>;
    ...
}

struct RowIter<'a> {...}

impl<'a> Iterator for RowIter<'a> {
    type Item = Row;
    ...
}

fn main() {
    ...
    let schema = "
        message schema {
            OPTIONAL DOUBLE bp1;
            OPTIONAL DOUBLE bp2;
        }
    ";
    let schema = parse_message_type(schema).unwrap();
    let row_iter = file_reader.get_row_iter(Some(schema)).unwrap();
    for row in row_iter { // row: Row
        println!("bp1: {:?}, bp2: {:?}", row.get_double(0).ok(), row.get_double(1).ok());
    }
}

Which under this proposal becomes:

trait FileReader {
    fn get_row_iter<T>(&self) -> Result<RowIter<T>> where T: ParquetDeserialize;
    ...
}

struct RowIter<'a, T>
where
    T: ParquetDeserialize
{...}

impl<'a, T> Iterator for RowIter<'a, T>
where
    T: ParquetDeserialize
{
    type Item = T;
    ...
}

#[derive(ParquetDeserialize)]
struct StockRecord {
    bp1: Option<f64>,
    bp2: Option<f64>,
}

fn main() {
    ...
    let row_iter = file_reader.get_row_iter::<StockRecord>().unwrap();
    for row in row_iter {
        println!("bp1: {:?}, bp2: {:?}", row.bp1, row.bp2)
    }
}

Upsides

Currently, allocations are made for each row. This impacts performance (see https://github.com/sunchao/parquet-rs/issues/140). With a user-specified row type, no dynamic allocation needs to occur except for Lists/Maps within the row.

Currently, the decode logic is largely generic – i.e. there are lots of nested enums which branch on the type. User-specified row type information would enable the logic to be specialised and optimised by the compiler.

Together these would offer a substantial boost to performance.

Projections are typically written as text and parsed with parse_message_type(). The user-specified row type can instead be used as the projection, which saves having to keep the two in sync.

Downsides

A more sophisticated API. The old behaviour would, however, still be available simply via file_reader.get_row_iter::<Row>().

Breaking changes: implementation details like the precise API of the Reader enum are difficult to maintain exactly, and my current implementation doesn't attempt to. As such I would suggest that if these proposed changes are accepted, a semver bump to 0.5 is made.

Prior art

Many/most Rust implementations of serialization/deserialization leverage type information to specialise the logic and avoid allocations. An example of leveraging an enum to enable the spectrum of untyped to strongly-typed (i.e. gradual typing) is serde_json::Value.

Implementation

A new trait (which I'm currently calling ParquetDeserialize), implemented on u8, i8, u16, i16, u32, i32, u64, i64, f32, f64, Vec<u8>, String, Decimal, Timestamp, Row, List<T>, Map<K,V>, as well as Option<T> of each of the aforementioned.

This trait has two associated types, referred to later as Self::Schema and Self::Reader: a dynamic encoding of how the Rust type maps onto the file's schema, and the reader specialised to that type which is generated from it.

It has methods to produce Self::Schema given the user-provided type and a Type (returning a helpful error if they don't match), and to then construct the corresponding Self::Reader.
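
A minimal sketch of how the trait might be shaped, based on the description above and below (the method names and exact signatures here are assumptions for illustration, not the proposed API):

use parquet::errors::Result;
use parquet::schema::types::Type;

// Illustrative shape only; names and signatures are assumptions.
trait ParquetDeserialize: Sized {
    // Dynamic encoding of how this Rust type maps onto the file's schema
    // (e.g. which legacy list representation is in use).
    type Schema;
    // The reader specialised to this type, generated from that mapping.
    type Reader;

    // Produce Self::Schema given the user-provided type and a Type,
    // returning a helpful error if they don't match.
    fn parse(file_schema: &Type) -> Result<Self::Schema>;

    // Build the specialised Self::Reader from the mapping.
    fn reader(schema: &Self::Schema /*, column readers, paths, ... */) -> Self::Reader;
}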

It is implemented on tuples (up to length 32), where it is valid for reading group types that exactly match the length and types of the tuple (i.e. ignoring names). This is intended as a convenience for reading group types without having to create a struct.
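
For example (a hypothetical usage sketch, assuming file_reader is set up as in the earlier examples), the two-column schema from above could be read as a tuple rather than a named struct:

// Read each row as an (Option<f64>, Option<f64>) tuple, matching the two
// OPTIONAL DOUBLE columns by position rather than by name.
let row_iter = file_reader.get_row_iter::<(Option<f64>, Option<f64>)>().unwrap();
for (bp1, bp2) in row_iter {
    println!("bp1: {:?}, bp2: {:?}", bp1, bp2);
}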

It can be derived on structs with #[derive(ParquetDeserialize)], where it is valid for reading group types that have all of the field names as columns with matching types. Projection can be achieved by omitting fields from the struct.

Projection (avoiding unnecessary reading by specifying which columns you're interested in) would change from being given as a Type (which in practice is usually calculated from the text version of the schema) to being inferred directly from the user-specified type. The assumption here is that if the user has knowledge about the schema of the file to use as a projection, they should include that knowledge in the type in any case.

List and Map would become typed, i.e. List<T> and Map<K,V>. They can be dynamically-typed akin to the current implementation like so: List<Value> and Map<Primitive,Value> (Primitive for the key as that is a restriction imposed by the format).
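
For instance (an illustrative sketch; the struct and field names are invented):

// Hypothetical record mixing strongly-typed and dynamically-typed fields.
#[derive(ParquetDeserialize)]
struct Event {
    // Fully typed list of strings.
    tags: List<String>,
    // Dynamically-typed map: keys must be primitive (a format restriction),
    // values can be any valid type.
    properties: Map<Primitive, Value>,
}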

A new generic Value enum that can represent any valid type, which preserves the current dynamically-typed functionality:

#[derive(Clone, PartialEq, Debug)]
enum Value {
    Bool(bool),
    U8(u8),
    I8(i8),
    U16(u16),
    I16(i16),
    U32(u32),
    I32(i32),
    U64(u64),
    I64(i64),
    F32(f32),
    F64(f64),
    Timestamp(Timestamp<Int96>),
    ByteArray(Vec<u8>),
    String(String),
    List(List<Value>),
    Map(Map<Primitive,Value>),
    Group(Row),
    Option(Option<ValueRequired>),
}
impl Value {
    fn is_i32(&self) -> bool;
    fn as_i32(self) -> Option<i32>;
    fn is_string(&self) -> bool;
    fn as_string(self) -> Option<String>;
    ...
}

as well as a Primitive enum that excludes List, Map and Row.
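
Roughly (an illustrative sketch only):

// Hypothetical Primitive enum: the Value variants minus the nested
// List, Map and Group (Row) types.
#[derive(Clone, PartialEq, Debug)]
enum Primitive {
    Bool(bool),
    U8(u8),
    I8(i8),
    U16(u16),
    I16(i16),
    U32(u32),
    I32(i32),
    U64(u64),
    I64(i64),
    F32(f32),
    F64(f64),
    Timestamp(Timestamp<Int96>),
    ByteArray(Vec<u8>),
    String(String),
}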

Interaction with other features/work

I'm not so familiar with the Row writing mechanisms, so I'm currently unsure how they are impacted. https://github.com/sunchao/parquet-rs/pull/197 and https://github.com/sunchao/parquet-rs/issues/203 are relevant. There is potential for the exact schema to be written to be taken from the type (as per https://github.com/sunchao/parquet-rs/issues/203), though it needs to be overridable, as multiple schemas can map to a single type. For example, there are 6 valid schemas for a List<T>, so the ability to provide a custom schema to select a representation other than the default is necessary.

Status

I've implemented the bulk of this, and it's running successfully on all the test data. I'm looking for feedback as to how best to contribute my work back to this project. I'd like to make a PR later this week if it's ready (probably after Christmas if it's not), but wanted to let the community know that this is being actively worked on to avoid any duplication of effort, and garner any thoughts and feedback in the meantime.

sadikovi commented 5 years ago

Thanks for the write-up and your interest.

I have a few questions:

- How do you handle schema column names that cannot be struct fields in Rust?
- How do you handle lists?
- You mention a substantial boost in performance. Could you mention numbers?
- How do you handle schemas larger than 32 fields?

alecmocatta commented 5 years ago

@sadikovi Thanks for your response!

Good questions.

schema column names that cannot be struct fields in Rust

I haven't implemented this yet but I intend to use the same approach as #[serde(rename = "...")], i.e.

struct Record {
    #[parquet(rename = "¡field_name!")]
    field_name: u64
}

such that any column name can be referred to, while keeping the struct field name valid.

lists

I believe I've used all of the relevant work and workarounds that are in the codebase currently to handle this correctly. i.e. if the user provides List<T>, then the file's schema could be any of the various legacy representations (including a repeated unannotated field) and it will be read successfully.

substantial boost in performance. Could you mention numbers

Of course! I'm in the middle of this work so there's still a way to go, but the early numbers show a 2-7x improvement.

Before:

test record_reader_10k_collect               ... bench:  27,693,510 ns/iter (+/- 3,378,251) = 24 MB/s
test record_reader_stock_simulated_collect   ... bench: 195,962,326 ns/iter (+/- 38,229,416) = 6 MB/s

After:

test record_reader_10k_collect               ... bench:  14,824,617 ns/iter (+/- 906,887) = 45 MB/s
test record_reader_stock_simulated_collect   ... bench:  29,025,634 ns/iter (+/- 911,072) = 44 MB/s

schemas larger than 32 fields

32 is an arbitrary number; I can make it higher or lower if you think it's appropriate – it's a trade-off of convenience vs compile time. Groups can be deserialized to tuples, but also to structs annotated with #[derive(ParquetDeserialize)]. As such, whenever a group is bigger than this arbitrary cut-off, the option of deserializing to a struct remains. I also think that in practice, by the time your group grows to >32 fields, structs are preferable over tuples anyway, as you can name and better document what the fields are.

sadikovi commented 5 years ago

I would like to see the code, because if you are using the existing record assembly machinery and seeing these improvements, I am curious what changes you have made to improve the throughput. Thanks.

How do you do projection on legacy Parquet files then? Do you prune columns from the already-parsed schema?

alecmocatta commented 5 years ago

@sadikovi An estimate based on my experimentation so far: 50% of the speedup is from avoiding allocation (and the resulting optimisations the compiler can make; allocation is an optimisation blocker), 25% is from specialization, and 25% is from various other changes. I have further to go so I'm hopeful I'll speed it up a bit more – in theory the bottleneck should be syscalls and decompression here rather than anything else so that's what I'm aiming for.

The first method on the new trait mentioned above, "Produce Self::Schema given the user-provided type and a Type. This returns a helpful error if they don't match.", recursively "matches" on the actual schema it's applied to, returning a value of type Self::Schema that dynamically encodes the mapping from the type to the actual schema. For example, for List<T> an enum is returned that encodes which of the 6 different representations is being used. This is then used to generate the appropriate Self::Reader, which itself can be an enum if necessary for different representations (although I'm trying to avoid this as it would harm specialization).
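
As a rough illustration of the idea (the type and variant names below are invented, and only a few of the representations are shown):

// Hypothetical Self::Schema for List<T>: encodes which of the valid list
// representations the file actually uses, so the reader can be generated
// to match it.
enum ListSchema<T> {
    // Standard three-level LIST-annotated group.
    List(T),
    // Legacy two-level representation.
    ListCompat(T),
    // A bare repeated field with no LIST annotation.
    Repeated(T),
    // ... the remaining legacy representations
}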

Projection occurs as a byproduct of this recursive "matching" process – if a struct omits a field that is in the actual schema, then its column is ignored, no reader is generated and no value is read for it.

sunchao commented 5 years ago

Thanks @alecmocatta! The performance improvement looks very impressive 👍! Looking forward to a PR on this 😄.

sadikovi commented 5 years ago

Allocation of what? How do you avoid allocation when you still need to return rows? Are you using a mutable row reference instead?

sunchao commented 5 years ago

Hi @alecmocatta , just curious whether there's any update on this?

alecmocatta commented 5 years ago

@sunchao I've been on holiday but will PR this next week. I intend to open a JIRA and PR against https://github.com/apache/arrow/tree/master/rust/parquet/src, is that the right thing to do?

sunchao commented 5 years ago

Thanks. Yes, filing a JIRA against arrow is the right thing to do. Looking forward to it!

alecmocatta commented 5 years ago

My fork is here: https://github.com/alecmocatta/parquet-rs

It currently triggers an ICE (https://github.com/rust-lang/rust/issues/53443) on usage, and much of the code is commented out until I finish refactoring.

I'll investigate, finish the refactor, clean up the code, and rebase on https://github.com/apache/arrow in the coming week or so.

alecmocatta commented 5 years ago

JIRA here: https://issues.apache.org/jira/browse/ARROW-4314

sunchao commented 5 years ago

Thanks @alecmocatta! Could you open a pull request in arrow? It's a pretty big change and I'll take some time to look at it.