sunchao / parquet-rs

Apache Parquet implementation in Rust
Apache License 2.0
149 stars · 20 forks

Issue when reading PLAIN_DICTIONARY fields #10

Closed sadikovi closed 7 years ago

sadikovi commented 7 years ago

I am currently working on a Usage/Examples section in the README and wanted to add a couple of code snippets showing how to use the library (I hope you do not object to this). While working on it, I found a potential issue when reading columns with PLAIN_DICTIONARY encoding.

I am trying to read the data/alltypes_plain.snappy.parquet file from the repository with the code attached below.

I discovered that this returns empty buffers when I read values. For example, the first column in the Parquet file is "id", which has values 6 and 7:

column 0:
--------------------------------------------------------------------------------
column type: INT32
column path: "id"
encodings: RLE PLAIN_DICTIONARY PLAIN
file path: N/A
file offset: 55
num of values: 2
total compressed size (in bytes): 51
total uncompressed size (in bytes): 47
data page offset: 27
index page offset: N/A
dictionary page offset: 4

When I run my code it prints the following:

Row group: 0
Read column 0: 0 values (0 levels): [0, 0, 0, 0, 0, 0, 0, 0]
! Skip column 1, unknown type: BOOLEAN
Read column 2: 0 values (0 levels): [0, 0, 0, 0, 0, 0, 0, 0]
Read column 3: 0 values (0 levels): [0, 0, 0, 0, 0, 0, 0, 0]
Read column 4: 0 values (0 levels): [0, 0, 0, 0, 0, 0, 0, 0]
! Skip column 5, unknown type: INT64
! Skip column 6, unknown type: FLOAT
! Skip column 7, unknown type: DOUBLE
! Skip column 8, unknown type: BYTE_ARRAY
! Skip column 9, unknown type: BYTE_ARRAY
! Skip column 10, unknown type: INT96

All returned buffers are empty.

It turns out that there are two issues with reading values. First, when reading a dictionary page we should not increment the number of seen values, because those values belong to the dictionary only; we have not read any actual data values yet. Second, the encoding assignment for a data page is a no-op: `encoding == Encoding::RLE_DICTIONARY;` is a comparison whose result is discarded, not an assignment.

The diff also contains an updated test_file_reader test:

diff --git a/src/column/reader.rs b/src/column/reader.rs
index 9d36918..cea0bc5 100644
--- a/src/column/reader.rs
+++ b/src/column/reader.rs
@@ -213,9 +213,11 @@ impl<'a, T: DataType> ColumnReaderImpl<'a, T> where T: 'static {
                 self.def_level_decoder = Some(def_decoder);
               }

-              if encoding == Encoding::PLAIN_DICTIONARY {
-                encoding == Encoding::RLE_DICTIONARY;
-              }
+              let encoding = if encoding == Encoding::PLAIN_DICTIONARY {
+                Encoding::RLE_DICTIONARY
+              } else {
+                encoding
+              };

               let decoder =
                 if encoding == Encoding::RLE_DICTIONARY {
diff --git a/src/file/reader.rs b/src/file/reader.rs
index ea7871d..5863f57 100644
--- a/src/file/reader.rs
+++ b/src/file/reader.rs
@@ -308,7 +308,6 @@ impl PageReader for SerializedPageReader {
           assert!(page_header.dictionary_page_header.is_some());
           let dict_header = page_header.dictionary_page_header.as_ref().unwrap();
           let is_sorted = dict_header.is_sorted.unwrap_or(false);
-          self.seen_num_values += dict_header.num_values as i64;
           Page::DictionaryPage {
             buf: ByteBufferPtr::new(buffer), num_values: dict_header.num_values as u32,
             encoding: Encoding::from(dict_header.encoding), is_sorted: is_sorted
@@ -396,7 +395,7 @@ mod tests {
     let mut page_reader_0: Box<PageReader> = page_reader_0_result.unwrap();
     let mut page_count = 0;
     while let Ok(Some(page)) = page_reader_0.get_next_page() {
-      let is_dict_type = match page {
+      let is_expected_page = match page {
         Page::DictionaryPage{ buf, num_values, encoding, is_sorted } => {
           assert_eq!(buf.len(), 32);
           assert_eq!(num_values, 8);
@@ -404,12 +403,22 @@ mod tests {
           assert_eq!(is_sorted, false);
           true
         },
-        _ => false
+        Page::DataPage { buf, num_values, encoding, def_level_encoding, rep_level_encoding } => {
+          assert_eq!(buf.len(), 11);
+          assert_eq!(num_values, 8);
+          assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
+          assert_eq!(def_level_encoding, Encoding::RLE);
+          assert_eq!(rep_level_encoding, Encoding::BIT_PACKED);
+          true
+        },
+        _ => {
+          false
+        }
       };
-      assert!(is_dict_type);
+      assert!(is_expected_page);
       page_count += 1;
     }
-    assert_eq!(page_count, 1);
+    assert_eq!(page_count, 2);
   }

   fn get_test_file<'a>(file_name: &str) -> fs::File {

With these changes I get the following output, which looks correct:

Row group: 0
Read column 0: 2 values (0 levels): [6, 7, 0, 0, 0, 0, 0, 0]
! Skip column 1, unknown type: BOOLEAN
Read column 2: 2 values (0 levels): [0, 1, 0, 0, 0, 0, 0, 0]
Read column 3: 2 values (0 levels): [0, 1, 0, 0, 0, 0, 0, 0]
Read column 4: 2 values (0 levels): [0, 1, 0, 0, 0, 0, 0, 0]
! Skip column 5, unknown type: INT64
! Skip column 6, unknown type: FLOAT
! Skip column 7, unknown type: DOUBLE
! Skip column 8, unknown type: BYTE_ARRAY
! Skip column 9, unknown type: BYTE_ARRAY
! Skip column 10, unknown type: INT96
sadikovi commented 7 years ago

Here is the example code I am running with cargo run --bin example (I placed it in src/bin/example.rs in the repository). I will convert it into a usage example, if that is okay.

extern crate parquet;

use std::fs::File;
use std::path::Path;

use parquet::basic::*;
use parquet::data_type::*;
use parquet::column::reader::{ColumnReaderImpl, get_typed_column_reader};
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::schema::printer::print_parquet_metadata;

fn print_values<'a, T: DataType>(column: usize, typed_column_reader: &mut ColumnReaderImpl<'a, T>) where T: 'static {
    // for now just hard code this
    let batch_size = 8;
    let mut actual_values = vec![T::T::default(); batch_size];
    let mut actual_def_levels = vec![i16::default(); batch_size];
    let mut actual_rep_levels = vec![i16::default(); batch_size];
    let mut curr_values_read = 0;
    let mut curr_levels_read = 0;
    loop {
        let (values_read, levels_read) = typed_column_reader.read_batch(
            batch_size,
            Some(&mut actual_def_levels[curr_levels_read..]),
            Some(&mut actual_rep_levels[curr_levels_read..]),
            &mut actual_values[curr_values_read..]
        ).unwrap();

        curr_values_read += values_read;
        curr_levels_read += levels_read;

        if values_read == 0 {
            break;
        }
    }
    println!("Read column {}: {} values ({} levels): {:?}",
        column, curr_values_read, curr_levels_read, actual_values);
}

fn main() {
    println!("Reading Parquet file");
    let file_path = "data/alltypes_plain.snappy.parquet";
    let path = Path::new(file_path);
    let file = File::open(&path).unwrap();
    let parquet_reader = SerializedFileReader::new(file).unwrap();
    let metadata = parquet_reader.metadata();
    let num_row_groups = metadata.num_row_groups();
    print_parquet_metadata(&mut std::io::stdout(), metadata);

    for i in 0..num_row_groups {
        println!("Row group: {}", i);
        let row_group_reader = parquet_reader.get_row_group(i).unwrap();
        let num_columns = row_group_reader.num_columns();
        let row_group_metadata = metadata.row_group(i);
        for j in 0..num_columns {
            // let mut page_reader = row_group_reader.get_column_page_reader(j).unwrap();
            // while let Some(page) = page_reader.get_next_page().unwrap() {
            //     println!("Row group: {}, column: {}, read new page (num values: {}, encoding: {}, data: {:?})",
            //         i, j, page.num_values(), page.encoding(), page.buffer().data());
            // }
            let column_chunk_metadata = row_group_metadata.column(j);
            let column_reader = row_group_reader.get_column_reader(j).unwrap();
            let typed_column_reader = match column_chunk_metadata.column_type() {
                Type::INT32 => {
                    Some(get_typed_column_reader::<Int32Type>(column_reader))
                },
                other => {
                    // Just testing int32 columns for now
                    // I will implement this for other columns later
                    println!("! Skip column {}, unknown type: {}", j, other);
                    None
                },
            };

            if let Some(mut typed_column_reader) = typed_column_reader {
                print_values(j, &mut typed_column_reader);
            }
        }
    }
}

@sunchao I will convert it to 2-space indentation when ready :)

sadikovi commented 7 years ago

@sunchao Could you review this problem? It is also possible that my reading code is wrong. I can submit a pull request if it turns out to be a bug.

sunchao commented 7 years ago

@sadikovi Yes, this does seem like a bug. Please file a PR. Thanks!

sadikovi commented 7 years ago

@sunchao I am happy to do that. Thanks.

Apart from this issue: do you think it is a good idea to add usage examples? I am also very interested in what you think about adding a higher-level API for reading (whether it is a good idea overall) and what ideas you have for such an API, so we would have to write less code to read records (it could also be useful for testing), e.g. Parquet -> JSON.

sunchao commented 7 years ago

@sadikovi Yes, I think it's good to add a few examples. Currently the ColumnReaderImpl is not easy to use. We could consider adding a "scanner" type that functions as an iterator over the values in a column. Something like this.

In the future I'm thinking about adding conversion to the Apache Arrow format, so the read API can support higher-level types such as lists and maps.

sadikovi commented 7 years ago

@sunchao this sounds great! I am looking forward to the conversion into the Apache Arrow format! :)