JuliaData / Avro.jl

Pure Julia implementation for reading/writing data in the Avro format
MIT License
16 stars 6 forks source link

Problem writing string columns to avro file #17

Open kobusherbst opened 2 years ago

kobusherbst commented 2 years ago

When I try to convert an Arrow file to Avro using this code:

# Convert `<path>/<file>.arrow` to `<path>/<file>.avro` (zstd-compressed), printing the
# Tables schema of the loaded data first. Returns the result of `Avro.writetable`.
function t_arrowtoavro(path, file)
    arrowpath = joinpath(path, "$(file).arrow")
    avropath = joinpath(path, "$(file).avro")
    df = DataFrame(Arrow.Table(arrowpath))
    println(Tables.schema(df))
    return Avro.writetable(avropath, df; compress=:zstd)
end

I get the following error whenever the input file contains a string column, for example:

Tables.Schema:
 :IndividualId  Int32
 :ResultDate    Union{Missing, Date}
 :HIVResult     Union{Missing, String}
ERROR: ArgumentError: internal writing error: buffer too small, len = 1563130

By contrast, this version, which drops the string column before writing, does not produce the error:

# Convert `<path>/<file>.arrow` to `<path>/<file>.avro` (zstd-compressed), keeping only
# the `:IndividualId` and `:ResultDate` columns. Prints the schema before and after the
# column selection. Returns the result of `Avro.writetable`.
function t_arrowtoavro(path, file)
    full = DataFrame(Arrow.Table(joinpath(path, "$(file).arrow")))
    println(Tables.schema(full))
    # per the report, leaving out the String column (:HIVResult) avoids the buffer error
    narrowed = select(full, :IndividualId, :ResultDate)
    println(Tables.schema(narrowed))
    return Avro.writetable(joinpath(path, "$(file).avro"), narrowed; compress=:zstd)
end

Output:

Tables.Schema:
 :IndividualId  Int32
 :ResultDate    Union{Missing, Date}
 :HIVResult     Union{Missing, String}
Tables.Schema:
 :IndividualId  Int32
 :ResultDate    Union{Missing, Date}
"D:\\Data\\Demography\\AHRI\\Staging\\HIVResults.avro"

How do I resolve this problem if I need a string column in my output file?

djholiver commented 5 months ago

It appears that `writewithschema` (in `tables.jl`) sizes the output buffer from the encoded size of the first row only. A later row with a longer string then overflows the buffer, which produces the error above.

Changing it as follows, so that the buffer is sized from the largest encoded row, appears to fix the issue:


# Binary-encode every row of `rows` with the Avro schema type `schtyp` into one byte
# vector, returned trimmed to exactly the bytes written.
#
# The buffer grows on demand from each row's `nbytes`. It doubles on each growth so
# resizes stay amortized. A row larger than the rows before it (e.g. one holding a long
# string) therefore can no longer overflow a buffer sized from an earlier row.
# NOTE(review): assumes `nbytes(schtyp, row)` is an upper bound on what `writevalue`
# writes for `row`; the previous sizing logic relied on the same assumption.
function _encodeblock(schtyp, rows, kw)
    bytes = UInt8[]
    pos = 1  # next free (1-based) index in `bytes`
    for row in rows
        needed = pos + nbytes(schtyp, row) - 1
        if needed > length(bytes)
            resize!(bytes, max(needed, 2 * length(bytes)))
        end
        pos = writevalue(Binary(), schtyp, row, bytes, pos, length(bytes), kw)
    end
    # Trim the unused (uninitialized) capacity so it is never compressed or written
    # into the block payload. An empty `rows` yields an empty vector.
    return resize!(bytes, pos - 1)
end

"""
    writewithschema(io, parts, rows, st, sch, dictrow, compress, kw)

Write an Avro object container file to `io`. The file starts with a header carrying the
JSON-encoded schema type derived from `sch`. The header also holds an `avro.codec`
entry when `compress` is a key of `COMPRESSORS`. One data block per partition follows
the header.

# Arguments
- `io`: destination; the header and each block are written with `Base.write`.
- `parts`, `st`: partition iterator and its iteration state. After the current
  partition's block is written, the next partition comes from `iterate(parts, st)`.
- `rows`: rows of the current (first) partition; presumably already materialized by the
  caller when `dictrow` is `true` — confirm against caller.
- `sch`: schema used to build the Avro schema type shared by all blocks.
- `dictrow`: when `true`, each subsequent partition is materialized with
  `Tables.dictrowtable` before encoding.
- `compress`: looked up in `COMPRESSORS`; if absent, blocks are written uncompressed.
- `kw`: forwarded to `writevalue`.

Returns `nothing`.
"""
function writewithschema(io, parts, rows, st, sch, dictrow, compress, kw)
    comp = get(COMPRESSORS, compress, nothing)

    schtyp = schematype(sch)
    meta = Dict("avro.schema" => JSON3.write(schtyp))
    if comp !== nothing
        meta["avro.codec"] = String(compress)
    end
    sync = _cast(NTuple{16, UInt8}, rand(UInt128))
    buf = write((magic=MAGIC, meta=meta, sync=sync); schema=FileHeaderRecordType)
    Base.write(io, buf)
    @debug 1 "wrote file header from bytes 1:$(length(buf))"
    while true
        # if rows didn't have schema or length, we materialized w/ Tables.dictrowtable
        nrow = length(rows)
        @debug 1 "writing block count ($nrow)"
        bytes = _encodeblock(schtyp, rows, kw)
        # `bytes` holds exactly the encoded rows, so it can be compressed directly
        # (no unsafe_wrap/pointer aliasing) or written as-is without trailing junk.
        # NOTE(review): indexing codecs by threadid() assumes one codec per thread and
        # that the task does not migrate mid-call; confirm COMPRESSORS' layout.
        finalbytes = comp === nothing ? bytes : transcode(comp[Threads.threadid()], bytes)
        block = Block(nrow, view(finalbytes, 1:length(finalbytes)), sync)
        blockbuf = write(block; schema=BlockType)
        Base.write(io, blockbuf)
        state = iterate(parts, st)
        state === nothing && break
        part, st = state
        rows = Tables.rows(part)
        if dictrow
            rows = Tables.dictrowtable(rows)
        end
    end
    return
end

I've added a PR for the above.

kobusherbst commented 5 months ago

Thank you!

djholiver commented 5 months ago

Sorry to @ you, @quinnj, but would you mind reviewing the associated PR for the above? It's self-contained.

Your fantastic work on this has potentially been under-utilised, and I believe the PR makes Avro usable "out of the box" in Julia.