sunchao / parquet-rs

Apache Parquet implementation in Rust
Apache License 2.0

How to write a None value to a column #174

Open xrl opened 5 years ago

xrl commented 5 years ago

I am converting postgres-stored data into a parquet file and I am going to deal with a sparse list of strings. How do I use the column writer to write those Option<String> values?

With this schema:

  message schema {
    REQUIRED INT32 id;
    OPTIONAL BINARY ad_po_number;
  }

I get this error:

error[E0308]: mismatched types
   --> src/main.rs:127:31
    |
127 |             typed.write_batch(&addr_organization_names[..], None, None).unwrap();
    |                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `parquet::data_type::ByteArray`, found enum `std::option::Option`
    |
    = note: expected type `&[parquet::data_type::ByteArray]`
               found type `&[std::option::Option<std::vec::Vec<u8>>]`

Is OPTIONAL not what I think it is?

sadikovi commented 5 years ago

There is no Null/None value type in Parquet. Null values are managed through definition levels: depending on the value of the definition level, you get the null value at the nesting level you need.

So basically, if you use the low-level API, the simple example would be something like this:

writer.write_batch(&[1, 2, 3], Some(&[1, 1, 0, 1]), None);

This would result in something like this:

[1]
[2]
[null]
[3]

Of course, it gets more complicated when you have arrays or nested structs, but it looks like you are writing only primitive values, not complex ones like maps.

OPTIONAL in the schema tells the reader/writer that this field is expected to contain nulls, so it needs to create data structures for definition and repetition levels; otherwise, they are omitted.

Hope that helps. It looks like we do need some higher-level API for writes.
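For your OPTIONAL BINARY column specifically, a minimal sketch with the low-level API could look like this (column_writer comes from row_group.next_column() as usual, and the three row values here are made up):

// Three made-up rows for ad_po_number: "PO-1", null, "PO-2".
// Only the two non-null values go into the values slice.
let values = vec![ByteArray::from("PO-1"), ByteArray::from("PO-2")];
// One definition level per row: 1 = value present, 0 = null.
let def_levels: Vec<i16> = vec![1, 0, 1];

if let ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer {
    typed.write_batch(&values[..], Some(&def_levels[..]), None).unwrap();
}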

xrl commented 5 years ago

Thanks for the tips. I read over some Twitter blog posts and now I understand it a little better: the definition level specifies how much of the structure is defined. It sounded like a bitmask at first, but I see the number quantifies how fully defined the value is.
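To make that counting concrete for myself (not the schema from this issue, just an illustration), with a nested schema like

  message schema {
    OPTIONAL group contact {
      OPTIONAL BINARY email;
    }
  }

the email column has a maximum definition level of 2: a row with an email is written at level 2, a row where contact exists but email is null at level 1, and a row with no contact at all at level 0.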

Now I'm doing two passes over the list of structs with an optional string field: one pass to gather up the ByteArrays, the other to mark whether each value should be written or not.

Unfortunately I'm not getting it to work just yet! The file is always the same size, about 2MB. Here is the code I'm trying:

fn do_postgresy_stuff(args: Args) -> Result<(),diesel::ConnectionError> {
    let conn = PgConnection::establish(&args.arg_POSTGRES_URI)?;

    let count = addresses.count().load::<i64>(&conn).unwrap()[0];
    let page_size = 1000;
    let mut page_count = count / page_size;
    if count % page_count == 0 {
        page_count += 1;
    }
    println!("count: {}, page_size: {}, page_count: {}", count, page_size, page_count);

    let mut pfile = create_addresses_parquet_file().unwrap();

    for p in 0..page_count {
        let res = addresses.order(ad_dsl::id.desc()).limit(page_size).offset(p * page_size).load::<Address>(&conn).unwrap();
        println!("got a page {}", p);

        let mut row_group = pfile.next_row_group().unwrap();

        {
            let mut column_writer = row_group.next_column().unwrap().unwrap();
            let addr_ids = res.iter().map(|x| x.id).collect::<Vec<i32>>();
            if let ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer {
                typed.write_batch(&addr_ids[..], None, None).unwrap();
            }
            row_group.close_column(column_writer).unwrap();

            let mut column_writer = row_group.next_column().unwrap().unwrap();
            let empty: Vec<u8> = Vec::new();
            let addr_organization_names = res.iter().map(|x|
                if let Some(ref org_name) = x.organization_name {
                    org_name.as_bytes().to_owned().into()
                } else {
                    empty.clone().into()
                }
            ).collect::<Vec<ByteArray>>();
            let addr_def_levels = res.iter().map(|x|
                if x.organization_name.is_some() {
                    1
                } else {
                    0
                }
            ).collect::<Vec<i16>>();
            if let ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer {
                typed.write_batch(&addr_organization_names[..], Some(&addr_def_levels[..]), None).unwrap();
            }
            row_group.close_column(column_writer).unwrap();
        }

        pfile.close_row_group(row_group).unwrap();
    }

    Ok(())
}

got any tips on how to debug this? I'm going to try dumping the parquet file as-is to make sure the schema is correct.

sadikovi commented 5 years ago

Well, it looks alright. Couple of things:

You probably want to make it like this:

if count % page_count != 0 {
  page_count += 1;
}

Another thing is that the values slice should only contain non-null elements. For example, if you have "a", null, "b", null, the values slice would be ["a", "b"] and the definition levels [1, 0, 1, 0]. Try that and see if it works. Hope it helps.
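Concretely, in your snippet that means dropping the empty.clone().into() placeholders and collecting only the present values; a sketch reusing your variable names:

// Only rows where organization_name is Some(..) contribute a value.
let addr_organization_names: Vec<ByteArray> = res.iter()
    .filter_map(|x| x.organization_name.as_ref())
    .map(|name| name.as_bytes().to_owned().into())
    .collect();

// Still one definition level per row: 1 = present, 0 = null.
let addr_def_levels: Vec<i16> = res.iter()
    .map(|x| if x.organization_name.is_some() { 1 } else { 0 })
    .collect();

The write_batch call itself can stay exactly as you have it.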

But it looks like we need to make the write API better or more usable.

sunchao commented 5 years ago

Agreed. Ideally, users of this library should not need to know about repetition & definition levels. Maybe the column writer could have an "append" API which lets the user keep adding values and then generates the column once it's done.
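Something as small as the sketch below would already hide the levels from the caller. This is only a strawman built on top of the existing write_batch call; none of these names exist in the crate:

// Strawman only: a tiny appender that derives definition levels from Option,
// then flushes through the existing low-level write_batch. Names are made up.
struct OptionalByteArrayAppender {
    values: Vec<ByteArray>,
    def_levels: Vec<i16>,
}

impl OptionalByteArrayAppender {
    fn new() -> Self {
        OptionalByteArrayAppender { values: Vec::new(), def_levels: Vec::new() }
    }

    // One call per row; None rows only contribute a definition level of 0.
    fn append(&mut self, value: Option<ByteArray>) {
        match value {
            Some(v) => {
                self.values.push(v);
                self.def_levels.push(1);
            }
            None => self.def_levels.push(0),
        }
    }

    // Write everything out in a single batch through the typed column writer.
    fn finish(self, column_writer: &mut ColumnWriter) {
        if let ColumnWriter::ByteArrayColumnWriter(ref mut typed) = *column_writer {
            typed.write_batch(&self.values[..], Some(&self.def_levels[..]), None).unwrap();
        }
    }
}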

xrl commented 5 years ago

Looks like my writer code had a variety of errors, namely

I was able to track down those last two by creating a sample in-memory data set and making sure it could be read back. The last code snippet above was actually generating corrupt parquet files, failing to write the footer. I think this ties back to my "Help A Newbie Out" ticket #173.

Here's the working sample, using the same parquet schema as above:

fn do_sample_run(args: Args) -> Result<(),()> {
    let ids = vec![1,2,3,4,5,6,7,8,9,10];
    let names : Vec<ByteArray> = vec!["yes", "no", "maybe", "sure", "never"].into_iter().map(|x| ByteArray::from(x)).collect();
    let names_def_levels = vec![0,1,0,1,0,1,0,1,0,1,];

    let mut pfile = create_addresses_parquet_file().unwrap();

    // Writes the IDs
    let mut row_group = pfile.next_row_group().unwrap();

    let mut column_writer = row_group.next_column().unwrap().unwrap();
    if let ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer {
        typed.write_batch(&ids[..], None, None).unwrap();
    } else {
        panic!("no bueno");
    }
    row_group.close_column(column_writer).unwrap();

    // Write the names
    let mut column_writer = row_group.next_column().unwrap().unwrap();

    if let ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer {
        let bytes_to_write = names.iter().fold(0, |acc,x| acc+x.len());
        println!("writing {} org names ({} bytes)", names.len(), bytes_to_write);
        typed.write_batch(&names[..], Some(&names_def_levels[..]), None).unwrap();
    } else {
        panic!("no bueno");
    }
    row_group.close_column(column_writer).unwrap();

    pfile.close_row_group(row_group).unwrap();

    // Closing the file is what writes the parquet footer; the earlier snippet never did this, which is why those files came out corrupt.
    pfile.close().unwrap();

    Ok(())
}

then tested with the parquet-read CLI (fetched with cargo install parquet):

$ parquet-read target/debug/addresses.parquet
{id: 1, ad_po_number: null}
{id: 2, ad_po_number: [121, 101, 115]}
{id: 3, ad_po_number: null}
{id: 4, ad_po_number: [110, 111]}
{id: 5, ad_po_number: null}
{id: 6, ad_po_number: [109, 97, 121, 98, 101]}
{id: 7, ad_po_number: null}
{id: 8, ad_po_number: [115, 117, 114, 101]}
{id: 9, ad_po_number: null}
{id: 10, ad_po_number: [110, 101, 118, 101, 114]}

and it looks right!

xrl commented 5 years ago

A little bit of refactoring and I've got something that will be useful as I expand my collection of writers. I imagine I'll be writing a lot of sparse lists of ByteArray data:

fn write_vec_of_option_string<T,F>(values: Vec<T>, accessor: F, column_writer: &mut ColumnWriter) -> Result<(),()>
    where F: Fn(&T) -> &Option<String> {
    let col = values.iter().map(|x| accessor(x)).filter_map(|opt: &Option<String>|
        if let Some(ref inner) = opt {
            Some(inner.as_bytes().to_owned().into())
        } else {
            None
        }
    ).collect::<Vec<ByteArray>>();

    let def_levels = values.iter().map(accessor).map(|opt: &Option<String>|
        if opt.is_some() {
            1
        } else {
            0
        }
    ).collect::<Vec<i16>>();

    if let ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer {
        typed.write_batch(&col[..], Some(&def_levels[..]), None).unwrap();
    } else {
        return Err(())
    }

    Ok(())
}

fn write_vec_of_option_string_single_pass<T,F>(values: Vec<T>, accessor: F, column_writer: &mut ColumnWriter) -> Result<(),()>
    where F: Fn(&T) -> &Option<String> {

    let mut col = Vec::with_capacity(values.len());
    let mut def_levels = Vec::with_capacity(values.len());

    for value in values {
        let opt = accessor(&value);
        if let Some(ref inner) = opt {
            col.push(inner.as_bytes().to_owned().into());
            def_levels.push(1);
        } else {
            def_levels.push(0);
        }
    }

    if let ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer {
        typed.write_batch(&col[..], Some(&def_levels[..]), None).unwrap();
    } else {
        return Err(())
    }

    Ok(())
}

when called it looks something like:

            write_vec_of_option_string_single_pass(res, |x| &x.organization_name, &mut column_writer).unwrap();

I've read that Rust's iterator code is sometimes suboptimal, so I will benchmark these two flavors of writing a column.

Edit: if it wasn't clear, I'm looking for feedback! Please let me know if this API is somewhat useful.

sadikovi commented 5 years ago

@sunchao yes, you are right. I am also thinking about doing something similar to #173, to make sure users close column writers, row groups and files. I was personally in favour of adding a Row API similar to the one for reads, where you give us a Vec<Row> and we write them into a file with proper definition and repetition levels. But, of course, we could do a similar thing in a number of ways.

@xrl Looks great! Would be fantastic if you could post the pain points you have encountered, so we could address them at some point.

We could literally create commands to convert CSV and JSON into parquet; I am not sure how useful that would be. Let me know. Cheers!

sunchao commented 5 years ago

@xrl : thanks! The code snippet you provided is very helpful; we'll try to make the API more convenient soon.

@sadikovi : +1 on a Row API. We can borrow some ideas on how parquet-mr does it.

xrl commented 5 years ago

Would it make sense to have a serde-style procedural macro for generating the writer?

#[derive(ParquetWriter)]
struct {
  id: i32,
  #[parquet(skip=true)]
  ssn: String,
  country: String
}

Combined with a schema, I'm not sure the compiler can guarantee that what is generated is compatible, but it could alleviate boilerplate.

sadikovi commented 5 years ago

Yes, it could work. But I would suggest implementing a simple approach first, just to see how much work it is and what the potential pitfalls are - and it looks like you have already done some work!

If we implemented a row API like we did for reading, it would result in a much easier-to-use interface: the user would give us a list of records and potentially a schema, and we would create a proper parquet file and hide all of the complexity behind it.

We could start working on a proposal for that and/or on updating the low-level API! But it is a bit off-topic from the original issue :)