sunchao / parquet-rs

Apache Parquet implementation in Rust
Apache License 2.0

CSV to Parquet #183

Open nevi-me opened 5 years ago

nevi-me commented 5 years ago

Hi, I'm experimenting with creating a CSV to Parquet writer, and I have a few questions.

My end goal for the experiment is to create a crate that converts various file formats (csv, bson, json) to parquet. This would also lend itself to a possible CSV to Apache Arrow reader.

  1. ~How can I write strings? I used message schema {REQUIRED BYTE_ARRAY name;}, but when I read a parquet file in Python, the strings are shown as bytes.~ I found the answer: {REQUIRED BYTE_ARRAY name (UTF8);}

If I have a csv file that looks like:

Id,Name,Age
123-adf,John Doe,25
sfd-ge2,Jane Doe,26

I have written the below:

// assuming the parquet-rs crate layout at the time; adjust paths to your version
use std::error::Error;
use std::fs::File;
use std::rc::Rc;

use parquet::column::writer::ColumnWriter;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::{FileWriter, RowGroupWriter, SerializedFileWriter};
use parquet::schema::parser::parse_message_type;

fn write_parquet() -> Result<(), Box<Error>> {

    // TODO let message_type = build_parquet_schema();
    let message_type = "
        message schema {REQUIRED BYTE_ARRAY Id;REQUIRED BYTE_ARRAY Name;REQUIRED INT32 Age;}
    ";

    let schema = Rc::new(parse_message_type(message_type)?);
    let props = Rc::new(WriterProperties::builder().build());
    let file = File::create("./data/file1.parquet")?;
    let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();
    let mut col_number = 0;

    while let Some(mut col_writer) = row_group_writer.next_column().unwrap() {
        col_number += 1;
        match col_writer {
            ColumnWriter::ByteArrayColumnWriter(ref mut typed_writer) => {
                println!("writing a byte array");
                // I can remove this if-else when I start taking fn parameters of my schema and columns
                if col_number == 1 {
                    typed_writer.write_batch(
                        &[parquet::data_type::ByteArray::from("123-adf"), parquet::data_type::ByteArray::from("sfd-ge2")], None, None
                    )?;
                } else {
                    typed_writer.write_batch(
                        &[parquet::data_type::ByteArray::from("John Doe"), parquet::data_type::ByteArray::from("Jane Doe")], None, None
                    )?;
                }
            },
            ColumnWriter::Int32ColumnWriter(ref mut typed_writer) => {
                println!("writing an integer");
                typed_writer.write_batch(
                    &[25, 26], None, None
                )?;
            },
            _ => {}
        }
        row_group_writer.close_column(col_writer)?;
    }
    writer.close_row_group(row_group_writer)?;
    writer.close()?;
    Ok(())
}

  2. ~Similar to Q1, am I writing my strings properly, or is there a better way?~

  3. From reading through the conversation on #174, it looks like I have to specify an index of where my values are. So, does typed_writer.write_batch(&[24,25,24,26,27,28], None, None) produce a less compact file? Is it even valid?

  4. In general, does this library allow appending to existing parquet files? I haven't tried it yet.

I'm trying a naive approach of first reading csv files and generating the schema with a string builder. If that works, I can look at macros.

The other thing that I'll explore separately is how to convert csv's StringRecord values into parquet Types. I might use regex to figure out whether values are strings, i32, i64, timestamps, etc., roughly along the lines of the sketch below. If someone knows of an existing way to do this, that'd also be welcome.
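Roughly, I'm picturing something like this for the inference step (just a sketch; infer_physical_type and field_definition are made-up helpers, and in practice I'd sample several rows per column rather than a single value):

// Made-up helper: guess a Parquet physical type for one CSV value by trying
// successively more general parses (instead of regex).
fn infer_physical_type(value: &str) -> &'static str {
    if value.parse::<i32>().is_ok() {
        "INT32"
    } else if value.parse::<i64>().is_ok() {
        "INT64"
    } else if value.parse::<f64>().is_ok() {
        "DOUBLE"
    } else if value.parse::<bool>().is_ok() {
        "BOOLEAN"
    } else {
        // fall back to a string column for anything else
        "BYTE_ARRAY"
    }
}

// Made-up helper: assemble one field of the schema string per column.
fn field_definition(name: &str, physical_type: &str) -> String {
    let annotation = if physical_type == "BYTE_ARRAY" { " (UTF8)" } else { "" };
    format!("REQUIRED {} {}{};", physical_type, name, annotation)
}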

sadikovi commented 5 years ago

ping @xrl and @sunchao. You guys are more familiar with Arrow and converting CSV to Parquet.

  1. Yes, you are right, it is a byte array with a UTF8 logical type. Here is the doc that might help you: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md

  2. Looks right to me. You just need to specify UTF8 for that column in your schema (see the snippet below).

  3. Don't quite understand what you mean. Definition and repetition levels are for the cases when you have nulls or repeated fields, like arrays, maps, etc. If you don't have either of those, that is the way to write values. It should be faster to write when you just have a list of non-null values, because we short-circuit a few things in the code. Not sure about a less compact file, because it depends on the values you write. Imagine writing 100 identical non-null strings in one file, and 100 identical strings of which 99 are nulls in another. Of course the second file will be smaller, because we would write only 1 string and just store the definition levels (one 1 and 99 zeros).

  4. I don't think you can append to parquet files in general, due to their structure. You would have to either create a new file, or read records from one, add the new ones, and write a new file. I know that you don't necessarily have to decompress values; you can just append row groups or something like that, and parquet-mr has tooling for it - please correct me if I am wrong. This library does not have anything for it yet - we might add it later.
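For point 2, with the CSV from the original post the schema string would look something like this (just a sketch):

let message_type = "
    message schema {
        REQUIRED BYTE_ARRAY Id (UTF8);
        REQUIRED BYTE_ARRAY Name (UTF8);
        REQUIRED INT32 Age;
    }
";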

sadikovi commented 5 years ago

It looks like we do need a high-level API for writes. @xrl @sunchao let me know if this is required; I can create an SPIP issue and a small design doc for it.

nevi-me commented 5 years ago

Thanks @sadikovi. For number 4, I forgot that when modifying a parquet file, you indeed rewrite a new file.

On number 3, a csv use case is simpler because I won't have nested values. I might however have nulls. Your explanation addresses most of my question. So for nulls I might be able to do:

let all_fields: Vec<Type> = some_data;

let unique_vals: Vec<Type> = some_data.distinct();

let positions: Vec<usize> = fn_field_positions(&all_fields, &unique_vals);

typed_writer.write_batch(&all_fields, None, Some(positions)) // here I'm trying to write unique vals and their positions. This might obviously be incorrect

sadikovi commented 5 years ago

Note that you provide the non-null values and the list of definition levels. Also, it looks like people struggle to write values, so I will start working on a high-level write API that will let you easily map values from CSV or JSON.

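To make point 3 concrete: for an OPTIONAL BYTE_ARRAY column, writing the rows ["a", null, "b"] would look roughly like this (untested sketch, using the same imports as your write_parquet snippet above; write_optional_strings is just an illustrative name):

fn write_optional_strings(col_writer: &mut ColumnWriter, rows: &[Option<&str>]) -> Result<(), Box<Error>> {
    // Only the non-null values go into the values slice...
    let values: Vec<parquet::data_type::ByteArray> = rows
        .iter()
        .filter_map(|row| row.map(|s| s.as_bytes().to_vec().into()))
        .collect();
    // ...and there is one definition level per row: 1 = value present, 0 = null.
    let def_levels: Vec<i16> = rows
        .iter()
        .map(|row| if row.is_some() { 1 } else { 0 })
        .collect();
    if let ColumnWriter::ByteArrayColumnWriter(ref mut typed_writer) = *col_writer {
        typed_writer.write_batch(&values, Some(&def_levels), None)?;
    }
    Ok(())
}

// e.g. write_optional_strings(&mut col_writer, &[Some("a"), None, Some("b")])?;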

nevi-me commented 5 years ago

I've made some progress with generating a schema from inspecting a sample of csv values. An easier write API would be great, as right now I don't know how to deal with nulls.

To deal with nulls, would a function/interface that takes Option<Type> work?

sadikovi commented 5 years ago

Okay, I will start planning work for the high-level API, plus fixing a bunch of other tickets from the backlog.

Yes, that would work, but we already have a high-level Row API; a user would just have to map data to that, and we would write a proper file and handle nulls, arrays, maps, etc.

Cheers!

xrl commented 5 years ago

Great question! I had a similar problem and this is what I came up with:

let mut column_writer = row_group.next_column().unwrap().unwrap();
write_vec_of_option_borrowed_string_single_pass(&quoted_wares, |qw| qw.as_ref().map(|x| Some(&x.name[..])).unwrap_or(None), &mut column_writer).unwrap();
row_group.close_column(column_writer).unwrap();

and the awkwardly named write_vec_of_option_borrowed_string_single_pass:

fn write_vec_of_option_borrowed_string_single_pass<T,F>(values: &[T], accessor: F, column_writer: &mut ColumnWriter) -> Result<(),()>
    where F: Fn(&T) -> Option<&str> {

    let mut col = Vec::with_capacity(values.len());
    // I think this is what you care about
    let mut def_levels = Vec::with_capacity(values.len());

    for value in values {
        let opt = accessor(&value);
        if let Some(ref inner) = opt {
            col.push(inner.as_bytes().to_owned().into());
            // there is a value present so put a 1 in the definition level (acts like a bitmask)
            def_levels.push(1);
        } else {
            // there is no value so put a 0 for the definition level
            def_levels.push(0);
        }
    }

    if let ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer {
        typed.write_batch(&col[..], Some(&def_levels[..]), None).unwrap();
    } else {
        return Err(())
    }

    Ok(())
}

It gets even more complicated if your schema is nested, so thankfully I've avoided that! I wouldn't know where to start with building out that code. You'll also see that my code isn't as generic as it could be; that's mostly just related to the lack of generality for writing strings (they have to be owned bytes).

xrl commented 5 years ago

@sadikovi I do agree that a high-level write API is required. I've been hammering out my diesel-to-parquet code, writing many different flavors of Vec<Option<T>> writers and stumbling over keeping the column writers lined up with the schema. I have some observations that are probably best put in an issue. I'll wait for you to post your design and I will definitely chime in!

nevi-me commented 5 years ago

@xrl, I haven't gotten there yet with nulls. Here's my code to read a csv with strings and integers: https://gist.github.com/nevi-me/443025fe11038e2709083db2e24a5e64

I can do trial & error for other field types, something like the sketch below. Not the most efficient, but it's a start.
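For other field types I expect the pattern to be the same, e.g. an INT64 column would be something like this (untested; write_int64_column is just an illustrative name, and it uses the same imports as my write_parquet function above):

fn write_int64_column(col_writer: &mut ColumnWriter, values: &[i64]) -> Result<(), Box<Error>> {
    if let ColumnWriter::Int64ColumnWriter(ref mut typed_writer) = *col_writer {
        // same shape as the INT32 arm in my earlier snippet, just 64-bit values
        typed_writer.write_batch(values, None, None)?;
    }
    Ok(())
}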