trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
https://trino.io
Apache License 2.0
10.36k stars 2.98k forks source link

Hive cannot read ORC ACID table updated by Trino twice #8268

Closed findepi closed 3 years ago

findepi commented 3 years ago
0: jdbc:hive2://localhost:10000/default> SELECT * FROM test_test_update_subquery_false_NONE_nw4lj1py3ix2;
INFO  : Compiling command(queryId=hive_20210612021658_c6a18d73-d787-4e75-b53e-698310076520): SELECT * FROM test_test_update_subquery_false_NONE_nw4lj1py3ix2
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:test_test_update_subquery_false_none_nw4lj1py3ix2.column1, type:int, comment:null), FieldSchema(name:test_test_update_subquery_false_none_nw4lj1py3ix2.column2, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20210612021658_c6a18d73-d787-4e75-b53e-698310076520); Time taken: 0.139 seconds
INFO  : Executing command(queryId=hive_20210612021658_c6a18d73-d787-4e75-b53e-698310076520): SELECT * FROM test_test_update_subquery_false_NONE_nw4lj1py3ix2
INFO  : Completed executing command(queryId=hive_20210612021658_c6a18d73-d787-4e75-b53e-698310076520); Time taken: 0.001 seconds
INFO  : OK

Error: java.io.IOException: java.io.IOException: Two readers for {originalWriteId: 3, bucket: 536870912(1.0.0), row: 0, currentWriteId 4}: new [key={originalWriteId: 3, bucket: 536870912(1.0.0), row: 0, currentWriteId 4}, nextRecord={2, 3, 536870912, 0, 4, null}, reader=Hive ORC Reader(hdfs://hadoop-master:9000/user/hive/warehouse/test_test_update_subquery_false_none_nw4lj1py3ix2/delete_delta_0000004_0000004_0002/bucket_00000, 9223372036854775807)], old [key={originalWriteId: 3, bucket: 536870912(1.0.0), row: 0, currentWriteId 4}, nextRecord={2, 3, 536870912, 0, 4, null}, reader=Hive ORC Reader(hdfs://hadoop-master:9000/user/hive/warehouse/test_test_update_subquery_false_none_nw4lj1py3ix2/delete_delta_0000004_0000004_0000/bucket_00000, 9223372036854775807)] (state=,code=0)

repro steps in https://github.com/trinodb/trino/pull/8267 as a TODO full repro steps in a comment below https://github.com/trinodb/trino/issues/8268#issuecomment-863817129

Praveen2112 commented 3 years ago

Since we update on the same key multiple times it throws this exception. If we do a major compaction post each update. Things should work as expected. Will test and raise a PR.

findepi commented 3 years ago

If we do a major compaction post each update. Things should work as expected.

Trino cannot do a major compaction.

Table should be readable also if a compaction doesn't happen. Even if we were able to do a compaction, it would not be feasible to require a compaction after every update or every other update. And it wouldn't be correct from concurrency perspective either.

Praveen2112 commented 3 years ago

But for tests maybe we could trigger the compaction from Hive after assertion. Or we would need to apply update on a different set of columns.

findepi commented 3 years ago

I would see the bug fixed rather than worked around in tests. Assuming it is indeed a bug.

@djsstarburst do you happen to recognize this?

Praveen2112 commented 3 years ago

But the bug is in Hive side ? https://issues.apache.org/jira/browse/HIVE-22318

It looks like the delete delta are of same size and it would create similar record identifier. (as quoted in the JIRA)

Praveen2112 commented 3 years ago

Other workaround is to ensure the update is applied on different set of rows instead of applying on same set of rows.

Praveen2112 commented 3 years ago

Corresponding JIRA in Hive : https://issues.apache.org/jira/browse/HIVE-22318

findepi commented 3 years ago

But the bug is in Hive side ? https://issues.apache.org/jira/browse/HIVE-22318

the jira talks about Hive's MERGE statement. So the bug can be in Hive's ORC reader, or Hive ORC writer, or Hive MERGE statement implementation.

Can we assume at this point there is no bug on the Trino side?

Praveen2112 commented 3 years ago

the jira talks about Hive's MERGE statement.

This is seen both during merge statement and when selecting from that Table. From the exception and its source it looks like the issue is during Reading the ORC file (with a bunch of delete delta). Ref : https://github.com/apache/hive/blob/d0bbe76ad626244802d062b0a93a9f1cd4fc5f20/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java#L1225

Can we assume at this point there is no bug on the Trino side?

Yes !! We are able to read the data and its the updated one so its not a bug in Trino side.

findepi commented 3 years ago

Full repro steps:

bin/ptl env up --environment singlenode --config config-hdp3
# client/trino-cli/target/trino-cli-*-executable.jar --debug --server localhost:8080 --catalog hive --schema default
trino:default> CREATE TABLE region AS TABLE tpch.tiny.region;
CREATE TABLE: 5 rows

trino:default> CREATE TABLE t (column1 int, column2 varchar) WITH (transactional = true);
CREATE TABLE
trino:default> INSERT INTO t VALUES (1, 'x');
INSERT: 1 row

trino:default> INSERT INTO t VALUES (2, 'y');
INSERT: 1 row

trino:default> UPDATE t SET column2 = (SELECT max(name) FROM region); -- BTW the problem is reproducible also when using SET column2 = 'MIDDLE EAST' here
UPDATE: 2 rows

trino:default> UPDATE t SET column2 = (SELECT min(name) FROM region); -- BTW the problem is reproducible also when using SET column2 = 'AFRICA' here
UPDATE: 2 rows

trino:default> SELECT * FROM t;
            ->
 column1 | column2
---------+---------
       2 | AFRICA
       1 | AFRICA
(2 rows)

now in Hive:

$ docker exec -itu hive ptl-hadoop-master bash -l
[hive@hadoop-master /]$ beeline -n hive

0: jdbc:hive2://localhost:10000/default> SELECT * FROM t;

Error: java.io.IOException: java.io.IOException: Two readers for {originalWriteId: 3, bucket: 536870912(1.0.0), row: 0, currentWriteId 4}: new [key={originalWriteId: 3, bucket: 536870912(1.0.0), row: 0, currentWriteId 4}, nextRecord={2, 3, 536870912, 0, 4, null}, reader=Hive ORC Reader(hdfs://hadoop-master:9000/user/hive/warehouse/t/delete_delta_0000004_0000004_0002/bucket_00000, 9223372036854775807)], old [key={originalWriteId: 3, bucket: 536870912(1.0.0), row: 0, currentWriteId 4}, nextRecord={2, 3, 536870912, 0, 4, null}, reader=Hive ORC Reader(hdfs://hadoop-master:9000/user/hive/warehouse/t/delete_delta_0000004_0000004_0001/bucket_00000, 9223372036854775807)] (state=,code=0)

however if i recreate the table in Trino

DROP TABLE t;
CREATE TABLE t (column1 int, column2 varchar) WITH (transactional = true);

and then run INSERTs and UPDATEs in Hive then the SELECT * FROM t does not fail in Hive (and it does not fail in Trino either)

INSERT INTO t VALUES (1, 'x');
INSERT INTO t VALUES (2, 'y');
UPDATE t SET column2 = 'MIDDLE EAST'; -- not using subquery here, because Hive doesn't support that and this must not matter
UPDATE t SET column2 = 'AFRICA'; -- as above
SELECT * FROM t;

+------------+------------+
| t.column1  | t.column2  |
+------------+------------+
| 1          | AFRICA     |
| 2          | AFRICA     |
+------------+------------+

or, if i recreate and populate the table in Trino

DROP TABLE t;
CREATE TABLE t (column1 int, column2 varchar) WITH (transactional = true);
INSERT INTO t VALUES (1, 'x');
INSERT INTO t VALUES (2, 'y');

and then run UPDATEs in Hive then the SELECT * FROM t does not fail in Hive (and it does not fail in Trino either)

UPDATE t SET column2 = 'MIDDLE EAST'; -- as above
UPDATE t SET column2 = 'AFRICA'; -- as above
SELECT * FROM t;

+------------+------------+
| t.column1  | t.column2  |
+------------+------------+
| 1          | AFRICA     |
| 2          | AFRICA     |
+------------+------------+

To be the above is quite convincing it's a problem in how Trino UPDATE creates delta files. It creates them in way that can be read by Trino, but cannot be read by Hive. And it's not an inherent problem with the Hive reader. I am not saying Hive reader is bug-free, but Hive is the reference implementation of Hive, so Trino should produce ORC ACID files readable by Hive if possible. And it clearly is possible in this case.

Praveen2112 commented 3 years ago

To be the above is quite convincing it's a problem in how Trino UPDATE creates delta files.

When we run a query like this on fresh table

INSERT INTO t VALUES (1, 'x');

Trino inserts the data into the following directory

/user/hive/warehouse/t/delta_0000001_0000001_0000

And when we insert another row in it

INSERT INTO t VALUES (2, 'y');

Trino inserts the data into the following directory

/user/hive/warehouse/t/delta_0000002_0000002_0000

So now when we run an update like this

UPDATE t SET column2 = 'MIDDLE EAST';

Trino creates a delta directories for each of the directory (delta_0000001_0000001_0000, delta_0000002_0000002_0000) for deletes and inserts unlike Hive which creates a directory per transaction and now the deleted rows are uniquely mapped to each file as each deleted row information has the same rowId but different transactionId - so now hive could use this to delete corresponding row in any of the base or delta file (so does Trino)

now when we run an update like this

UPDATE t SET column2 = 'INDIA';

Trino creates two more directories for the new delta ( referring to delta_0000001_0000001_0000, delta_0000002_0000002_0000) but now the deleted rows information have same rowId and the transactionId. When hive reads the delete_delta directory it has two files having same delete row information and it throws that TwoReader exception.Additional hive doesn't know how to map this delete information to which of the file (while in Trino it knows the mapping details so it works properly).

One solution is introduce a different bucket number for each of the delta directories created so that similar rowIds could be mapped to a different bucket.

Please correct me if I am wrong.

djsstarburst commented 3 years ago

I'm surprised that Hive can't read files with the same bucket but different statementIds. Confirming what you found, I used the orc-tools to decode the data files after the two inserts and two updates in the test cased by @findepi. The results are below.

I guess it's obvious - - and I just tested it - - that if the two rows were inserted in a single insert transaction the test passes, because there is only one split in the bucket.

To avoid producing files with different statementIds, I think Trino UPDATE would have to add an ExchangeNode layer to flow all the splits belonging to a single bucket into one node and one file. @electrum, your thoughts?

./delta_0000001_0000001_0000/bucket_00000
{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":1,"row":{"column1":1,"column2":"x"}}
________________________________________________________________________________________________________________________

./delta_0000002_0000002_0000/bucket_00000
{"operation":0,"originalTransaction":2,"bucket":536870912,"rowId":0,"currentTransaction":2,"row":{"column1":2,"column2":"y"}}
________________________________________________________________________________________________________________________

./delete_delta_0000003_0000003_0000/bucket_00000
{"operation":2,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":3,"row":null}
________________________________________________________________________________________________________________________

./delete_delta_0000003_0000003_0001/bucket_00000
{"operation":2,"originalTransaction":2,"bucket":536870912,"rowId":0,"currentTransaction":3,"row":null}
________________________________________________________________________________________________________________________

./delta_0000003_0000003_0001/bucket_00000
{"operation":0,"originalTransaction":3,"bucket":536870912,"rowId":0,"currentTransaction":3,"row":{"column1":2,"column2":"MIDDLE EAST"}}
________________________________________________________________________________________________________________________

./delta_0000003_0000003_0000/bucket_00000
{"operation":0,"originalTransaction":3,"bucket":536870912,"rowId":0,"currentTransaction":3,"row":{"column1":1,"column2":"MIDDLE EAST"}}
________________________________________________________________________________________________________________________

./delete_delta_0000004_0000004_0000/bucket_00000
{"operation":2,"originalTransaction":3,"bucket":536870912,"rowId":0,"currentTransaction":4,"row":null}
________________________________________________________________________________________________________________________

./delta_0000004_0000004_0000/bucket_00000
{"operation":0,"originalTransaction":4,"bucket":536870912,"rowId":0,"currentTransaction":4,"row":{"column1":2,"column2":"AFRICA"}}
________________________________________________________________________________________________________________________

./delete_delta_0000004_0000004_0002/bucket_00000
{"operation":2,"originalTransaction":3,"bucket":536870912,"rowId":0,"currentTransaction":4,"row":null}
________________________________________________________________________________________________________________________

./delta_0000004_0000004_0002/bucket_00000
{"operation":0,"originalTransaction":4,"bucket":536870912,"rowId":0,"currentTransaction":4,"row":{"column1":1,"column2":"AFRICA"}}
________________________________________________________________________________________________________________________
electrum commented 3 years ago

This is the source of the Hive error message: https://github.com/apache/hive/blob/rel/release-3.1.2/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java#L1169

electrum commented 3 years ago

The equality for ReaderKey is the tuple (originalWriteId, bucket, rowId, currentWriteId). As @Praveen2112 noted, we end up with multiple rows for the same (writeId, bucket, rowId) which is illegal, because they are the same row. We can't change bucket because that is based on the declared bucketing column(s).

Assuming this is the issue, then we need to ensure unique row IDs across the writers. I can think of two ways to do this:

electrum commented 3 years ago

It looks like the current row ID generation has a bug where it gets reset for every page (which is not the cause of this issue but needs to be fixed regardless): https://github.com/trinodb/trino/blob/2734d84245d9fb4cf8de37a675c6938557c4b47c/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java#L307-L314

electrum commented 3 years ago

Note that long term we could switch to the first strategy of single writer per bucket, after merge lands and we change the implementation of update/delete to use the merge connector APIs, which support redistribution.