prestodb / presto

The official home of the Presto distributed SQL query engine for big data
http://prestodb.io
Apache License 2.0

[Native] CTE support in Prestissimo #22630

Open aditi-pandit opened 2 months ago

aditi-pandit commented 2 months ago

Expected Behavior or Use Case

Presto Java supports CTEs (the WITH clause) with materialization: https://prestodb.io/docs/0.286/admin/properties.html#cte-materialization-properties

Investigate supporting this in Prestissimo.

Presto Component, Service, or Connector

Presto native

aditi-pandit commented 2 months ago

Notes: The Presto CTE design is based on https://www.vldb.org/pvldb/vol8/p1704-elhelw.pdf

The main PR is https://github.com/prestodb/presto/pull/20887/

The core of the logic that wires CTE Producer, Consumer, and Sequence nodes is in the logical optimizer. This is all translated to temporary-table writes and reads during physical planning.
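
As a rough sketch, for a query with a single CTE the logical plan has the following shape (node names per the PR above; this is a paraphrase of the design, not actual EXPLAIN output):

Sequence
  CteProducer(cte)        -- physical planning rewrites this into a TableWriter/TableFinish on a __temporary_tables__ table
    <plan for the CTE definition>
  <main query>
    CteConsumer(cte)      -- physical planning rewrites this into a TableScan over that same temporary table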

TODO: Cover the gaps after physical planning.

aditi-pandit commented 2 months ago

The first issue I have encountered is that CTE materialization generates bucketed (but not partitioned) temporary tables, as supported by HMS. These are not supported in Prestissimo.

presto:tpch> WITH temp as (SELECT orderkey FROM ORDERS) SELECT * FROM temp t1;

Query 20240501_001323_00026_5bknt failed: hiveInsertTableHandle->bucketProperty == nullptr || isPartitioned Bucketed table must be partitioned: {"@type":"hive","actualStorageFormat":"ORC","bucketProperty":{"bucketCount":100,"bucketFunctionType":"PRESTO_NATIVE","bucketedBy":["_c0_orderkey"],"sortedBy":[],"types":["bigint"]},"compressionCodec":"SNAPPY","inputColumns":[{"@type":"hive","columnType":"REGULAR","hiveColumnIndex":0,"hiveType":"bigint","name":"_c0_orderkey","requiredSubfields":[],"typeSignature":"bigint"}],"locationHandle":{"tableType":"TEMPORARY","targetPath":"file:/Users/aditipandit/ahana_dev/data/PARQUET/hive_data/__temporary_tables__/__presto_temporary_table_ORC_20240501_001323_00026_5bknt_84eb3155_160a_49e4_892f_ada767d32586","writeMode":"DIRECT_TO_TARGET_NEW_DIRECTORY","writePath":"file:/Users/aditipandit/ahana_dev/data/PARQUET/hive_data/__temporary_tables__/__presto_temporary_table_ORC_20240501_001323_00026_5bknt_84eb3155_160a_49e4_892f_ada767d32586"},"pageSinkMetadata":{"schemaTableName":{"schema":"__temporary_tables__","table":"__presto_temporary_table_orc_20240501_001323_00026_5bknt_84eb3155_160a_49e4_892f_ada767d32586"},"table":{"dataColumns":[{"name":"_c0_orderkey","type":"bigint"}],"databaseName":"__temporary_tables__","owner":"user","parameters":{},"partitionColumns":[],"storage":{"bucketProperty":{"bucketCount":100,"bucketFunctionType":"PRESTO_NATIVE","bucketedBy":["_c0_orderkey"],"sortedBy":[],"types":["bigint"]},"location":"","parameters":{},"serdeParameters":{},"skewed":false,"storageFormat":{"inputFormat":"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat","outputFormat":"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat","serDe":"org.apache.hadoop.hive.ql.io.orc.OrcSerde"}},"tableName":"__presto_temporary_table_ORC_20240501_001323_00026_5bknt_84eb3155_160a_49e4_892f_ada767d32586","tableType":"TEMPORARY_TABLE"}},"partitionStorageFormat":"ORC","preferredOrderingColumns":[],"schemaName":"__temporary_tables__","tableName":"__presto_temporary_table_orc_20240501_001323_00026_5bknt_84eb3155_160a_49e4_892f_ada767d32586","tableStorageFormat":"ORC"}

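The head of the error ("hiveInsertTableHandle->bucketProperty == nullptr || isPartitioned") is the stringified condition of a Velox user check. A minimal sketch of what the failing check presumably looks like (the enclosing function, parameter names, and exact source location are my assumptions, not the actual Prestissimo code):

#include <string>

#include "velox/common/base/Exceptions.h"

// Hedged sketch only: VELOX_USER_CHECK throws a VeloxUserError carrying the
// stringified condition, which matches the message above. The handle is assumed
// to be the Presto protocol HiveInsertTableHandle, whose bucketProperty is a
// public member; handleJson stands in for the JSON dump appended to the message.
void checkBucketedTableIsPartitioned(
    const protocol::HiveInsertTableHandle* hiveInsertTableHandle,
    bool isPartitioned,
    const std::string& handleJson) {
  VELOX_USER_CHECK(
      hiveInsertTableHandle->bucketProperty == nullptr || isPartitioned,
      "Bucketed table must be partitioned: {}",
      handleJson);
}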

I will review adding this support in Presto Native.

aditi-pandit commented 1 month ago

Identified 3 sub-parts of coding for this feature:

i) Add support for bucketed (but not partitioned) tables. This enables CTAS into such tables, although subsequent inserts are not allowed, which is consistent with Presto behavior. In progress: https://github.com/facebookincubator/velox/pull/9740 and https://github.com/prestodb/presto/pull/22737.

ii) Velox/Prestissimo are not aware of temporary tables right now. Add this option and support it consistently with Presto. At this point CTEs with DWRF/Parquet will work. https://github.com/facebookincubator/velox/pull/9844 and https://github.com/prestodb/presto/pull/22780.

iii) Add an optimized format for materialization.

tdcmeehan commented 1 month ago

iii) Add an optimized format for materialization.

I believe in Java we use the PRESTO_PAGE format. I wonder if it's cheaper to use Arrow for Prestissimo workers?

aditi-pandit commented 1 month ago

iii) Add an optimized format for materialization.

I believe in Java we use the PRESTO_PAGE format. I wonder if it's cheaper to use Arrow for Prestissimo workers?

@tdcmeehan: Yes, Java uses the PRESTO_PAGE format. For Velox, either Arrow or the format already used for spilling should be efficient. I'll prototype both and measure the speed-ups.

aditi-pandit commented 1 month ago

@jaystarshot: https://github.com/facebookincubator/velox/pull/9844 and https://github.com/prestodb/presto/pull/22780

In the Presto PR I derived a native-execution test from TestCteExecution.java so that all your tests run on the native side. 31 of the tests passed but 18 failed. I'm looking at the failures in more detail, but it would be great if you took a look as well.

You have to apply the Velox PR changes to your Velox submodule along with the Presto PR to get a working setup.

jaystarshot commented 1 month ago

Even if we use Arrow, after the reads and before the writes we might still need to convert to Presto Page format when exchanging with the table scan stage. I believe the spilling format should be the same as Presto Page. I found this serializer in the code, https://github.com/facebookincubator/velox/blob/main/velox/serializers/PrestoSerializer.cpp#L49, which seems to serialize in Presto Page format.

aditi-pandit commented 1 month ago

@jaystarshot: Spilling uses PrestoSerializer. I'll create a PR that adds a PrestoPageWriter that we can wire into this code.
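
For context, a minimal sketch of what such a writer could look like if it wraps the serde linked above. This is my sketch, not the planned PR; the Velox calls are from velox/vector/VectorStream.h and velox/serializers/PrestoSerializer.h, and the exact signatures should be verified against the current Velox headers:

#include "velox/serializers/PrestoSerializer.h"
#include "velox/vector/VectorStream.h"

using namespace facebook::velox;

// Hypothetical PrestoPageWriter: serializes RowVectors in the Presto Page wire
// format by reusing the serde that shuffle/spill already use. The class itself
// is an assumption and the signatures are approximate.
class PrestoPageWriter {
 public:
  explicit PrestoPageWriter(memory::MemoryPool* pool) : pool_(pool) {
    // Register the Presto serde once per process; no-op if already registered.
    if (!isRegisteredVectorSerde()) {
      serializer::presto::PrestoVectorSerde::registerVectorSerde();
    }
  }

  // Serialize one batch and append it to 'out' as a Presto Page.
  void write(const RowVectorPtr& batch, OutputStream* out) {
    VectorStreamGroup group(pool_);
    group.createStreamTree(asRowType(batch->type()), batch->size());
    group.append(batch);
    group.flush(out);
  }

 private:
  memory::MemoryPool* const pool_;
};

A matching reader would presumably sit on top of VectorStreamGroup::read() with the same serde, which is the reader piece mentioned in the next comment.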

jaystarshot commented 1 month ago

That's awesome! You will need the reader as well. I was trying to look into this as a Velox-beginner task for myself, but I will leave it to the experts!

aditi-pandit commented 3 weeks ago

Update: The PRs for CTAS into bucketed (but not partitioned) tables are now merged in OSS.

I am debugging the failures in https://github.com/facebookincubator/velox/pull/9844 and https://github.com/prestodb/presto/pull/22780 now.

@jaystarshot and I will reconnect about the PageFile reader/writer after debugging the failures above.