JuliaIO / Parquet.jl

Julia implementation of Parquet columnar file format reader

Reading Int64 columns throws Inexact Error #149

Closed: gzheng92 closed this issue 2 years ago

gzheng92 commented 3 years ago

I have a snappy-compressed Parquet file with an Int64 column (the schema identifies this correctly).

I tried to read it in with the following code:

using Parquet

# map the "colname" column to Integer via the decimal logical-type converter
mapping = Dict(["colname"] => (Integer, Parquet.logical_decimal));
pf = Parquet.File(f_list[1]; map_logical_types = mapping);

RecordCursor(pf)

and got the following error:

InexactError: check_top_bit(Int32, 18446744072400845597)

Reading through codec.jl, I couldn't work out how to fix this.
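
For context on the error itself: check_top_bit is Julia's checked integer conversion rejecting a value that cannot be represented in the target type; the UInt64 here is far larger than typemax(Int32). A minimal illustration, using just the value from the message above (nothing Parquet-specific):

x = UInt64(18446744072400845597)   # bit pattern 0xffffffffb1febb1d, i.e. -1308706019 in two's complement
# convert(Int32, x)                # throws InexactError: check_top_bit(Int32, 18446744072400845597)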

gzheng92 commented 3 years ago

Is this project inactive?

quinnj commented 3 years ago

@tanmaykm ^^

tanmaykm commented 3 years ago

It is possible that the encoding type for that column is not supported yet in Parquet.jl. Of all the possible data types that can be annotated as decimals, only int64 and fixed_len_byte_array are implemented at the moment, from what I recall.

Having a sample parquet file that can reproduce this error will help identify the cause.

gzheng92 commented 3 years ago

The issue is a bit more complex than I initially realized. Please let me know if I should open a new issue. (I no longer think the error is about the underlying data.)

Here's the update:

I have a Parquet file which reads fine in Python via pandas or pyarrow.

When I read the same file in Julia using read_parquet, I get a mysterious error when trying to use the result:

using Parquet, Tables

x = read_parquet(fpath)
rows = Tables.rows(x) # DataFrame or Tables.columns yields the same problem
---
InexactError: check_top_bit(Int32, 18446744072352552589)

Stacktrace:
  [1] throw_inexacterror(f::Symbol, #unused#::Type{Int32}, val::UInt64)
    @ Core ./boot.jl:602
  [2] check_top_bit
    @ ./boot.jl:616 [inlined]
  [3] toInt32
    @ ./boot.jl:666 [inlined]
  [4] Int32
    @ ./boot.jl:751 [inlined]
  [5] convert(#unused#::Type{Int32}, x::UInt64)
    @ Base ./number.jl:7
  [6] _read_zigzag(io::Thrift.TFileTransport, typ::Type{Int32})
    @ Thrift ~/.julia/packages/Thrift/xZ0ck/src/codec.jl:67
  [7] read(p::Thrift.TCompactProtocol, t::Type{Int32})
    @ Thrift ~/.julia/packages/Thrift/xZ0ck/src/protocols.jl:491
  [8] read_container(p::Thrift.TCompactProtocol, val::Parquet.PAR2.PageHeader)
    @ Thrift ~/.julia/packages/Thrift/xZ0ck/src/base.jl:194
  [9] read
    @ ~/.julia/packages/Thrift/xZ0ck/src/base.jl:169 [inlined]
 [10] read
    @ ~/.julia/packages/Thrift/xZ0ck/src/base.jl:167 [inlined]
 [11] read_thrift(io::IOStream, #unused#::Type{Parquet.PAR2.PageHeader})
    @ Parquet ~/.julia/packages/Parquet/O0PXc/src/reader.jl:402
 [12] (::Parquet.var"#42#43"{Parquet.ColumnChunkPages, Int64})()
    @ Parquet ~/.julia/packages/Parquet/O0PXc/src/reader.jl:171
 [13] (::Parquet.var"#36#38"{Parquet.var"#42#43"{Parquet.ColumnChunkPages, Int64}, Parquet.PageLRU, Tuple{Parquet.PAR2.ColumnChunk, Int64}})()
    @ Parquet ~/.julia/packages/Parquet/O0PXc/src/reader.jl:34
 [14] lock(f::Parquet.var"#36#38"{Parquet.var"#42#43"{Parquet.ColumnChunkPages, Int64}, Parquet.PageLRU, Tuple{Parquet.PAR2.ColumnChunk, Int64}}, l::ReentrantLock)
    @ Base ./lock.jl:187
 [15] cacheget
    @ ~/.julia/packages/Parquet/O0PXc/src/reader.jl:30 [inlined]
 [16] iterate(ccp::Parquet.ColumnChunkPages, startpos::Int64)
    @ Parquet ~/.julia/packages/Parquet/O0PXc/src/reader.jl:167
 [17] iterate(ccpv::Parquet.ColumnChunkPageValues{String}, startpos::Int64)
    @ Parquet ~/.julia/packages/Parquet/O0PXc/src/reader.jl:262
 [18] iterate
    @ ~/.julia/packages/Parquet/O0PXc/src/reader.jl:240 [inlined]
 [19] setrow(cursor::Parquet.ColCursor{String}, row::Int64)
    @ Parquet ~/.julia/packages/Parquet/O0PXc/src/cursor.jl:114
 [20] Parquet.ColCursor(par::Parquet.File, colname::Vector{String}; rows::UnitRange{Int64}, row::Int64)
    @ Parquet ~/.julia/packages/Parquet/O0PXc/src/cursor.jl:62
 [21] #53
    @ ./none:0 [inlined]
 [22] iterate
    @ ./generator.jl:47 [inlined]
 [23] collect_to!(dest::Vector{Parquet.ColCursor{Int64}}, itr::Base.Generator{Vector{Vector{String}}, Parquet.var"#53#55"{UnitRange{Int64}, Parquet.File}}, offs::Int64, st::Int64)
    @ Base ./array.jl:724
 [24] collect_to_with_first!(dest::Vector{Parquet.ColCursor{Int64}}, v1::Parquet.ColCursor{Int64}, itr::Base.Generator{Vector{Vector{String}}, Parquet.var"#53#55"{UnitRange{Int64}, Parquet.File}}, st::Int64)
    @ Base ./array.jl:702
 [25] collect(itr::Base.Generator{Vector{Vector{String}}, Parquet.var"#53#55"{UnitRange{Int64}, Parquet.File}})
    @ Base ./array.jl:683
 [26] BatchedColumnsCursor(par::Parquet.File; rows::UnitRange{Int64}, batchsize::Int64, reusebuffer::Bool, use_threads::Bool)
    @ Parquet ~/.julia/packages/Parquet/O0PXc/src/cursor.jl:254
 [27] cursor(table::Parquet.Table)
    @ Parquet ~/.julia/packages/Parquet/O0PXc/src/simple_reader.jl:130
 [28] load(table::Parquet.Table)
    @ Parquet ~/.julia/packages/Parquet/O0PXc/src/simple_reader.jl:134
 [29] getcolumn
    @ ~/.julia/packages/Parquet/O0PXc/src/simple_reader.jl:186 [inlined]
 [30] getcolumn
    @ ~/.julia/packages/Parquet/O0PXc/src/simple_reader.jl:184 [inlined]
 [31] getcolumn
    @ ~/.julia/packages/Tables/YzCZp/src/fallbacks.jl:244 [inlined]
 [32] rowcount(cols::Tables.CopiedColumns{Parquet.Table})
    @ Tables ~/.julia/packages/Tables/YzCZp/src/fallbacks.jl:11
 [33] show(io::IOContext{IOBuffer}, table::Tables.CopiedColumns{Parquet.Table}; max_cols::Int64)
    @ Tables ~/.julia/packages/Tables/YzCZp/src/Tables.jl:207
 [34] show
    @ ~/.julia/packages/Tables/YzCZp/src/Tables.jl:206 [inlined]
 [35] show
    @ ./multimedia.jl:47 [inlined]
 [36] limitstringmime(mime::MIME{Symbol("text/plain")}, x::Tables.CopiedColumns{Parquet.Table})
    @ IJulia ~/.julia/packages/IJulia/e8kqU/src/inline.jl:43
 [37] display_mimestring
    @ ~/.julia/packages/IJulia/e8kqU/src/display.jl:71 [inlined]
 [38] display_dict(x::Tables.CopiedColumns{Parquet.Table})
    @ IJulia ~/.julia/packages/IJulia/e8kqU/src/display.jl:102
 [39] #invokelatest#2
    @ ./essentials.jl:708 [inlined]
 [40] invokelatest
    @ ./essentials.jl:706 [inlined]
 [41] execute_request(socket::ZMQ.Socket, msg::IJulia.Msg)
    @ IJulia ~/.julia/packages/IJulia/e8kqU/src/execute_request.jl:112
 [42] #invokelatest#2
    @ ./essentials.jl:708 [inlined]
 [43] invokelatest
    @ ./essentials.jl:706 [inlined]
 [44] eventloop(socket::ZMQ.Socket)
    @ IJulia ~/.julia/packages/IJulia/e8kqU/src/eventloop.jl:8
 [45] (::IJulia.var"#15#18")()
    @ IJulia ./task.jl:411

When I read the original file into Python, write it out to a new file, and read the new file using read_parquet |> Tables.rows, it reads in without complaint.

gzheng92 commented 3 years ago

I had erroneously assumed that, because it was complaining about a large number, the error was related to the column of timestamps in the file.

tanmaykm commented 3 years ago

When I read the original file into python, write it out to a new file, and read the new file using read_parquet |> Tables.rows, it reads in without complaint

So something changed when the file was read and written back via Python?

the error was related to the column of timestamps in the file

Only Int96 timestamps can be read currently. Was there any difference in the underlying data type of the timestamp column between the original file and the one that was written out from Python?

gzheng92 commented 3 years ago

The old schema was:

Schema:
    spark_schema {
      optional INT64 ...
      optional BYTE_ARRAY ... # (from UTF8) 
      optional BYTE_ARRAY ... # (from UTF8) 
      optional BYTE_ARRAY ... # (from UTF8) 
      optional DOUBLE ...
      optional DOUBLE ...
      optional DOUBLE ...
      optional BYTE_ARRAY ... # (from UTF8) 
    }

The new schema is

Schema:
    required schema {
      optional INT64 ...
      optional BYTE_ARRAY ... # (from UTF8) 
      optional BYTE_ARRAY ... # (from UTF8) 
      optional BYTE_ARRAY ... # (from UTF8) 
      optional DOUBLE ...
      optional DOUBLE ...
      optional DOUBLE ...
      optional BYTE_ARRAY ... # (from UTF8) 
    }

They look identical to me

tanmaykm commented 3 years ago

Hmm, can't think of anything else right away. Having a sample file to reproduce this would surely help anyone who wants to look into it.

gzheng92 commented 3 years ago

Unfortunately the data is sensitive and can’t be shared… and writing the data into a new file doesn’t reproduce the problem, heh

I’m happy to provide other output and manually censor results though

ximonsson commented 2 years ago

I am having the same issue when reading some parquet files that were created using pyspark. After the files are read and re-written using pandas and pyarrow, Parquet.jl can load them as well.

This is really not my domain, so I am not sure I found anything useful, but I tried digging around to see if I could find the problem, and it seems to occur when loading the :crc field of the PAR2.PageHeader struct.

The issue in this case seems to lie in the implementation of read(::TCompactProtocol, ::Type{Int32}) from Thrift.jl. It reads i32 values by loading the bytes into a u64 and then converting to i32, and in some cases that value is simply too large to fit, which produces an error like the one here.
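
To make that concrete, here is a minimal sketch (the helper name and the encoded value are illustrative, not taken from Thrift.jl or from the file): zigzag-decoding into a 64-bit register leaves a negative i32 as a full-width two's-complement bit pattern, which a checked convert to Int32 rejects while a truncating conversion does not.

# Illustrative only -- not the Thrift.jl implementation.
zigzag_decode(u::UInt64) = xor(u >> 1, -(u & 0x01))   # zigzag varint value -> two's-complement bit pattern

u = zigzag_decode(UInt64(2617412037))   # 2617412037 zigzag-encodes -1308706019
# decodes to 0xffffffffb1febb1d == 18446744072400845597, the value in the first InexactError above

u % Int32            # -1308706019: truncating the high bits recovers the intended i32
# convert(Int32, u)  # would throw InexactError: check_top_bit(Int32, 18446744072400845597)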

The first thing I noticed when looking through the pyarrow source code was this comment. Looking at the source code for their writer and reader, they seem to ignore the crc field completely, which probably explains why loading and storing through pyarrow "fixes" it (and maybe why this has not been an issue earlier).

I am unsure what the correct fix for this issue is. The implementation of the read function for i32 in Thrift.jl seems very odd to me, but I don't want to open an issue there in case that is the intended implementation of Apache Thrift. If no one else has filed a similar issue there, I am guessing it works as intended, and the error might be pyspark writing the field incorrectly. Alternatively, one could take inspiration from pyarrow and ignore the crc; I can see that the writer already does so, but that feels like a lazy solution.

ximonsson commented 2 years ago

I took a look at the official Apache Thrift implementations for reading i32 in both Python and Go, and they are identical to the one in Thrift.jl, so no error there.

The only thing I can think of then is Spark incorrectly writing the crc field.

tanmaykm commented 2 years ago

@ximonsson thanks for reporting and for the details. Does this (https://github.com/tanmaykm/Thrift.jl/pull/65) fix the reading for you? There was an issue in reading zigzag-encoded integers, which that PR fixes. It should be available with Thrift v0.8.1.
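
For anyone hitting this later, a quick way to pick up the fix (assuming Thrift.jl is the registered Thrift package in your environment and Parquet.jl's compat bounds allow the newer version):

using Pkg
Pkg.update("Thrift")     # or pin explicitly: Pkg.add(name = "Thrift", version = "0.8.1")
Pkg.status("Thrift")     # confirm Thrift v0.8.1 or newer is resolved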

ximonsson commented 2 years ago

Big thanks @tanmaykm, that worked perfectly for me.

gzheng92 commented 2 years ago

Fixed my issue as well!

tanmaykm commented 2 years ago

Great!