delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

Checkpoint stats maxValues is incorrect #2571

Open echai58 opened 3 weeks ago

echai58 commented 3 weeks ago

Environment

Delta-rs version: 0.15.3

Binding: python


Bug

What happened: I have a Delta table where, if I call to_pyarrow_table with a filter of <= datetime.datetime(2023, 3, 30, 0, 0, 0, 0) on a timestamp column, a row with the value 2023-03-30 00:00:00.000902 is left in the result. When inspecting the fragments of the pyarrow dataset, I see an expression saying (timestamp <= 2023-03-30 00:00:00.000000), which is an incorrect max_value for this file. The filtering then relies on this incorrect expression and assumes all values in the file satisfy the predicate, so the row-level filter is never applied.
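
For reference, the fragments can be inspected roughly like this (a sketch; the table path is illustrative, and I'm assuming the stats-derived expression shows up as the fragment's partition_expression):

from deltalake import DeltaTable

dt = DeltaTable("test")  # illustrative table path
dataset = dt.to_pyarrow_dataset()

# Each fragment carries an expression built from the file's partition values
# and min/max stats; pyarrow treats it as a guarantee when applying filters.
for fragment in dataset.get_fragments():
    print(fragment.path, fragment.partition_expression)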

Looking through the delta-rs code, it seems like this is parsed from the maxValues field of the checkpoint file. I looked at this checkpoint file, and I do indeed see the incorrect maxValues value:

datetime.datetime(2023, 3, 30, 0, 0)

It seems to not include the microseconds field. For another timestamp column in this table, I see that the maxValues field does have microsecond precision.

Can someone help look into why this could happen?

ion-elgreco commented 3 weeks ago

Please add a reproducible example :)

echai58 commented 3 weeks ago

@ion-elgreco

import datetime

import pandas as pd
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

t = pa.Table.from_pandas(pd.DataFrame({
    "p": [1],
    "a": [datetime.datetime(2023, 3, 29, 23, 59, 59, 807126)],
    }), schema=pa.schema([("p", pa.int64()), ("a", pa.timestamp("us"))]))

write_deltalake(
    "test",
    t,
    mode="error",
    partition_by=["p"],
)

t = pa.Table.from_pandas(pd.DataFrame({
    "p": [1],
    "a": [datetime.datetime(2023, 3, 30, 0,0,0,902)], 
    }), schema=pa.schema([("p", pa.int64()), ("a", pa.timestamp("us"))]))

write_deltalake(
    "test",
    t,
    mode="append",
    partition_by=["p"],
)

# this works 
print(
    DeltaTable("test").to_pyarrow_table(
        filters = [
            ("a", "<=", datetime.datetime(2023, 3, 30, 0, 0, 0, 0)),
        ]
    )
)

# combination of compact + create checkpoint breaks it
DeltaTable("test").optimize.compact()
DeltaTable("test").create_checkpoint()

print(
    DeltaTable("test").to_pyarrow_table(
        filters = [
            ("a", "<=", datetime.datetime(2023, 3, 30, 0, 0, 0, 0)),
        ]
    )
)

It seems like it's the combination of compact + checkpoint that breaks it. I tested this against a newer version of delta-rs (0.17.3), and it seems to be fixed there. Do you know which PR fixed this? (I'm unable to upgrade due to some breaking changes I haven't handled yet, but I could do a local version release including the fix for now.)

ion-elgreco commented 3 weeks ago

I am not entirely sure; throughout the releases there have been multiple occasions where I've touched timestamp types. I suggest you just pip install each release and check when it fails. If you can tell me which release fixed it, I might be able to tell faster.

echai58 commented 3 weeks ago

@ion-elgreco

Going through the versions, it seems the fix was implicit with the addition of timestampNtz, and it is still an issue for timestamps with a timezone.

Adding UTC timezones to my previous example and running on version 0.17.3:

t = pa.Table.from_pandas(pd.DataFrame({
    "p": [1],
    "a": [datetime.datetime(2023, 3, 29, 23,59,59,807126)], 
    }), schema=pa.schema([("p", pa.int64()), ("a", pa.timestamp("us", tz="utc"))]))

write_deltalake(
    "test",
    t,
    mode="error",
    partition_by=["p"],
)

t = pa.Table.from_pandas(pd.DataFrame({
    "p": [1],
    "a": [datetime.datetime(2023, 3, 30, 0,0,0,902)], 
    }), schema=pa.schema([("p", pa.int64()), ("a", pa.timestamp("us", tz="utc"))]))

write_deltalake(
    "test",
    t,
    mode="append",
    partition_by=["p"],
)

# this works 
print(
    DeltaTable("test").to_pyarrow_table(
        filters = [
            ("a", "<=", datetime.datetime(2023, 3, 30, 0, 0, 0, 0, tzinfo=datetime.timezone.utc)),
        ]
    )
)

# combination of compact + create checkpoint breaks it
DeltaTable("test").optimize.compact()
DeltaTable("test").create_checkpoint()

print(
    DeltaTable("test").to_pyarrow_table(
        filters = [
            ("a", "<=", datetime.datetime(2023, 3, 30, 0, 0, 0, 0, tzinfo=datetime.timezone.utc)),
        ]
    )
)

I still see the bad output from the second print:

pyarrow.Table
p: int64
a: timestamp[us, tz=UTC]
----
p: [[1,1]]
a: [[2023-03-30 00:00:00.000902,2023-03-29 23:59:59.807126]]

ion-elgreco commented 3 weeks ago

Hmm maybe the precision is lost after checkpointing.

@echai58 can you manually read the contents of the checkpoint with pyarrow and grab the stats column for that add action? I'm curious what's written there now.
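
Something along these lines should do it (a sketch; the checkpoint version/path is illustrative, adjust it to your table):

import pyarrow.parquet as pq

# Checkpoint file name is illustrative; pick the one referenced in _last_checkpoint.
checkpoint = pq.read_table("test/_delta_log/00000000000000000002.checkpoint.parquet")

# The "add" column is a struct holding path, stats (a JSON string) and stats_parsed.
for add in checkpoint.column("add").to_pylist():
    if add is not None:
        print(add["path"])
        print("  stats:", add["stats"])
        print("  stats_parsed:", add.get("stats_parsed"))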

echai58 commented 3 weeks ago

I previously stated that the issue was only when you compacted, then checkpointed, but that turns out not to be true.

@ion-elgreco This is the contents of the checkpoint file, without compacting:

{'path': 'p=1/0-a8fd92bc-ce65-4e49-8785-bca8809f6a1b-0.parquet',
 'size': 589.0,
 'modificationTime': 1717517089571.0,
 'dataChange': False,
 'stats': '{"numRecords": 1, "minValues": {"a": "2023-03-29T23:59:59.807126+00:00"}, "maxValues": {"a": "2023-03-29T23:59:59.807126+00:00"}, "nullCount": {"a": 0}}',
 'partitionValues': [('p', '1')],
 'tags': [],
 'deletionVector': None,
 'stats_parsed': {'numRecords': 1.0,
  'minValues': {'a': datetime.datetime(2023, 3, 29, 23, 59, 59, 807000, tzinfo=<UTC>)},
  'maxValues': {'a': datetime.datetime(2023, 3, 29, 23, 59, 59, 807000, tzinfo=<UTC>)},
  'nullCount': {'a': 0.0}},
 'partitionValues_parsed': {'p': 1.0}}

This is the file if I compact before I checkpoint:

{'path': 'p=1/part-00001-cf75aa9a-cee7-48f2-b305-9274eac6a056-c000.zstd.parquet',
 'size': 600.0,
 'modificationTime': 1717517182549.0,
 'dataChange': False,
 'stats': '{"numRecords":2,"minValues":{"a":"2023-03-29T23:59:59.807126Z"},"maxValues":{"a":"2023-03-30T00:00:00.000902Z"},"nullCount":{"a":0}}',
 'partitionValues': [('p', '1')],
 'tags': [],
 'deletionVector': None,
 'stats_parsed': {'numRecords': 2.0,
  'minValues': {'a': datetime.datetime(2023, 3, 29, 23, 59, 59, 807000, tzinfo=<UTC>)},
  'maxValues': {'a': datetime.datetime(2023, 3, 30, 0, 0, tzinfo=<UTC>)},
  'nullCount': {'a': 0.0}},
 'partitionValues_parsed': {'p': 1.0}}

And this is the metadata from the compact log, which has the correct maxValues

{"add":{"path":"p=1/part-00001-cf75aa9a-cee7-48f2-b305-9274eac6a056-c000.zstd.parquet","partitionValues":{"p":"1"},"size":600,"modificationTime":1717517182549,"dataChange":false,"stats":"{\"numRecords\":2,\"minValues\":{\"a\":\"2023-03-29T23:59:59.807126Z\"},\"maxValues\":{\"a\":\"2023-03-30T00:00:00.000902Z\"},\"nullCount\":{\"a\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}

ion-elgreco commented 3 weeks ago

Wait, it's not being maintained through checkpointing, right? The maxValues looks incorrect in the checkpoint file.

Maybe I'm lost on the order here; why is the checkpoint not referencing the same path?

echai58 commented 3 weeks ago

Wait, it's not being maintained through checkpointing, right? The maxValues looks incorrect in the checkpoint file.

Maybe I'm lost on the order here; why is the checkpoint not referencing the same path?

I reran the script twice, once with the compact and once without. The path in the compact log matches the path in the second checkpoint parquet I pasted. Sorry for the confusion.

ion-elgreco commented 3 weeks ago

That "stats_parsed" col has lost some precision, I am trying to follow through the code where this happens. But that might be the main culpriit

echai58 commented 3 weeks ago

I agree in the case where I first compacted before checkpointing, it seems the precision is lost in stats_parsed.

But in the first case, where I did not compact first, it seems it computed the incorrect maxValues in the first place?

 'stats': '{"numRecords": 1, "minValues": {"a": "2023-03-29T23:59:59.807126+00:00"}, "maxValues": {"a": "2023-03-29T23:59:59.807126+00:00"}, "nullCount": {"a": 0}}'

has the incorrect value, even before parsing it.

ion-elgreco commented 3 weeks ago

I agree in the case where I first compacted before checkpointing, it seems the precision is lost in stats_parsed.

But in the first case, where I did not compact first, it seems it computed the incorrect maxValues in the first place?

 'stats': '{"numRecords": 1, "minValues": {"a": "2023-03-29T23:59:59.807126+00:00"}, "maxValues": {"a": "2023-03-29T23:59:59.807126+00:00"}, "nullCount": {"a": 0}}'

has the incorrect value, even before parsing it.

The last one seems fine because it's only 1 record, so 1 timestamp

echai58 commented 3 weeks ago

I agree in the case where I first compacted before checkpointing, it seems the precision is lost in stats_parsed. But in the first case, where I did not compact first, it seems it computed the incorrect maxValues in the first place?

 'stats': '{"numRecords": 1, "minValues": {"a": "2023-03-29T23:59:59.807126+00:00"}, "maxValues": {"a": "2023-03-29T23:59:59.807126+00:00"}, "nullCount": {"a": 0}}'

has the incorrect value, even before parsing it.

The last one seems fine because it's only 1 record, so 1 timestamp

Ohh right yeah, that's the add for the other file, sorry.

{'path': 'p=1/1-4f0e9148-4290-4cb9-9555-cfa6691eee5e-0.parquet',
 'size': 589.0,
 'modificationTime': 1717520084596.0,
 'dataChange': False,
 'stats': '{"numRecords": 1, "minValues": {"a": "2023-03-30T00:00:00.000902+00:00"}, "maxValues": {"a": "2023-03-30T00:00:00.000902+00:00"}, "nullCount": {"a": 0}}',
 'partitionValues': [('p', '1')],
 'tags': [],
 'deletionVector': None,
 'stats_parsed': {'numRecords': 1.0,
  'minValues': {'a': datetime.datetime(2023, 3, 30, 0, 0, tzinfo=<UTC>)},
  'maxValues': {'a': datetime.datetime(2023, 3, 30, 0, 0, tzinfo=<UTC>)},
  'nullCount': {'a': 0.0}},
 'partitionValues_parsed': {'p': 1.0}}

The add for the other file shows the same problem: the parsing seems to lose precision.

roeap commented 3 weeks ago

@echai58 @ion-elgreco - this is literally a footnote in the protocol, but timestamps are truncated to milliseconds when computing stats. Could this be what we are seeing here?
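
A quick illustration of that truncation (a sketch in plain Python, not delta-rs code):

import datetime

ts = datetime.datetime(2023, 3, 30, 0, 0, 0, 902)
truncated = ts.replace(microsecond=(ts.microsecond // 1000) * 1000)
print(truncated)  # 2023-03-30 00:00:00 -- matches the maxValues seen in the checkpoint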

echai58 commented 3 weeks ago

@echai58 @ion-elgreco - this is literally a footnote in the protocol, but timestamps are truncated to milliseconds when computing stats. Could this be what we are seeing here?

Ah, yeah that would explain it.

ion-elgreco commented 3 weeks ago

@echai58 @ion-elgreco - this is literally a footnote in the protocol, but timestamps are truncated to milliseconds when computing stats. Could this be what we are seeing here?

Uhm, so the protocol states it should be milliseconds :s. That's quite wrong, because we then pass the truncated stats to the pyarrow.dataset fragments, which in turn will return wrong results.

echai58 commented 3 weeks ago

Instead of truncating, could we round up to the next millisecond when computing the stats? That would prevent this edge case from returning wrong results.

ion-elgreco commented 3 weeks ago

Rounding up or down can result in retrieving fewer or more records than expected, so neither works.

echai58 commented 3 weeks ago

@ion-elgreco isn't rounding up always safe? Because if the predicate is <= the rounded-up value, the filtering is still performed on the loaded data?
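
To spell out the reasoning (an illustrative sketch in plain Python, not delta-rs code; floor_ms/ceil_ms are just helpers for this example):

import datetime

def floor_ms(ts: datetime.datetime) -> datetime.datetime:
    # Truncate to millisecond precision (what the stats writer currently does).
    return ts.replace(microsecond=(ts.microsecond // 1000) * 1000)

def ceil_ms(ts: datetime.datetime) -> datetime.datetime:
    # Round up to the next millisecond unless already exact.
    floored = floor_ms(ts)
    return ts if floored == ts else floored + datetime.timedelta(milliseconds=1)

true_max = datetime.datetime(2023, 3, 30, 0, 0, 0, 902)   # real max in the file
predicate = datetime.datetime(2023, 3, 30, 0, 0, 0, 0)    # filter: a <= predicate

# The truncated max claims "every row satisfies a <= predicate", so the exact
# row-level filter is skipped and the .000902 row leaks through.
print(floor_ms(true_max) <= predicate)   # True  -> wrong guarantee
# A rounded-up max only makes that claim when it is actually true, so the
# rows are still filtered exactly after being loaded.
print(ceil_ms(true_max) <= predicate)    # False -> no false guarantee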