trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

validation code is not working correctly in the case of bucket evolution #6434

Open Chaho12 opened 3 years ago

Chaho12 commented 3 years ago

The validation code (presto-hive/src/main/java/io/prestosql/plugin/hive/HiveBucketAdapterRecordCursor.java:125) is not working correctly when the table's bucket count is changed while existing partitions keep their original bucket count.

If the table's bucket count was 30 but is increased to 60 while the partition's bucket count is still 30, I get an error like Query 20201222_055239_00115_fvetf failed: A row that is supposed to be in bucket 4 is encountered. Only rows in bucket 29 (modulo 30) are expected

On the other hand, if I change the table bucket count 30 -> 60 -> 15, which is half the partition's bucket count of 30, I get the correct result.

Btw, if I set both the table and partition bucket counts to 60, I get the correct result. (If both are 40, though, the query runs without any error but returns incorrect results.)

The image below is from when I changed only the table's bucket count while keeping the partition bucket count at 30.

Edit: removed the image because it contained sensitive information.
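
For reference, here is a minimal sketch of the kind of modulo check that produces this error; the class and method names are illustrative assumptions, not the actual Trino source (the real check lives in HiveBucketAdapterRecordCursor.advanceNextPosition):

// Sketch: a file holding partition bucket `partitionBucket` (out of
// `partitionBucketCount` buckets) may only contain rows whose re-computed
// table bucket is congruent to that number modulo the partition bucket count.
public class BucketCheckSketch
{
    static void validate(int computedTableBucket, int partitionBucket, int partitionBucketCount)
    {
        if (computedTableBucket % partitionBucketCount != partitionBucket) {
            throw new IllegalStateException(String.format(
                    "A row that is supposed to be in bucket %s is encountered. "
                            + "Only rows in bucket %s (modulo %s) are expected",
                    computedTableBucket, partitionBucket, partitionBucketCount));
        }
    }

    public static void main(String[] args)
    {
        validate(59, 29, 30); // 59 % 30 == 29: passes after a 30 -> 60 change
        validate(4, 29, 30);  // 4 % 30 == 4 != 29: the failure reported above
    }
}

Note that if the same hash function produced both the old and the new bucket numbers, doubling 30 -> 60 should always satisfy this check, since (h % 60) % 30 == h % 30 for every hash value h; a failure therefore suggests the rows were bucketed with a different hash function than the one used to re-compute buckets.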

findepi commented 3 years ago

If the table's bucket count was 30 but is increased to 60 while the partition's bucket count is still 30, I get an error like Query 20201222_055239_00115_fvetf failed: A row that is supposed to be in bucket 4 is encountered. Only rows in bucket 29 (modulo 30) are expected

How did you create the files in that partition?

Can it be your table was not bucketed correctly in the first place?

findepi commented 3 years ago

If both are 40, though, the query runs without any error but returns incorrect results.

Can you please retest this with Presto 348?

Chaho12 commented 3 years ago

(Deleted the previous comment and rewrote it without the sensitive info.)

How did you create the files in that partition?

I used Hive to create the files; here is the SQL used to create the data. DDL:

DROP TABLE IF EXISTS `db.table`;

CREATE EXTERNAL TABLE `db.table`(
  `f1` string,
  `f2` string,
  `f3` string,
  `f4` int,
  `f5` int
)
COMMENT 'ORC Table'
PARTITIONED BY (
  `date` date
)
CLUSTERED BY (
    f1, f2)
SORTED BY (
    f2, f3 ASC)
INTO 30 BUCKETS
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
TBLPROPERTIES (
  'orc.compress'='zlib',
  'orc.create.index'='true',
  'orc.stripe.size'='268435456'
);

SQL (I had to hide some sensitive information, but here is the gist):

INSERT OVERWRITE TABLE db.table PARTITION (date = "2020-12-01")
SELECT t1.f1,
       t1.f2,
       t1.f3,
       t2.f4,
       t2.f5
FROM
  (...) 

Can it be your table was not bucketed correctly in the first place?

Well, we are using Hive 3.1.2 and creating the data works fine. The error only happens if I change the bucket count as above via a command such as ALTER TABLE db.table CLUSTERED BY (f1, f2) SORTED BY (f1, f2 ASC) INTO 15 BUCKETS;

Can you please retest this with Presto 348?

Of course :)

findepi commented 3 years ago

Can you please retest this with Presto 348?

Of course :)

The reason I'm asking is that that version also adds validation for the case when the table and partition bucket counts are equal. This was trusted and not validated before, so the problem could be an incorrect write path rather than bucket evolution.
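
As a rough illustration of that stricter check, here is a sketch; the shape is assumed, not the actual Presto 348 change:

// Sketch only: even when the table and partition bucket counts are equal,
// re-compute each row's bucket and verify it matches the bucket number of
// the file it was read from; a mismatch means the files were mis-bucketed
// on write, not that bucket evolution is at fault.
public class EqualBucketCountCheckSketch
{
    static void validateRow(int computedBucket, int fileBucketNumber)
    {
        if (computedBucket != fileBucketNumber) {
            throw new IllegalStateException(String.format(
                    "Row hashed to bucket %s but was found in the file for bucket %s",
                    computedBucket, fileBucketNumber));
        }
    }

    public static void main(String[] args)
    {
        validateRow(7, 7); // consistently bucketed: passes
        validateRow(4, 7); // mis-bucketed file: fails
    }
}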

Chaho12 commented 3 years ago

@findepi

I retried the test with Presto 348 in two environments: Hive 2.3.2 (non-secure) and Hive 3.1.2 (secure). Initially, the table had 30 buckets. In the non-secure environment it works as expected.

presto:test> select count(*) from db.table where f1 = date ('2020-12-01') and f2 ='youtube';
 _col0
-------
    65
(1 row)

Query 20201228_083602_00008_g8hdz, FINISHED, 1 node
Splits: 107 total, 107 done (100.00%)
14.77 [300K rows, 307MB] [20.3K rows/s, 20.8MB/s]

Then I altered the bucket count in Hive 30 -> 40:

ALTER TABLE db.table CLUSTERED BY (f1, f2) SORTED BY (f1, f2 ASC) INTO 40 BUCKETS
presto:test> select count(*) from db.table where f1 = date ('2020-12-01') and f2 ='youtube';
Query 20201228_083640_00009_g8hdz failed: Hive table (db.table) bucketing (columns=[f1, f2], buckets=40) is not compatible with partition (f1=2020-12-01) bucketing (columns=[f1, f2], buckets=30)

Then I altered the bucket count in Hive 40 -> 60:

ALTER TABLE db.table CLUSTERED BY (f1, f2) SORTED BY (f1, f2 ASC) INTO 60 BUCKETS
presto:test> select count(*) from db.table where f1 = date ('2020-12-01') and f2 ='youtube';
 _col0
-------
    65
(1 row)

Query 20201228_083649_00010_g8hdz, FINISHED, 1 node
Splits: 197 total, 197 done (100.00%)
27.19 [300K rows, 714MB] [11K rows/s, 26.3MB/s]

However, in the secure environment with Hive 3.1.2, it fails when I double the bucket count.

presto:test> select count(*) from db.table where f1 = date ('2020-12-01') and f2 ='youtube';
 _col0
-------
    65
(1 row)

Then I altered the bucket count in Hive 30 -> 40:

ALTER TABLE db.table CLUSTERED BY (f1, f2) SORTED BY (f1, f2 ASC) INTO 40 BUCKETS
presto:test> select count(*) from db.table where f1 = date ('2020-12-01') and f2 ='youtube';

Query 20201229_070653_00009_a4krs failed: Hive table (db.table) bucketing (columns=[f1, f2], buckets=40) is not compatible with partition (log_date=2020-12-01) bucketing (columns=[f1, f2], buckets=30)

Then I altered the bucket count in Hive 40 -> 15:

ALTER TABLE db.table CLUSTERED BY (f1, f2) SORTED BY (f1, f2 ASC) INTO 15 BUCKETS
presto:test> select count(*) from db.table where f1 = date ('2020-12-01') and f2 ='youtube';
 _col0
-------
    65
(1 row)

Then I altered the bucket count in Hive 15 -> 60:

ALTER TABLE db.table CLUSTERED BY (f1, f2) SORTED BY (f1, f2 ASC) INTO 60 BUCKETS
presto:test> select count(*) from db.table where f1 = date ('2020-12-01') and f2 ='youtube';

Query 20201229_070625_00007_a4krs failed: A row that is supposed to be in bucket 36 is encountered. Only rows in bucket 20 (modulo 30) are expected

findepi commented 3 years ago

I retried the test with Presto 348 in two environments: Hive 2.3.2 (non-secure) and Hive 3.1.2 (secure). Initially, the table had 30 buckets.

I do not know what "secure" means here, but I'd expect the biggest difference to be the Hive version. Hive 3 introduces the bucketing_version table property and uses a different bucketing function by default (this is supported by Trino in general).

Your queries always use the f1 = date('2020-12-01') and f2 = 'youtube' condition, but it may be that there is an incorrectly bucketed row with a different value. Can you please make sure all the rows are correctly bucketed by Hive?

For this:

  1. create a table in Hive
  2. use Trino 348 (FKA Presto 348), and run SELECT count(*) FROM table GROUP BY f1, f2 (group by all bucketing columns)
  3. alter the table 30 -> 60 buckets
  4. run SELECT count(*) FROM table GROUP BY f1, f2 (group by all bucketing columns) again

Another possible explanation is that a wrong bucketing function is being used after the ALTER TABLE; see the sketch below.
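
To make that second possibility concrete, here is a sketch of how two different bucketing hash functions assign the same key to different buckets. Both hashes below are illustrative stand-ins (v1Hash mimics the Java-style string hash historically used for bucketing; v2Hash is just a deliberately different function standing in for Hive 3's Murmur3-based hash), so do not expect byte-identical buckets to real Hive:

import java.nio.charset.StandardCharsets;

public class BucketingVersionSketch
{
    // Stand-in for the bucketing-v1 string hash (Java-style 31x + byte).
    static int v1Hash(String s)
    {
        int hash = 0;
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            hash = 31 * hash + b;
        }
        return hash;
    }

    // Stand-in for the bucketing-v2 hash; any different function illustrates
    // the mismatch (the real one is Murmur3-based).
    static int v2Hash(String s)
    {
        return Integer.reverse(v1Hash(s)) * 0x9E3779B9;
    }

    // Hive-style bucket assignment: non-negative hash modulo bucket count.
    static int bucket(int hash, int bucketCount)
    {
        return (hash & Integer.MAX_VALUE) % bucketCount;
    }

    public static void main(String[] args)
    {
        for (String key : new String[] {"youtube", "ALGERIA", "PERU"}) {
            System.out.printf("%s: v1 bucket=%d, v2 bucket=%d%n",
                    key, bucket(v1Hash(key), 30), bucket(v2Hash(key), 30));
        }
        // If the files were written with one version but the buckets are
        // re-computed with the other, rows land in "impossible" buckets and
        // the per-row validation fails.
    }
}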

Chaho12 commented 3 years ago

Sure, will do. Btw, what I meant by secure was Kerberos authentication when accessing the Hive metastore/HDFS.

-- presto
presto:test> select count(*) from test2 GROUP BY f1, f2;
 _col0 
-------
     1 
     1 
     1 
     1 
     1 
     1 
     1 
     1 
     1 
     1 
     1 
     2 
     1 
     1 
     1 
(query aborted by user)

Query 20201230_031214_00017_87dq4, RUNNING, 5 nodes
Splits: 186 total, 60 done (32.26%)
4.92 [70.7M rows, 811MB] [14.4M rows/s, 165MB/s]

Query aborted by user

-- hive
ALTER TABLE `test2` CLUSTERED BY (f1, f2) SORTED BY (f2, f3 ASC) INTO 60 BUCKETS; 

-- presto
presto:test> select count(*) from test2 GROUP BY f1, f2;

Query 20201230_031248_00018_87dq4, FAILED, 5 nodes
Splits: 276 total, 71 done (25.72%)
0.56 [0 rows, 0B] [0 rows/s, 0B/s]

Query 20201230_031248_00018_87dq4 failed: A row that is supposed to be in bucket 46 is encountered. Only rows in bucket 9 (modulo 30) are expected

findepi commented 3 years ago

I could reproduce this with our singlenode-hdp3 environment on ~current master (4bc096b207df0b9565a4bd5f06f8cacd152dfafe), basically following @Chaho12's steps, slightly simplified:

Trino

create table nation as select * from tpch.tiny.nation;

Hive

CREATE TABLE nation_bucketed (
    nationkey bigint,
    name varchar(25)
)
PARTITIONED BY (dummy bigint)
CLUSTERED BY (name) INTO 30 BUCKETS;

SET hive.enforce.bucketing = true;
INSERT OVERWRITE TABLE nation_bucketed PARTITION (dummy=42)
SELECT nationkey, name FROM nation;

Trino

presto:default> SELECT * FROM nation_bucketed;
 nationkey |      name      | dummy
-----------+----------------+-------
        10 | IRAN           |    42
        17 | PERU           |    42
         1 | ARGENTINA      |    42
        24 | UNITED STATES  |    42
        16 | MOZAMBIQUE     |    42
         0 | ALGERIA        |    42
        21 | VIETNAM        |    42
        14 | KENYA          |    42
         8 | INDIA          |    42
        19 | ROMANIA        |    42
         2 | BRAZIL         |    42
        22 | RUSSIA         |    42
        20 | SAUDI ARABIA   |    42
        12 | JAPAN          |    42
         4 | EGYPT          |    42
        18 | CHINA          |    42
         7 | GERMANY        |    42
        13 | JORDAN         |    42
         6 | FRANCE         |    42
        15 | MOROCCO        |    42
         3 | CANADA         |    42
         9 | INDONESIA      |    42
        23 | UNITED KINGDOM |    42
         5 | ETHIOPIA       |    42
        11 | IRAQ           |    42
(25 rows)

Hive

ALTER TABLE nation_bucketed CLUSTERED BY (name) INTO 60 BUCKETS;

Trino

presto:default> SELECT * FROM nation_bucketed;

Query 20201230_083502_00012_wc3iz, FAILED, 1 node
http://localhost:8080/ui/query.html?20201230_083502_00012_wc3iz
Splits: 56 total, 0 done (0.00%)
CPU Time: 0.0s total,     0 rows/s,     0B/s, 7% active
Per Node: 0.0 parallelism,     0 rows/s,     0B/s
Parallelism: 0.0
Peak Memory: 0B
0.27 [0 rows, 0B] [0 rows/s, 0B/s]

Query 20201230_083502_00012_wc3iz failed: A row that is supposed to be in bucket 17 is encountered. Only rows in bucket 7 (modulo 30) are expected
io.prestosql.spi.PrestoException: A row that is supposed to be in bucket 17 is encountered. Only rows in bucket 7 (modulo 30) are expected
    at io.prestosql.plugin.hive.HiveBucketAdapterRecordCursor.advanceNextPosition(HiveBucketAdapterRecordCursor.java:126)
    at io.prestosql.plugin.hive.util.ForwardingRecordCursor.advanceNextPosition(ForwardingRecordCursor.java:46)
    at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:88)
    at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:301)
    at io.prestosql.operator.Driver.processInternal(Driver.java:379)
    at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
    at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
    at io.prestosql.operator.Driver.processFor(Driver.java:276)
    at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1076)
    at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
    at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
    at io.prestosql.$gen.Presto_unknown____20201230_082540_2.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

Chaho12 commented 3 years ago

Yes, the interesting thing is that this error does not happen when using Hive 2. As you said, it seems to be related to the bucketing version.

findepi commented 3 years ago

cc @djsstarburst