JuliaIO / Parquet.jl

Julia implementation of Parquet columnar file format reader

Not thread safe? #160

Closed tdunning closed 3 years ago

tdunning commented 3 years ago

I have code that writes several thousand Parquet files. The general pattern is like this:

    groups = [g for g in data |> @groupby(_.tile)]
    for td in groups
        tileData = DataFrame(td)
        tile = tileData[1, :tile]
        fname = joinpath(outputDir, "x-$tileLevel-$tile")
        outputData = tileData |> 
            @orderby(_.h3) |> @thenby(_.t) |> 
            @select(-:longitude, -:latitude, -:tile)
        Parquet.write_parquet(fname, outputData)
    end

As written, this code never seems to produce ill-formed files. However, a significant fraction of the files are corrupted if I change the loop to this:

    Threads.@threads for td in groups
        ...
    end

The symptoms of corruption are widely variable, but the following are examples:

ERROR: Can not resolve type. Parquet.PAR2.ColumnOrder is not a subtype of Thrift.TSTOP
Stacktrace:
  [1] error(s::String)
    @ Base ./error.jl:33
  [2] julia_type(typ::Int32, narrow_typ::Type)
    @ Thrift ~/.julia/packages/Thrift/dE8jB/src/base.jl:47
  [3] read_container(p::Thrift.TCompactProtocol, val::Vector{Parquet.PAR2.ColumnOrder})
    @ Thrift ~/.julia/packages/Thrift/dE8jB/src/base.jl:369
  [4] read
    @ ~/.julia/packages/Thrift/dE8jB/src/base.jl:364 [inlined]
  [5] read(p::Thrift.TCompactProtocol, #unused#::Type{Vector{Parquet.PAR2.ColumnOrder}})
    @ Thrift ~/.julia/packages/Thrift/dE8jB/src/base.jl:363
  [6] read_container(p::Thrift.TCompactProtocol, val::Parquet.PAR2.FileMetaData)
    @ Thrift ~/.julia/packages/Thrift/dE8jB/src/base.jl:194
  [7] read
    @ ~/.julia/packages/Thrift/dE8jB/src/base.jl:169 [inlined]
  [8] read
    @ ~/.julia/packages/Thrift/dE8jB/src/base.jl:167 [inlined]
  [9] read_thrift(io::IOStream, #unused#::Type{Parquet.PAR2.FileMetaData})
    @ Parquet ~/.julia/packages/Parquet/O0PXc/src/reader.jl:402
 [10] metadata(io::IOStream, path::String, len::Int32)
    @ Parquet ~/.julia/packages/Parquet/O0PXc/src/reader.jl:417
 [11] Parquet.File(path::String, handle::IOStream; map_logical_types::Dict{Union{Int32, Vector{String}}, Tuple{DataType, Function}})
    @ Parquet ~/.julia/packages/Parquet/O0PXc/src/reader.jl:75
 [12] Parquet.File(path::String; map_logical_types::Dict{Union{Int32, Vector{String}}, Tuple{DataType, Function}})
    @ Parquet ~/.julia/packages/Parquet/O0PXc/src/reader.jl:65
 [13] File
    @ ~/.julia/packages/Parquet/O0PXc/src/reader.jl:63 [inlined]
 [14] Table
    @ ~/.julia/packages/Parquet/O0PXc/src/simple_reader.jl:66 [inlined]
 [15] read_parquet(path::String; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Parquet ~/.julia/packages/Parquet/O0PXc/src/simple_reader.jl:25
 [16] read_parquet(path::String)
    @ Parquet ~/.julia/packages/Parquet/O0PXc/src/simple_reader.jl:22
 [17] top-level scope
    @ REPL[5]:1

or this

    nested task error: KeyError: key :encoding not found
    Stacktrace:
     [1] (::var"#61#73"{String, Int64, DataFrame, String, Int64})(input::IOStream)
       @ Main ~/weather-server/pipeline/mrms_split.jl:112
     [2] open(f::var"#61#73"{String, Int64, DataFrame, String, Int64}, args::String; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
       @ Base ./io.jl:330
     [3] open(f::Function, args::String)
       @ Base ./io.jl:328
     [4] macro expansion
       @ ~/weather-server/pipeline/mrms_split.jl:101 [inlined]
     [5] (::var"#562#threadsfor_fun#66"{String, String, Int64, Float64, Base.Threads.Atomic{Int64}, Vector{Grouping{Int64, NamedTuple{(:longitude, :latitude, :tile, :h3, :t, :precipitation), Tuple{Float32, Float32, Int64, Int64, Int64, Float32}}}}})(onethread::Bool)
       @ Main ./threadingconstructs.jl:81
     [6] (::var"#562#threadsfor_fun#66"{String, String, Int64, Float64, Base.Threads.Atomic{Int64}, Vector{Grouping{Int64, NamedTuple{(:longitude, :latitude, :tile, :h3, :t, :precipitation), Tuple{Float32, Float32, Int64, Int64, Int64, Float32}}}}})()
       @ Main ./threadingconstructs.jl:48

    caused by: KeyError: key :encoding not found
    Stacktrace:
      [1] getindex
        @ ./dict.jl:482 [inlined]
      [2] getproperty(obj::Parquet.PAR2.DataPageHeader, name::Symbol)
        @ Parquet.PAR2 ~/.julia/packages/Parquet/O0PXc/src/PAR2/PAR2_types.jl:851
      [3] page_encodings
        @ ~/.julia/packages/Parquet/O0PXc/src/reader.jl:389 [inlined]
      [4] page_encodings(page::Parquet.PAR2.PageHeader)
        @ Parquet ~/.julia/packages/Parquet/O0PXc/src/reader.jl:383
      [5] page_encodings(page::Parquet.Page)
        @ Parquet ~/.julia/packages/Parquet/O0PXc/src/reader.jl:381
      [6] iterate(ccpv::Parquet.ColumnChunkPageValues{Float32}, startpos::Int64)
        @ Parquet ~/.julia/packages/Parquet/O0PXc/src/reader.jl:274
      [7] iterate
        @ ~/.julia/packages/Parquet/O0PXc/src/reader.jl:240 [inlined]
      [8] setrow(cursor::Parquet.ColCursor{Float32}, row::Int64)
        @ Parquet ~/.julia/packages/Parquet/O0PXc/src/cursor.jl:114
      [9] Parquet.ColCursor(par::Parquet.File, colname::Vector{String}; rows::UnitRange{Int64}, row::Int64)
        @ Parquet ~/.julia/packages/Parquet/O0PXc/src/cursor.jl:62
     [10] #53
        @ ./none:0 [inlined]
     [11] iterate
        @ ./generator.jl:47 [inlined]
     [12] collect_to!(dest::Vector{Parquet.ColCursor{Int64}}, itr::Base.Generator{Vector{Vector{String}}, Parquet.var"#53#55"{UnitRange{Int64}, Parquet.File}}, offs::Int64, st::Int64)
        @ Base ./array.jl:724

or this

    nested task error: MethodError: no method matching Int32()
    Closest candidates are:
      (::Type{T})(::AbstractChar) where T<:Union{AbstractChar, Number} at char.jl:50
      (::Type{T})(::BigInt) where T<:Union{Int128, Int16, Int32, Int64, Int8} at gmp.jl:356
      (::Type{T})(::Base.TwicePrecision) where T<:Number at twiceprecision.jl:243
      ...
    Stacktrace:
      [1] read_container(p::Thrift.TCompactProtocol, #unused#::Type{Int32})
        @ Thrift ~/.julia/packages/Thrift/dE8jB/src/base.jl:168
      [2] read_container(p::Thrift.TCompactProtocol, val::Parquet.PAR2.FileMetaData)
        @ Thrift ~/.julia/packages/Thrift/dE8jB/src/base.jl:190
      [3] read
        @ ~/.julia/packages/Thrift/dE8jB/src/base.jl:169 [inlined]
      [4] read
        @ ~/.julia/packages/Thrift/dE8jB/src/base.jl:167 [inlined]
      [5] read_thrift(io::IOStream, #unused#::Type{Parquet.PAR2.FileMetaData})
        @ Parquet ~/.julia/packages/Parquet/O0PXc/src/reader.jl:402
      [6] metadata(io::IOStream, path::String, len::Int32)
        @ Parquet ~/.julia/packages/Parquet/O0PXc/src/reader.jl:417
      [7] Parquet.File(path::String, handle::IOStream; map_logical_types::Dict{Union{Int32, Vector{String}}, Tuple{DataType, Function}})
        @ Parquet ~/.julia/packages/Parquet/O0PXc/src/reader.jl:75
      [8] Parquet.File(path::String; map_logical_types::Dict{Union{Int32, Vector{String}}, Tuple{DataType, Function}})
        @ Parquet ~/.julia/packages/Parquet/O0PXc/src/reader.jl:65
      [9] File
        @ ~/.julia/packages/Parquet/O0PXc/src/reader.jl:63 [inlined]
     [10] Table
        @ ~/.julia/packages/Parquet/O0PXc/src/simple_reader.jl:66 [inlined]

Examining the Parquet sources, I find that these are typically impossible conditions (assuming a well-formed input file).

Thoughts?

tdunning commented 3 years ago

Thinking about this more, it sounds like the file cache is not thread-safe.
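
One way to test that hypothesis, or to work around it in the meantime, would be to keep the per-tile preparation parallel and serialize only the write call behind a lock. This is a minimal sketch, assuming the race is confined to the write step; `write_serialized`, `prepare`, and `write_one` are hypothetical names and not part of Parquet.jl:

```julia
using Base.Threads

# Hypothetical helper, not part of Parquet.jl.
# `prepare(item)` builds the output for one item and may run in parallel.
# `write_one(prepared)` performs the write and runs under a lock, one task at a time.
function write_serialized(prepare, write_one, items)
    write_lock = ReentrantLock()   # shared by every iteration of the loop below
    @threads for item in items
        prepared = prepare(item)   # parallel part: sorting, selecting columns, ...
        lock(write_lock) do
            write_one(prepared)    # serialized part: only one write at a time
        end
    end
    return nothing
end
```

In the loop from the first comment, `prepare` would build `(fname, outputData)` for a group and `write_one` would call `Parquet.write_parquet(fname, outputData)`. If the corruption disappears with the lock in place, that points at shared state touched during the write.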

tdunning commented 3 years ago

Attached is a reliable replication of the problem. I am running Julia 1.6.3 and Parquet:

name = "Parquet"
uuid = "626c502c-15b0-58ad-a749-f091afb673ae"
keywords = ["parquet", "julia", "columnar-storage"]
license = "MIT"
desc = "Julia implementation of parquet columnar file format reader and writer"
version = "0.8.3"

Replication

  1. Run julia -t 10, then include("parquet-threads.jl").
  2. Call tryit(false). All is good.
  3. Call tryit(true). Things break.
  4. Run julia (no threads), then include("parquet-threads.jl").
  5. Call tryit(true). The code tries to use threads, but there is only one thread, so the code works.

The only difference here is that the Parquet files are written from multiple threads. Ergo, there is a thread-safety problem in Parquet.write_parquet.
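
For anyone repeating the steps, the thread count that separates step 3 from step 5 can be checked from the REPL. The following is a small sketch using only `Base.Threads`; `thread_ids` is a hypothetical helper written for this illustration:

```julia
using Base.Threads

# Number of threads Julia was started with (`julia -t N` or JULIA_NUM_THREADS).
println("threads available: ", nthreads())

# Hypothetical helper: record which thread ran each iteration of a threaded loop.
function thread_ids(n::Integer)
    ids = zeros(Int, n)
    @threads for i in 1:n
        ids[i] = threadid()   # each iteration writes only its own slot
    end
    return ids
end

# With a single thread, every iteration reports the same id, so writes cannot overlap.
println("distinct threads used: ", length(unique(thread_ids(100))))
```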

parquet-threads.jl.zip

tdunning commented 3 years ago

Just an update here. Tanmay has isolated the problem to the handling of the Thrift metadata and has produced this MRE:

    using Thrift
    using Parquet
    using Base.Threads

    # Read one known-good FileMetaData object from the attached file.
    const meta = open("meta.data", "r") do f
        Thrift.read(Thrift.TCompactProtocol(Thrift.TFileTransport(f)), Parquet.PAR2.FileMetaData)
    end

    const dir = mktempdir()

    # Write the same metadata to 100 files from multiple threads.
    Threads.@threads for i in 1:100
        fname = joinpath(dir, "data$i")
        open(fname, "w") do io
            Thrift.write(Thrift.TCompactProtocol(Thrift.TFileTransport(io)), meta)
        end
    end

    # Read every file back serially; a corrupted file throws here.
    for i in 1:100
        fname = joinpath(dir, "data$i")
        open(fname, "r") do io
            Thrift.read(Thrift.TCompactProtocol(Thrift.TFileTransport(io)), Parquet.PAR2.FileMetaData)
        end
    end
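
Since every iteration serializes the same `meta` object, the 100 files should presumably be byte-identical. A check that does not depend on which exception the reader happens to throw is to compare each file against one written from a single thread. This is only a sketch: `mismatched_files` is a hypothetical helper, and it assumes that serializing the same object twice yields the same bytes:

```julia
# Hypothetical helper: return the paths whose bytes differ from a reference file.
function mismatched_files(reference::AbstractString, paths)
    expected = read(reference)   # raw bytes of the known-good file
    return [p for p in paths if read(p) != expected]
end
```

Here `reference` would be a file written with the same `Thrift.write` call outside the threaded loop, and `paths` would be `[joinpath(dir, "data$i") for i in 1:100]`. An empty result means no file differs from the reference.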

Here is the required metadata file (compressed because of GitHub limitations): meta.data01.zip

tanmaykm commented 3 years ago

Should be fixed now in Thrift v0.8.3 after https://github.com/tanmaykm/Thrift.jl/pull/68

tdunning commented 3 years ago

I confirm that after pulling in the 0.8.3 version of Thrift, my threading test succeeds.

tdunning commented 3 years ago

Is there a new release tag coming soon?

tanmaykm commented 3 years ago

Thrift.jl v0.8.3 is already released with the fix and Parquet.jl compatibility is set to v0.8, so just doing a Pkg.update() to get the new version of Thrift is all that is needed. We do not need a new release of Parquet.
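
To confirm which Thrift version an environment actually resolved, something along these lines should work. It is a sketch rather than an official recipe: `has_thread_fix` and `resolved_thrift_version` are hypothetical helpers, while `Pkg.dependencies()` and `Pkg.update` are the standard Pkg API:

```julia
using Pkg

# Hypothetical helper: does a given Thrift version include the fix?
# The 0.8.3 threshold is the release named in this thread.
has_thread_fix(v::VersionNumber) = v >= v"0.8.3"

# Hypothetical helper: the Thrift version resolved in the active environment,
# or `nothing` if Thrift is not part of it.
function resolved_thrift_version()
    for (_, info) in Pkg.dependencies()
        info.name == "Thrift" && return info.version
    end
    return nothing
end

# To pull in the fixed release, run once in the REPL:
#     Pkg.update("Thrift")
```

After the update, `has_thread_fix(resolved_thrift_version())` should return `true` in an environment that depends on Parquet.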

tdunning commented 3 years ago

Ahh... of course!

Well played.