apache / iceberg-python

Apache PyIceberg
https://py.iceberg.apache.org/
Apache License 2.0

Inconsistent row count across versions #1132

Closed dev-goyal closed 4 days ago

dev-goyal commented 3 weeks ago

Apache Iceberg version

0.7.1 (latest release)

Please describe the bug 🐞

Noticing some fairly weird behaviour with pyiceberg - with the same exact code being run across different versions of the API, we're seeing different counts returned. Have tried this with athena, and can confirm that the 0.6.1 count is the correct one. Any ideas on where to look when debugging this?

Can confirm that the .plan_files() and delete_files is identical across the two versions.

import pyiceberg
print(pyiceberg.__version__)

from pyiceberg import catalog as pyi_catalog

catalog = pyi_catalog.load_catalog(name="default", type="glue")
table = catalog.load_table("ml_recommendations.users_v2")
scan = table.scan(
    row_filter=kwargs["row_filter"]
)

df_users = scan.to_duckdb("users")
df_users.sql("SELECT count(*) FROM users")
>> 0.6.1
┌──────────────┐
│ count_star() │
│    int64     │
├──────────────┤
│      6700635 │
└──────────────┘
>> 0.7.1
┌──────────────┐
│ count_star() │
│    int64     │
├──────────────┤
│      1973154 │
└──────────────┘
dev-goyal commented 3 weeks ago

Nothing too exotic in the row_filter btw:

'row_filter': And(left=GreaterThanOrEqual(term=Reference(name='last_session'), literal=literal('2024-06-06T00:00:00+00:00')), right=EqualTo(term=Reference(name='agg_cluster_name'), literal=literal('san_francisco')))

sungwy commented 3 weeks ago

Hi @dev-goyal thank you for raising this issue, that looks like a critical issue we want to resolve asap. Based on what you mentioned here:

Can confirm that the .plan_files() and delete_files is identical across the two versions.

Is it correct to assume that this is a table with position deletes? We've previously observed a correctness issue in reading tables with position deletes, which has since been resolved, but it's possible that there are more edge cases that need to be resolved here. I think the best approach will be to understand more about the characteristics of this specific table that are causing the issue, and to replicate the issue in a controlled test sample.

dev-goyal commented 3 weeks ago

No position deletes, it has equality deletes! Exact delete query:

res = spark.sql(
        f"DELETE FROM ml_recommendations.users_v2 AS users WHERE EXISTS ("
        f"  SELECT id FROM {features_db_name}.{v.CDC_USER_PROFILE_TABLE} WHERE deleted=true AND users.id=id"
        f")"
    )
sungwy commented 3 weeks ago

Hi @dev-goyal - thank you for sharing the query! I don't think that query means that the table is using equality deletes. I'm not an expert on delete files, but according to this mailing list thread, the community seems to be under the impression that equality delete markers are only produced by Flink: https://lists.apache.org/thread/3s36xkmgj01996mkmg0lqxw2k5lhlxf4

If you are deleting records using Spark SQL, and if you are using Merge-On-Read mode for deletion on your table property (write.delete.mode = merge-on-read), then I think you are likely producing position delete markers in your delete files.
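A quick way to check which delete mode applies is to look at the table properties. The sketch below is only an illustration: the helper names are hypothetical, and it assumes the properties are available as a plain string-to-string mapping (in PyIceberg, `table.properties` should provide that). It falls back to copy-on-write, which is Iceberg's documented default for `write.delete.mode`.

```python
from typing import Mapping

DELETE_MODE_KEY = "write.delete.mode"
# Iceberg's documented default when the property is not set.
DEFAULT_DELETE_MODE = "copy-on-write"


def delete_mode(properties: Mapping[str, str]) -> str:
    """Return the configured delete mode, falling back to the default."""
    return properties.get(DELETE_MODE_KEY, DEFAULT_DELETE_MODE)


def uses_merge_on_read_deletes(properties: Mapping[str, str]) -> bool:
    """True if DELETE statements are expected to write delete files
    instead of rewriting the affected data files."""
    return delete_mode(properties) == "merge-on-read"


# Hypothetical properties, standing in for table.properties
example_properties = {"write.delete.mode": "merge-on-read", "format-version": "2"}
print(delete_mode(example_properties))
print(uses_merge_on_read_deletes(example_properties))
```

For a real table, something like `uses_merge_on_read_deletes(table.properties)` would be the equivalent call.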

dev-goyal commented 3 weeks ago

Thanks @sungwy, that makes sense to me - I am indeed using MOR (version 2). Let me know how else I might be able to help.

daturkel commented 3 weeks ago

I'm on @dev-goyal's team. We're reverting to 0.6.1 in the interim as it doesn't seem to suffer from this bug. It's difficult to construct a minimal reproducible example, but let us know if there are any tests we can run on our side that would be useful for you.

A little more info: We run a daily etl which adds rows to our Iceberg table, and then we have a separate job where pyiceberg consumes some of that data. On some days, pyiceberg gets the count right, while periodically (~2x per week) it misses many rows as Dev showed in his original message. (On all days, Athena gets the count correct though.) We've not been able to determine what makes the bad days special, nor has the diff of the data between days shown any obvious patterns in the rows that are missing/not missing.

sungwy commented 3 weeks ago

@daturkel and @dev-goyal - I really appreciate you both being so patient regarding this issue, and keeping this open line of communication as we debug this issue together.

It's difficult to construct a minimal reproducible example

Yes, that's unfortunately what I'm having issues with as well. I'm trying to create some tests to replicate this issue with little luck - here's a draft PR where I'm investigating the issue for reference: https://github.com/apache/iceberg-python/pull/1141

let us know if there are any tests we can run on our side that would be useful for you

Yes, that would be incredibly helpful. Here are some experiments I think will help us localize the problem in no particular order...

import pyarrow.dataset as ds
from pyiceberg.io.pyarrow import _read_all_delete_files, PyArrowFileIO

ONE_MEGABYTE = 1024 * 1024

table = catalog.load_table("ml_recommendations.users_v2")
scan = table.scan(
    row_filter=kwargs["row_filter"]
)
# The number of tasks is the number of data files that need to be read for this scan and row_filter.
# If we can find a row_filter that results in just a single task, I think it'll be easier to investigate this issue.
# Materialize the tasks as a list because they are iterated twice below.
tasks = list(scan.plan_files())

# Assumes the table's FileIO is a PyArrowFileIO
io = table.io
scheme, netloc, _ = PyArrowFileIO.parse_location(table.metadata.location)
fs = io.fs_by_scheme(scheme, netloc)

# Maps each data file path to the position delete markers that apply to it
deletes_per_file = _read_all_delete_files(fs, tasks)
for task in tasks:
    _, _, path = PyArrowFileIO.parse_location(task.file.file_path)
    arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
    with fs.open_input_file(path) as fin:
        fragment = arrow_format.make_fragment(fin)
        # Total number of rows in the parquet file
        total_rows = fragment.count_rows()
    # All of the position delete markers associated with this specific file.
    # The number of rows read from this file should be: total_rows - unique count of deletes_for_this_file
    deletes_for_this_file = deletes_per_file.get(task.file.file_path)
    print(path, total_rows, deletes_for_this_file)

If this specific table scan only has 1 task / file associated with it (i.e. row_filter matches your table's partition, and you only have a single data file per partition), it would be easy to compare the file's total row count and deletes_for_this_file against the total count of the scan. That'll help us determine whether the root cause lies in how we are aggregating the position deletes. We may also benefit from replicating the characteristics of that specific file, and its corresponding position deletes, in our test suite.
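The expected count for a single data file can be worked out with plain Python. This is a minimal sketch, not PyIceberg's implementation: the function name is hypothetical, and it assumes the position deletes for one file arrive as several lists of row positions that may overlap, so positions are de-duplicated before subtracting.

```python
from typing import Iterable, List


def expected_row_count(total_rows: int, delete_position_chunks: Iterable[List[int]]) -> int:
    """Rows that should remain in one data file after applying position deletes."""
    unique_positions = set()
    for chunk in delete_position_chunks:
        # The same position may appear in more than one delete file
        unique_positions.update(chunk)
    # Only positions that actually exist in the file can remove a row
    in_range = {p for p in unique_positions if 0 <= p < total_rows}
    return total_rows - len(in_range)


# Hypothetical example: a 10-row file with two overlapping sets of deletes
print(expected_row_count(10, [[0, 3, 3], [3, 7]]))  # 7
```

If deletes_for_this_file holds PyArrow arrays, each one could be converted with `to_pylist()` before being passed in. A scan count lower than this expected value for a file would suggest that more rows are being dropped than the delete files call for.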

If you feel it would be helpful to debug this issue together, feel free to send me a message on the Iceberg Slack channel! It may be helpful to take a look at your specific table together.

kevinjqliu commented 3 weeks ago

A little more info: We run a daily etl which adds rows to our Iceberg table, and then we have a separate job where pyiceberg consumes some of that data. On some days, pyiceberg gets the count right, while periodically (~2x per week) it misses many rows as Dev showed in his original message. (On all days, Athena gets the count correct though.) We've not been able to determine what makes the bad days special, nor has the diff of the data between days shown any obvious patterns in the rows that are missing/not missing.

Good to know that Athena gets the correct count. From that, we can assume that the table state is correct, i.e. the catalog state, the metadata, and data files.

This means the culprit is likely when Pyiceberg reads the underlying table.

I also noticed the "wrong count" (v0.7.1) is smaller than the "right count" (v0.6.1): 6,700,635 for v0.6.1 and 1,973,154 for v0.7.1. Is this always the case? If so, PyIceberg might be filtering out more data than necessary.

sungwy commented 2 weeks ago

Hi @dev-goyal and @daturkel are you still seeing the same issues?

daturkel commented 2 weeks ago

Hi @sungwy and @kevinjqliu , we are still having this issue (we've reverted to v0.6.1 to avoid it) but have been a bit busy leading up to a launch so haven't been able to troubleshoot as much. In a week or so when things cool down we would love to troubleshoot some more.

To answer one question though, the wrong count was definitely always smaller than the right count!

sungwy commented 2 weeks ago

Thanks for confirming @daturkel. After a lot of experiments, I've finally been able to create a minimal reproducible test on this PR: https://github.com/apache/iceberg-python/pull/1141

I will use this test to investigate the issue further, and hopefully find a fix soon!

sungwy commented 2 weeks ago

Hi @daturkel and @dev-goyal I was finally able to find the root cause and put up a fix for this issue on this PR: https://github.com/apache/iceberg-python/pull/1141. Would you be able to install from the branch and confirm if it fixes your issue?

Sung

dev-goyal commented 2 weeks ago

Hi @sungwy absolutely - give me a couple days please, but I will prioritize testing this ASAP.

Thank you so much for prioritizing the fix, we much appreciate it!