delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0
2.32k stars 406 forks source link

Error decoding field 'stats' when creating checkpoint #2743

Open Nekit2217 opened 3 months ago

Nekit2217 commented 3 months ago

Environment

Delta-rs version: 0.18.1

Binding: rust


Bug

What happened: While running kafka-delta-ingest the service crashes with an error when trying to create a checkpoint: DeltaTable { source: Arrow { source: JsonError("whilst decoding field 'add': while decoding field 'stats': offset overflow decoding Utf8") } }

The error pops up here: https://github.com/apache/arrow-rs/blob/49840ec0f110da5e9a21ce97affd32313d0b720f/arrow-json/src/reader/string_array.rs#L81

What you expected to happen: The checkpoint will be created successfully

How to reproduce it: Not sure how to reproduce this, presumably it would require a table with a large delta log and data volume

More details: Not sure what additional information to provide: Partitioning by year, month, day, and hour; Current transaction number: 00000000000002020757; Approximately 4TB are written to the table per day; Each parquet file is on average 800MB

Nekit2217 commented 3 months ago

I fixed the error by changing the type of the stats field, but it feels like a workaround

        static ref ADD_FIELDS: Vec<ArrowField> = arrow_defs![
            path:Utf8,
            size:Int64,
            modificationTime:Int64,
            dataChange:Boolean,
            stats:Utf8,
        static ref ADD_FIELDS: Vec<ArrowField> = arrow_defs![
            path:Utf8,
            size:Int64,
            modificationTime:Int64,
            dataChange:Boolean,
            stats:LargeUtf8,
Nekit2217 commented 3 months ago

After the change, spark stopped reading data

sherlockbeard commented 2 months ago

Utf8 is like to 2gb of data @_@

can you share your table schema and spark error? also please check if there are any huge json in _delta_log if you have stats on string or binary columns and they might create this problem.

i can kind of reproduce it . (plz don't do it if you love your laptop)

import pandas as pd
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

py_list_ran = {}
for i in range(1, 600):
    py_list_ran[chr(97) * 4000 * i] = 10000 * i

py_table = pa.Table.from_pylist([py_list_ran])

write_deltalake("temp5", py_table, engine="rust", configuration={"delta.dataSkippingNumIndexedCols": "-1"})

table = DeltaTable("temp5")

table.create_checkpoint()

print(table.schema())
# table.optimize.compact()

Exception: Json error: whilst decoding field 'add': whilst decoding field 'stats': offset overflow decoding Utf8

EDIT1

you can also recreate it using this it will stop when combined log is more than 2 gb

from deltalake import DeltaTable, write_deltalake, WriterProperties
from deltalake._internal import TableNotFoundError
import pyarrow as pa

for j in range(1, 50):
    py_list_ran = {}
    for i in range(1, 100):
        py_list_ran[chr(97) * 4000 * i] = 10000 * i

    py_table = pa.Table.from_pylist([py_list_ran])

    write_deltalake("temp5", py_table, engine="rust", configuration={"delta.dataSkippingNumIndexedCols": "-1"}, mode="append")

table = DeltaTable("temp5")

table.create_checkpoint()

print(table.schema())
    self._table = RawDeltaTable(
                  ^^^^^^^^^^^^^^
Exception: Json error: whilst decoding field 'add': whilst decoding field 'stats': offset overflow decoding Utf8
Nekit2217 commented 2 months ago

Hi, sorry for the delay

Here is the schema:

schema ```rust root |-- field: timestamp (nullable = true) |-- field: struct (nullable = true) | |-- field: long (nullable = true) | |-- field: long (nullable = true) | |-- field: long (nullable = true) |-- field: struct (nullable = true) | |-- field: long (nullable = true) | |-- field: long (nullable = true) | |-- field: string (nullable = true) | |-- field: long (nullable = true) | |-- field: long (nullable = true) |-- field: struct (nullable = true) | |-- field: string (nullable = true) | |-- field: decimal(30,15) (nullable = true) | |-- field: boolean (nullable = true) | |-- field: integer (nullable = true) | |-- field: struct (nullable = true) | | |-- field: string (nullable = true) | | |-- field: integer (nullable = true) | | |-- field: string (nullable = true) | | |-- field: string (nullable = true) | | |-- field: boolean (nullable = true) | | |-- field: boolean (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: boolean (nullable = true) | | | |-- field: boolean (nullable = true) | | | |-- field: integer (nullable = true) | | | |-- field: array (nullable = true) | | | | |-- field: string (containsNull = true) | | | |-- field: array (nullable = true) | | | | |-- field: integer (containsNull = true) | | | |-- field: integer (nullable = true) | | | |-- field: integer (nullable = true) | | | |-- field: integer (nullable = true) | | | |-- field: struct (nullable = true) | | | | |-- field: string (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: integer (nullable = true) | | | |-- field: integer (nullable = true) | | | |-- field: integer (nullable = true) | | | |-- field: integer (nullable = true) | | | |-- field: integer (nullable = true) | | | |-- field: integer (nullable = true) | | | |-- field: array (nullable = true) | | | | |-- field: integer (containsNull = true) | | | |-- field: struct (nullable = true) | | | | |-- field: string (nullable = true) | |-- field: struct (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: struct (nullable = true) | | | | |-- field: string (nullable = true) | | | | |-- field: string (nullable = true) | | | | |-- field: string (nullable = true) | | | | |-- field: array (nullable = true) | | | | | |-- field: string (containsNull = true) | | | | |-- field: integer (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: array (nullable = true) | | | | |-- field: string (containsNull = true) | | | |-- field: integer (nullable = true) | | | |-- field: boolean (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: boolean (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: integer (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: boolean (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: integer (nullable = true) | | | |-- field: integer (nullable = true) | | | |-- field: integer (nullable = true) | | | |-- field: integer (nullable = true) | | | |-- field: boolean (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: boolean (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: integer (nullable = true) | | | |-- field: boolean (nullable = true) | | | |-- field: struct (nullable = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: double (nullable = true) | | | | |-- field: double (nullable = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: string (nullable = true) | | | | |-- field: string (nullable = true) | | | | |-- field: string (nullable = true) | | | | |-- field: string (nullable = true) | | | | |-- field: string (nullable = true) | | | | |-- field: integer (nullable = true) | | | |-- field: struct (nullable = true) | | | | |-- field: string (nullable = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: string (nullable = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: double (nullable = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: string (nullable = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: string (nullable = true) | | | | |-- field: array (nullable = true) | | | | | |-- field: string (containsNull = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: double (nullable = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: double (nullable = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: long (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: boolean (nullable = true) | | | |-- field: boolean (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: integer (nullable = true) | | | |-- field: array (nullable = true) | | | | |-- field: string (containsNull = true) | | | |-- field: array (nullable = true) | | | | |-- field: string (containsNull = true) | | | |-- field: array (nullable = true) | | | | |-- field: string (containsNull = true) | | | |-- field: array (nullable = true) | | | | |-- field: integer (containsNull = true) | | |-- field: struct (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: integer (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: struct (nullable = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: double (nullable = true) | | | | |-- field: double (nullable = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: integer (nullable = true) | | | | |-- field: string (nullable = true) | | | | |-- field: string (nullable = true) | | | | |-- field: string (nullable = true) | | | | |-- field: string (nullable = true) | | | | |-- field: string (nullable = true) | | | | |-- field: integer (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: struct (nullable = true) | | | | |-- field: long (nullable = true) | |-- field: struct (nullable = true) | | |-- field: integer (nullable = true) | | |-- field: string (nullable = true) | | |-- field: string (nullable = true) | | |-- field: string (nullable = true) | | |-- field: string (nullable = true) | | |-- field: boolean (nullable = true) | | |-- field: string (nullable = true) | | |-- field: boolean (nullable = true) | | |-- field: string (nullable = true) | | |-- field: string (nullable = true) | | |-- field: string (nullable = true) | | |-- field: integer (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: array (nullable = true) | | | | |-- field: string (containsNull = true) | | | |-- field: integer (nullable = true) | | | |-- field: integer (nullable = true) | | |-- field: array (nullable = true) | | | |-- field: string (containsNull = true) | | |-- field: string (nullable = true) | | |-- field: string (nullable = true) | | |-- field: decimal(30,15) (nullable = true) | | |-- field: timestamp (nullable = true) | | |-- field: long (nullable = true) | | |-- field: long (nullable = true) | | |-- field: boolean (nullable = true) | | |-- field: double (nullable = true) | | |-- field: string (nullable = true) | | |-- field: string (nullable = true) | | |-- field: integer (nullable = true) | | |-- field: integer (nullable = true) | | |-- field: integer (nullable = true) | | |-- field: string (nullable = true) | | |-- field: string (nullable = true) | | |-- field: string (nullable = true) | | |-- field: string (nullable = true) | | |-- field: integer (nullable = true) | | |-- field: double (nullable = true) | | |-- field: double (nullable = true) | | |-- field: long (nullable = true) | | |-- field: string (nullable = true) | | |-- field: array (nullable = true) | | | |-- field: string (containsNull = true) | | |-- field: boolean (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: map (nullable = true) | | | | |-- field: string | | | | |-- field: string (valueContainsNull = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | |-- field: string (nullable = true) | | |-- field: string (nullable = true) |-- field: struct (nullable = true) | |-- field: string (nullable = true) | |-- field: string (nullable = true) | |-- field: string (nullable = true) | |-- field: string (nullable = true) | |-- field: decimal(30,15) (nullable = true) | |-- field: struct (nullable = true) | | |-- field: string (nullable = true) | | |-- field: array (nullable = true) | | | |-- field: string (containsNull = true) | | |-- field: string (nullable = true) | | |-- field: string (nullable = true) | | |-- field: array (nullable = true) | | | |-- field: string (containsNull = true) | | |-- field: integer (nullable = true) | | |-- field: string (nullable = true) | | |-- field: array (nullable = true) | | | |-- field: integer (containsNull = true) | | |-- field: boolean (nullable = true) | | |-- field: integer (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: string (nullable = true) | |-- field: struct (nullable = true) | | |-- field: string (nullable = true) | | |-- field: decimal(30,15) (nullable = true) | | |-- field: decimal(30,15) (nullable = true) | | |-- field: string (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | |-- field: integer (nullable = true) | | |-- field: boolean (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: string (nullable = true) | | | |-- field: integer (nullable = true) | | | |-- field: integer (nullable = true) | | |-- field: string (nullable = true) | | |-- field: string (nullable = true) | | |-- field: string (nullable = true) | | |-- field: array (nullable = true) | | | |-- field: string (containsNull = true) | | |-- field: string (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: decimal(30,15) (nullable = true) | | | |-- field: integer (nullable = true) | | |-- field: decimal(30,15) (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: double (nullable = true) | | | |-- field: double (nullable = true) | | | |-- field: double (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: double (nullable = true) | | | |-- field: double (nullable = true) | | | |-- field: double (nullable = true) | | |-- field: integer (nullable = true) | | |-- field: long (nullable = true) | | |-- field: decimal(30,15) (nullable = true) | | |-- field: string (nullable = true) | | |-- field: decimal(30,15) (nullable = true) | | |-- field: decimal(30,15) (nullable = true) | | |-- field: double (nullable = true) | | |-- field: double (nullable = true) | | |-- field: double (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: double (nullable = true) | | | |-- field: double (nullable = true) | | | |-- field: double (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: long (nullable = true) | | | |-- field: long (nullable = true) | | | |-- field: long (nullable = true) | | | |-- field: long (nullable = true) | | | |-- field: long (nullable = true) | | | |-- field: long (nullable = true) | | | |-- field: long (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: long (nullable = true) | | | |-- field: long (nullable = true) | | | |-- field: long (nullable = true) | | | |-- field: long (nullable = true) | | | |-- field: long (nullable = true) | | | |-- field: long (nullable = true) | | | |-- field: long (nullable = true) | | |-- field: struct (nullable = true) | | | |-- field: array (nullable = true) | | | | |-- field: string (containsNull = true) | | | |-- field: array (nullable = true) | | | | |-- field: string (containsNull = true) |-- field: integer (nullable = true) |-- field: integer (nullable = true) |-- field: integer (nullable = true) |-- field: integer (nullable = true) |-- field: struct (nullable = true) | |-- field: struct (nullable = true) | | |-- field: double (nullable = true) | | |-- field: integer (nullable = true) | | |-- field: integer (nullable = true) | |-- field: array (nullable = true) | | |-- field: struct (containsNull = true) | | | |-- field: string (nullable = true) | |-- field: array (nullable = true) | | |-- field: string (containsNull = true) | |-- field: array (nullable = true) | | |-- field: struct (containsNull = true) | | | |-- field: string (nullable = true) | | | |-- field: double (nullable = true) | | | |-- field: struct (nullable = true) | | | | |-- field: string (nullable = true) | | | | |-- field: double (nullable = true) | | | | |-- field: struct (nullable = true) | | | | | |-- field: string (nullable = true) | | | | | |-- field: double (nullable = true) | | | | |-- field: array (nullable = true) | | | | | |-- field: struct (containsNull = true) | | | | | | |-- field: string (nullable = true) | | | | | | |-- field: double (nullable = true) | | | |-- field: string (nullable = true) ```

Spark does not return any errors, it simply ignores the checkpoints that were recorded with stats: LargeUtf8

No huge json logs, 30 KB on average

We currently work around this by using a multi-part checkpoint that is created when running the optimization on the databricks,but this is very slow