JuliaIO / Parquet.jl

Julia implementation of Parquet columnar file format reader

Added a batched columns iterator #83

Closed tanmaykm closed 4 years ago

tanmaykm commented 4 years ago

Adds a way to iterate over data in a columnar fashion. The result of each iteration is a named tuple that maps column names to the corresponding vectors of data. It is currently restricted to reading non-nested schemas, but can be extended to support nested schemas later. The batch size is determined by the row group size in the Parquet file: each iteration returns one row group's worth of data.

julia> cc = Parquet.BatchedColumnsCursor(par)
Batched Columns Cursor on customer.impala.parquet
    rows: 1:150000
    batches: 1
    cols: c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment

julia> batchvals, state = iterate(cc);

julia> propertynames(batchvals)
(:c_custkey, :c_name, :c_address, :c_nationkey, :c_phone, :c_acctbal, :c_mktsegment, :c_comment)

julia> length(batchvals.c_name)
150000

julia> batchvals.c_name[1:5]
5-element Array{Union{Missing, String},1}:
 "Customer#000000001"
 "Customer#000000002"
 "Customer#000000003"
 "Customer#000000004"
 "Customer#000000005"

tanmaykm commented 4 years ago

Here are some timings I got in tests:

julia> par = ParFile("dsd50p.parquet");

julia> @time collect(Parquet.BatchedColumnsCursor(par));
  5.620772 seconds (39.11 M allocations: 1.723 GiB, 7.16% gc time)

julia> @time collect(Parquet.BatchedColumnsCursor(par));
  0.117820 seconds (182.06 k allocations: 191.918 MiB, 4.35% gc time)

xiaodaigh commented 4 years ago

So this is a RowGroupIterator?

cc = Parquet.BatchedColumnsCursor(par)

for batchvals in cc
  # do something
end

Am I right in understanding that batchvals in my example above is a named tuple containing the columns from a row group?

If that's the case, why not just call it the RowGroupIterator?

tanmaykm commented 4 years ago

Yes, it returns a named tuple of columns. I felt we may have other batching strategies in the future, so I didn't tie the name to row groups.
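
For readers following along, here is a minimal sketch of the iteration pattern discussed above: each call to iterate returns a named tuple of column vectors plus a state. ToyBatchedColumns and its fields are hypothetical stand-ins over in-memory data, not Parquet.jl's actual implementation; the row ranges play the role of row groups.

```julia
# Hypothetical stand-in for a batched columns cursor (NOT Parquet.jl's implementation).
struct ToyBatchedColumns{T<:NamedTuple}
    columns::T                          # full columns, one vector per column name
    rowgroups::Vector{UnitRange{Int}}   # row range covered by each batch
end

Base.length(c::ToyBatchedColumns) = length(c.rowgroups)

# Julia's iteration protocol: return (value, next_state), or nothing when done.
function Base.iterate(c::ToyBatchedColumns, i::Int=1)
    i > length(c.rowgroups) && return nothing
    rows = c.rowgroups[i]
    # Slice every column to the rows of this batch; map keeps the column names.
    batch = map(col -> col[rows], c.columns)
    return batch, i + 1
end

cols = (id = collect(1:6), name = ["a", "b", "c", "d", "e", "f"])
cursor = ToyBatchedColumns(cols, [1:4, 5:6])

batch_sizes = Int[]
for batch in cursor
    push!(batch_sizes, length(batch.id))
end
```

Because the batch boundaries are just data in this sketch, a different batching strategy would only need different row ranges, which is the reason given above for not tying the name to row groups.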

xiaodaigh commented 4 years ago

There is a bug here where strings are not returned correctly. If the data is encoded as BYTE_ARRAY, then you need to check the converted type to see if it should be a string. The ConvertedType for strings is UTF8, which has value 0. See the relevant Thrift definition below.

[image: the relevant Thrift definition]

xiaodaigh commented 4 years ago

This seems quite slow for large parquet files. You can find some large CSVs here https://docs.rapids.ai/datasets/mortgage-data and convert them to parquet and test.

tanmaykm commented 4 years ago

String conversion can be done by passing the map_logical_types=true option while opening the Parquet file. Is there any place in the schema where ConvertedType can be picked up from?

Could you post some timings that you see on large files?

xiaodaigh commented 4 years ago

String conversion can be done by passing the map_logical_types=true option while opening the Parquet file.

Shouldn't this be done automatically in the case of strings? Both the R and Python readers read them as strings.

Is there any place in the schema where ConvertedType can be picked up from?

Yes, it is stored in the metadata.schema[i].converted_type for some i. If the type is BYTE_ARRAY, then you need to check the converted_type field, and if it is set and is UTF8 (which is 0), you need to convert the byte array to a String.

When you load the data you need to check that. The dsd50p.parquet dataset you used in your benchmark has some strings.
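
The check described above could look roughly like the following. This is only a sketch: ToySchemaElement, is_string_column and decode_column are hypothetical names, not Parquet.jl types or functions. The enum values are the ones in the Parquet Thrift definition (Type.BYTE_ARRAY is 6, ConvertedType.UTF8 is 0).

```julia
# Enum values as given in the Parquet Thrift definition.
const BYTE_ARRAY = 6   # Type.BYTE_ARRAY
const UTF8 = 0         # ConvertedType.UTF8

# Hypothetical, simplified schema element; converted_type is optional in the format.
struct ToySchemaElement
    physical_type::Int
    converted_type::Union{Nothing,Int}
end

# A column holds strings only if it is BYTE_ARRAY *and* annotated as UTF8.
# (nothing == UTF8 is false, so an unset converted_type is handled.)
is_string_column(el::ToySchemaElement) =
    el.physical_type == BYTE_ARRAY && el.converted_type == UTF8

function decode_column(el::ToySchemaElement, raw::Vector{Vector{UInt8}})
    # String(copy(b)) leaves the input untouched; String(b) would take ownership of b.
    return is_string_column(el) ? [String(copy(b)) for b in raw] : raw
end

raw = [collect(codeunits("Customer#000000001")),
       collect(codeunits("Customer#000000002"))]

as_strings = decode_column(ToySchemaElement(BYTE_ARRAY, UTF8), raw)
as_bytes   = decode_column(ToySchemaElement(BYTE_ARRAY, nothing), raw)
```

With the UTF8 annotation the values come back as String; without it the raw byte vectors are returned unchanged.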

Could you post some timings that you see on large files?

I had to stop it. In R it took 30 and in Julia it ran for a full 2-3 mins before I stopped it. Just test it on any dataset on the rapid ai page I linked above. Or you can generate some synthetic data, write it using the writer branch, and test.

tanmaykm commented 4 years ago

Yes, it is stored in the metadata.schema[i].converted_type for some i

:+1: that can be done automatically then

Just test it on any dataset on the rapid ai page

Are they parquet files? If not how did you convert them, with what rowgroup size?

xiaodaigh commented 4 years ago

Are they parquet files? If not how did you convert them, with what rowgroup size?

Honestly, I just read them using R's fread and then write them out using the arrow library.

library(data.table)
arrow::write_parquet(fread(path), out_parquet_path)

and then I tried to read them using the iterator.

tanmaykm commented 4 years ago

Okay... that could be the reason, because by default it may be writing all the rows as one huge chunk. That's where we can have different strategies for iterating. I plan to take that up next.
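
As an illustration of one such strategy (an assumption about what could be done, not what Parquet.jl does), a single huge batch of columns can be split into fixed-size batches once it is in memory. The rebatch helper below is hypothetical. It only changes how already-decoded data is handed out, so it would not by itself reduce the cost of reading one very large row group.

```julia
# Hypothetical helper: split one large batch (named tuple of column vectors)
# into batches of at most `batchsize` rows, using views to avoid copying.
function rebatch(batch::NamedTuple, batchsize::Integer)
    nrows = length(first(batch))                       # all columns share this length
    ranges = Iterators.partition(1:nrows, batchsize)   # 1:4, 5:8, 9:10 for 10 rows
    return (map(col -> view(col, rows), batch) for rows in ranges)
end

big = (id = collect(1:10), score = collect(1.0:10.0))
sizes = [length(b.id) for b in rebatch(big, 4)]   # → [4, 4, 2]
```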