jaystarshot opened this issue 1 year ago
Will start looking
This is also something we have been exploring. To do this right, we need a mechanism to materialize intermediate results. Short of that, this paper describes a holistic framework for identifying and reusing CTEs with streaming/staged in-memory execution: https://www.amazon.science/publications/computation-reuse-via-fusion-in-amazon-athena. @jaystarshot - if you folks are interested, we can collaborate on this. cc: @bmckennaah, @aaneja
For part 1, yes, I think we need to store the results in a temp table and create a new exchange which will stream them from the temp table to the repeated parts of the plan. Thanks for the reference, I will go through it. Yes, we are very interested and would love to collaborate!
CC: @feilong-liu
This is the architecture that I was thinking about. Basically, create a table definition in planning, and read and write to that table while managing state in execution.
Original Plan
In planning, there will be a new CTE optimizer after the exchange optimizer, which will detect and choose CTEs by hashing all plan subtrees beginning from exchanges, and maybe using the CBO. This optimizer will transform the plan to use new bridge writer and reader operators, which will be based on TableWriter and TableScanOperator respectively.
New Plan
Table deletion will need to be done from the coordinator after the query lifecycle.
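To make the subtree-hashing idea concrete, here is a minimal, Presto-agnostic sketch (the `PlanNode` shape and operator encoding are invented for illustration, not Presto classes): build a structural key for every subtree bottom-up, and any key seen more than once is a CTE candidate.

```python
# Toy sketch of subtree hashing for CTE detection -- not Presto code.
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class PlanNode:
    op: str              # e.g. "scan", "join", "filter"
    args: tuple = ()     # operator arguments (table name, predicate, ...)
    children: tuple = () # child PlanNodes


def collect_subtree_keys(node, seen):
    # compute a structural key bottom-up and record every occurrence
    child_keys = tuple(collect_subtree_keys(c, seen) for c in node.children)
    key = (node.op, node.args, child_keys)
    seen[key].append(node)
    return key


def find_repeated_subtrees(root):
    seen = defaultdict(list)
    collect_subtree_keys(root, seen)
    # any key that occurs more than once is a candidate for materialization
    return [nodes for nodes in seen.values() if len(nodes) > 1]
```

A real implementation would also canonicalize subtrees before keying them and would gate the rewrite on cost, but the grouping step looks like this.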
Parts which are challenging:
- Since insertion will be handled in execution and we are only creating the table metadata during optimization, the part where we assign which drivers read their respective splits (maybe in the scheduler) is unclear to me.
Maybe some examples? E.g. TPCH Q15 is a simple CSE (Common SubExpression) case.
How are CSEs detected? In the physical plan? Or in the logical algebra?
What would the algorithm detect as CSEs for this query (where a column with suffix N comes from table tN)?
explain select * from (select b1, c1 from t1 join (select a2, a3 from t2 join t3 on b2=b3) dt3 on a1=a2) dt1, (select c1, d1 from t1 join (select a2, a3 from t3 join t2 on b3=b2) dt4 on a1=a2) dt2;
dt3 and dt4 are logically identical. dt1 and dt2 can be computed from the same CSE if their SELECT lists are UNIONed.
I think the decision has to be cost based (i.e. a cost-based rewrite like pushing joins into unions).
@bmckennaah the physical-plan hashing will be tailored to canonicalize the subtrees to detect CTEs accurately, hence dt3 and dt4 would be detected. The decision has to be cost based, of course. Initially, however, I am focusing on a complete end-to-end flow and deciding without the CBO.
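A tiny sketch of what "canonicalize the subtrees" could mean for a commutative inner join (the encoding is illustrative, not Presto's): sort both the sides of the equality predicate and the child subtree keys before hashing, so that t2 JOIN t3 ON b2=b3 and t3 JOIN t2 ON b3=b2 produce the same key.

```python
# Toy canonicalization for commutative inner joins -- not Presto code.
def canonical_join_key(condition, left_key, right_key):
    # order the equality's operands so "b2=b3" and "b3=b2" agree
    lhs, rhs = sorted(s.strip() for s in condition.split("="))
    # order the child subtree keys so operand order is irrelevant
    lo, hi = sorted((left_key, right_key))
    return ("inner_join", f"{lhs}={rhs}", lo, hi)
```

Outer joins and non-commutative operators would of course have to keep their operand order.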
I think you would rather detect
t1 join (select a2, a3 from t2 join t3 on b2=b3) on a1=a2
as the CSE than dt3 and dt4 (since dt3 and dt4 are contained within this larger CSE). Even better is to use
select b1, c1, d1 from t1 join (select a2, a3 from t2 join t3 on b2=b3) on a1=a2
as your CSE. My point here is that in general CSEs referenced in different places may have different project lists, e.g.
explain with cte1 as (select a1, b1, c1 from t1 join t2 on a1=a2) select * from cte1 where b1 not in (select a1 from cte1);
Here the build and probe sides of the semijoin will have different projection lists (in the physical plan). Even the 2 physical instances
of t1 will project different sets of columns.
So we would need to store b1, c1, and d1 in the common temporary table. I think the hashing can later be tailored to work column-wise, with a Project on top of the common CTE while reading; i.e., if we focus on identical subtrees first, it can be extended for this use case.
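The column-wise idea above can be sketched as follows (a toy illustration, not Presto code): materialize the union of the occurrences' projection lists once, then narrow each read of the temp table with a Project.

```python
# Toy sketch: union the projection lists for materialization, and compute
# the narrowing Project each CSE occurrence applies on read.
def union_projections(occurrence_columns):
    # columns the shared temp table must store: the union of all occurrences
    materialized = sorted(set().union(*occurrence_columns))
    # each occurrence reads the temp table and projects down to its own columns
    reads = [[c for c in materialized if c in set(cols)]
             for cols in occurrence_columns]
    return materialized, reads
```

For the example above, occurrences needing {b1, c1} and {c1, d1} share one table with b1, c1, d1, and each read projects its own subset.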
Thanks for looking at this. This is a feature I've wanted for a while but haven't had a chance to prioritize. http://www.vldb.org/pvldb/vol8/p1704-elhelw.pdf is another related paper that @mlyublena shared with me. Also, for the temporary table, you can reuse the materialized exchange concept.
Yes, you need to union the SELECT lists in some cases. And in the physical plan, CTEs/CSEs may no longer be present, e.g. if t3 is small and t1 and t2 are large in this query:
explain with cte1 as (select a1, b1, c1 from t1 join t2 on a1=a2) select * from cte1, t3 where a1=a3 and b1 not in (select a1 from cte1)
t1 JOIN t2 may only be present once in the physical plan (since joining t1-t3 in the outer block is cheaper) because of view folding/composition/merging
and join ordering. So it seems CTE/CSE analysis needs to happen on the logical expression, not the physical plan.
If you use temp tables for CSEs you can create them, analyze them (collect stats on write), replace the CSEs with the temp tables in the original query, and invoke the planner. You'll get much
better estimates in the CBO this way (and hence better plans).
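The create/analyze/substitute/replan loop described above can be written down as a hypothetical driver; every callback name here is an invented placeholder, not a Presto API. The point is the ordering: write, collect stats, substitute, and only then plan, so the CBO sees real temp-table statistics.

```python
# Hypothetical sketch of two-phase planning with materialized CSEs.
# All callbacks are placeholders standing in for engine operations.
def replan_with_temp_tables(plan_text, cses, materialize, analyze,
                            substitute, optimize):
    for cse in cses:
        table = materialize(cse)                    # CTAS the common subexpression
        stats = analyze(table)                      # stats collected on write
        plan_text = substitute(plan_text, cse, table, stats)
    return optimize(plan_text)                      # CBO runs with real stats
```

In a real system the "plan" would be a tree rather than text, but the phase ordering is the essential part.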
Look at ExchangeMaterializationStrategy: when it is set to ALL, I think we already materialize exchanges to Hive. Maybe you can piggyback on that framework for this work.
Wow thanks! I will take a look
Thanks everyone for all the pointers! I reused the materialized exchange framework and was able to create a prototype for detection and replacement of a simple union query
SELECT column FROM table UNION SELECT column FROM table
against a Hive table.
Without CTE
With CTE
I used naive detection logic based on the hash of a plan (no CBO) in a new optimizer; basically, I added a new exchange type and used the basePlanFragmenter to create and store temporary tables. Similar to here.
This basically uses metadata.createTemporaryTable, which creates session-lived temporary tables. However, this functionality is currently experimental, so that would be our dependency for making this production-ready.
There is a lot of sanity testing and a lot of work needed to make this production-ready, but the prototype shows that it can be done.
This is cool! Once this framework is set up, it could be interesting to make CTE melding more first-class, like merging filters/projections, based on the filter cardinalities, matching output partitioning, etc.
Very promising result, Jay!
Thank you! I will begin work on breaking down the components, creating PRs for them, and making this production-ready, along with a more detailed analysis of the performance of temporary tables, any risks, etc.
And a design doc to start now
Just a suggestion: write your test cases (positive and negative) first so folks understand what the scope is.
Maybe I should open a separate issue, but a Tee operator could be quite useful in Presto in general (and in this specific case). It could help quite a bit with things like window functions as well as this CTE/subplan-reuse case.
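For intuition, the single-process analogue of a Tee operator is Python's itertools.tee: one produced stream fanned out to independent consumers, each advancing at its own pace. A real Presto Tee operator would of course have to split pages across exchanges and handle buffering and backpressure, which this sketch does not attempt.

```python
# In-process analogue of a Tee operator: one stream, n independent readers.
from itertools import tee


def tee_stream(pages, n):
    # returns n iterators that each yield every element of `pages`
    return tee(pages, n)
```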
I've put together a proposal that's been inspired and based on the paper http://www.vldb.org/pvldb/vol8/p1704-elhelw.pdf, which was recommended by @rschlussel. I'm hoping we could discuss it and improve the idea further.
https://docs.google.com/document/d/10J9_j08imqe6xEH9iepteG0DA5B-IpmYv4lSRvCgdJA/edit.
@kaikalur @rschlussel @ClarenceThreepwood @bmckennaah what do you think about this approach? - link
@jaystarshot wanted to check if there was any update here
@rschlussel Unfortunately no; we got sidetracked by other tasks given the estimated work and ROI for this one, but we plan to dedicate some resources this half. We would be interested in reprioritizing if the community can dedicate more resources as well.
We are starting POC development based on the design and will have something this quarter
I've prepared the POC - accessible here: link. Currently only one persistent CTE is supported; I am working on supporting multiple and dependent CTEs.
Initially, we executed TPC-DS query 4 and observed a 30% CPU reduction on sf100. On sf1000, Q4, which previously failed, finished successfully within 5 minutes on our test cluster using the PAGEFILE storage format for the temporary table.
Additionally, we mimicked production traffic by arbitrarily converting one Common Table Expression (CTE) to be persistent. Unfortunately, 20-40% of the queries encountered an identical internal error: 'Malformed PageFile format, footer length is missing.'
However, these errors were not reproducible on subsequent runs. The remaining 60-80% of the queries executed successfully.
We don't see these errors with the Parquet format for temp tables, though it is around 5x slower. 10% of the queries failed with hive bad data: ..__presto_temporary_table_PARQUET_20231010_035817_00498_9ghcq_e13e9b33_cdbd_49f5_a6ff_fcf613f2f4b6/000000_0_20231010_035817_00498_9ghcq is not a valid Parquet File (cause: com.facebook.presto.parquet.ParquetCorruptionException).
'Malformed PageFile format, footer length is missing. '
This error message suggests it is not a valid PAGEFILE format. Is it possible that the file is empty, does not exist, or is not in PAGEFILE format (less likely; if so, the error would be thrown later)?
I debugged with deletion turned off; the PageFile was non-empty. It happens with Parquet too: ..__presto_temporary_table_PARQUET_20231010_035817_00498_9ghcq_e13e9b33_cdbd_49f5_a6ff_fcf613f2f4b6/000000_0_20231010_035817_00498_9ghcq is not a valid Parquet File (cause: com.facebook.presto.parquet.ParquetCorruptionException). I inspected the Parquet file on HDFS and it was a valid Parquet file! Moreover, the read was happening just 200 ms after the write.
So my suspicion was now on two things: 1) HDFS inconsistency, or 2) some grouped-execution changes not being picked up in my patch. I introduced a 1-second pause after the child sections finish and before the parent section begins, and voilà, the corruption errors disappeared! Wondering where and what the proper fix would be.
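A more principled fix than a fixed sleep would be to wait, with a bounded timeout, until the materialized files are actually visible and non-empty before scheduling the parent section. A sketch of that idea (the `file_size` callback stands in for a filesystem client call; this is not Presto code):

```python
# Sketch: poll for read-after-write visibility instead of sleeping blindly.
import time


def wait_until_visible(paths, file_size, timeout_s=30.0, poll_s=0.1):
    # `file_size(path)` returns the size in bytes, or None if not yet visible
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        sizes = [file_size(p) for p in paths]
        if all(s is not None and s > 0 for s in sizes):
            return True
        time.sleep(poll_s)
    return False
```

This bounds the wait instead of always paying a full second, and fails loudly on timeout rather than reading a half-visible file.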
Finally found the issue: we had a legacy configuration where reads from the observer NameNode were enabled. I think the observer NameNode needs some time to sync with the active NameNode; turning that off works. Proceeding to fix some other issues and productionize the PR.
Where are we on this? We are also waiting for this PR to get merged.
A part of the work (materialize all CTEs) has already been merged.
Thank you @jaystarshot! By the way, there is the open-source YQL query language, and this feature is implemented there. It is different from Presto and sits on top of YDB, but it may still be possible to get some insights from their repository. https://ydb.tech/docs/en/yql/reference/ https://github.com/ydb-platform/ydb/tree/main/ydb/library/yql
Did you mean to reference something specific? These links just point to the general pages.
Ref - https://github.com/trinodb/trino/issues/5878