trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

[BUG][Delta Lake] Trino returns duplicate rows for Delta Tables with Deletion Vectors #22972

Open whitleykeith opened 3 months ago

whitleykeith commented 3 months ago

Environment

Trino 451 / Delta Lake Connector

Delta tables generated via Spark 3.5.1 / Delta 3.1.0

Delta Tables have Deletion Vectors enabled + are used in MERGE statements for updated and deleted rows

Description

For context, we have a collection of Spark jobs that incrementally snapshot JDBC sources (e.g. MySQL) and update a Delta Table via a MERGE statement roughly once a day. All of our MySQL tables have an auto-incrementing primary key id that we use as our merge key.

We use deletion vectors for all of these tables for performance reasons, and have noticed a bug we believe exists within Trino.

In some of our tables, we can find "duplicates" of a given id in a delta table, even though the counts line up exactly. Moreover, this only happens when filtering on the merge key, not with any other filters.

For example, say we started snapshotting a MySQL table on 07/29, with its Delta table having 100 rows and a contiguous block of ids 0-99. We can validate via our Spark runtime that this table indeed has no duplicates. Running something like select count(*), count(distinct id) from table in Trino will also return 100 for both, indicating that no duplicate ids exist in the table, as expected.
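
For concreteness, the uniqueness check is of this shape (generic table name; in the example above both counts come back as 100):

select count(*), count(distinct id) from table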

However, sometimes querying a certain primary key will yield duplicate results. For example, select * from table where id = 50 would return:

    id    | filter_column  |          created_at          |          updated_at
----------+----------------+------------------------------+-----------------------------
 50       | foo            | 2022-10-12 09:07:36.000 UTC  | 2024-07-29 16:06:20.000 UTC
 50       | foo            | 2022-10-12 09:07:36.000 UTC  | 2024-08-07 00:12:56.000 UTC

In our observations, this seems to return the latest version of the row together with the first version of the row ever written, which suggests the deletion vector masking the stale copy is not being applied at read time. However, a similar query in Spark returns the expected single (up-to-date) row, indicating this is an issue with how Trino reads the table and not with the table itself.

To make things a bit more confusing, this doesn't happen if you filter on columns other than the merge key. For instance:

select * from table where filter_column = 'foo' -> returns only the one up-to-date row for id = 50

select * from table where filter_column = 'foo' AND id = 50 -> still returns both rows

This also doesn't

Reproducing

It's hard to build a reproducible case, but this seems to happen to every row in our tables that:

My best attempt to reproduce follows (it looks like Trino just added support for writing DVs, so maybe it's reproducible this way):

[!WARNING] This is a best effort at reproducing and is not indicative of how we generate our Delta Tables. Our Delta Tables are written via Spark, with a call similar to this:


// targetTable is an io.delta.tables.DeltaTable; sourceDF is the incremental snapshot DataFrame
targetTable.as("target")
  .merge(sourceDF.as("source"), "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()

Create table

create table delta.scratch.foo (id int, updated_at timestamp)

Enable DVs on the table (not sure if it's possible to do in Trino).
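
In Spark SQL this can be done with the delta.enableDeletionVectors table property (a sketch; the table name as resolved by the Spark catalog may differ from Trino's delta.scratch.foo):

ALTER TABLE scratch.foo SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);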

Insert a row in the table:

insert into delta.scratch.foo values (1, TIMESTAMP '1970-01-01')

Run a merge that would update a row:

MERGE into delta.scratch.foo target using (select 1 as id, timestamp '2024-01-01' as updated_at)  u
    on (target.id = u.id)
    when matched
    then update set updated_at = u.updated_at;

If this reproduces the error, you should see something like this:

 id |         updated_at
----+----------------------------
  1 | 2024-01-01 00:00:00.000000
  1 | 1970-01-01 00:00:00.000000

ebyhr commented 2 months ago

@whitleykeith I tested the following statements on both the current master and 451, but Trino returns the correct result:

-- Spark
CREATE TABLE default.test (id int, updated_at timestamp)
USING delta 
LOCATION 's3://test-bucket/test'
TBLPROPERTIES ('delta.enableDeletionVectors' = true);

INSERT INTO default.test VALUES (1, TIMESTAMP '1970-01-01');

MERGE INTO default.test target USING (SELECT 1 as id, timestamp '2024-01-01' as updated_at) u
ON (target.id = u.id)
WHEN matched
THEN update set updated_at = u.updated_at;

SELECT * FROM default.test;
1   2024-01-01 00:00:00

-- Trino
SELECT * FROM delta.default.test;
 id |         updated_at
----+-----------------------------
  1 | 2024-01-01 00:00:00.000 UTC

ram-agg commented 2 months ago

@ebyhr we are facing the same issue. We have a Delta table with deletion vectors enabled, and we also use MERGE while writing data to it. When querying that table via Trino, it returns some duplicate records, while the same query run on Databricks returns the correct results.

Note: it is not returning duplicates for all the records, only for some of them.

findinpath commented 2 months ago

@ram-agg do you happen to have a simple scenario at hand to reproduce the issue?

findinpath commented 2 months ago

@whitleykeith I can't tell from your reproduction scenario whether all the modification operations (including MERGE) are performed with Spark/Databricks, or whether some of these operations are performed via Trino.

Can you maybe share the contents of the problem table for investigation?

findinpath commented 2 months ago

However, sometimes querying a certain primary key will yield duplicate results.

@whitleykeith please add the table definition to the bug report.

There is no such concept as a primary key in Delta Lake.

whitleykeith commented 2 months ago

I'm missing from your reproduction scenario whether all the modification operations (including MERGE) are performed with Spark/Databricks

All modification operations (MERGE, enabling DVs, OPTIMIZE, etc.) are done via our OSS Spark 3.5.1 / Delta 3.1 runtime. In this scenario, Trino is read-only.

There is no such concept of primary key in Delta Lake

Yeah, apologies, what I really mean is that all of our tables have a column that we use exclusively in our MERGE condition, which effectively requires that column to be unique or else MERGEs will fail.
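
As an illustration (hypothetical data, not our actual pipeline): Delta aborts a MERGE when multiple source rows match the same target row, so duplicate keys in the source snapshot make the job fail with an error along those lines:

MERGE INTO scratch.foo target
USING (SELECT 1 AS id, TIMESTAMP '2024-01-01' AS updated_at
       UNION ALL
       SELECT 1 AS id, TIMESTAMP '2024-02-01' AS updated_at) u
ON target.id = u.id
WHEN MATCHED THEN UPDATE SET updated_at = u.updated_at;
-- fails: multiple source rows matched and attempted to modify the same target row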

Can you maybe share the contents of the problem table for investigation?

I don't think it'll be possible for me to share the exact tables this affects, but I can tell you it:


-- copying the affected table into a new table
INSERT INTO delta.scratch.foo_history
SELECT * FROM delta.scratch.foo

In this case, the duplicates are not present. I'll see if I can build a reproducible case.

findinpath commented 2 months ago

@whitleykeith please look into running:

SELECT id, updated_at, "$path" FROM delta.scratch.foo WHERE id = ...

BTW, in Spark, _metadata.file_path can be used to show the path of the file.
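
For example (a sketch; _metadata is Spark's hidden file-metadata column, and the table name as seen by Spark may differ):

SELECT id, updated_at, _metadata.file_path FROM scratch.foo WHERE id = ...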

I'm curious to see whether we're speaking about two different files.

Just to make sure, does Spark return the "correct" result?

Would you be willing to do a debugging session over Zoom?