trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

Trino 356: Deleting partitions fails when values are referenced from a subquery #8070

Closed: vinay-kl closed this issue 2 years ago

vinay-kl commented 3 years ago

Overview: Partitions are not dropped when the DELETE query matches entire partitions, either with literal values or with an IN clause over a subquery.

trino> create table pso_test.par_and_buck_by1 (a varchar, b int, c varchar) WITH ( format = 'ORC', partitioned_by = ARRAY['c'], bucketed_by = ARRAY['a'], bucket_count = 10, transactional=true);
CREATE TABLE
trino> insert into pso_test.par_and_buck_by1 values ('a',1,'a'), ('b',2,'b');
INSERT: 2 rows
trino> select * from pso_test.par_and_buck_by1;
 a | b | c 
---+---+---
 b | 2 | b 
 a | 1 | a 
(2 rows)
trino> select * from pso_test."par_and_buck_by1$partitions";
 c 
---
 a 
 b 
(2 rows)
trino> delete from pso_test.par_and_buck_by1 where c in (select distinct c from pso_test.par_and_buck_by_ext);
Query 20210525_115847_00051_3irbv failed: Invalid descendant for DeleteNode or UpdateNode: io.trino.sql.planner.plan.ExchangeNode
java.lang.IllegalArgumentException: Invalid descendant for DeleteNode or UpdateNode: io.trino.sql.planner.plan.ExchangeNode
    at io.trino.sql.planner.optimizations.BeginTableWrite$Rewriter.findTableScanHandle(BeginTableWrite.java:259)
    at io.trino.sql.planner.optimizations.BeginTableWrite$Rewriter.findTableScanHandle(BeginTableWrite.java:251)
    at io.trino.sql.planner.optimizations.BeginTableWrite$Rewriter.findTableScanHandle(BeginTableWrite.java:248)
    at io.trino.sql.planner.optimizations.BeginTableWrite$Rewriter.findTableScanHandle(BeginTableWrite.java:245)
    at io.trino.sql.planner.optimizations.BeginTableWrite$Rewriter.findTableScanHandle(BeginTableWrite.java:248)
    at io.trino.sql.planner.optimizations.BeginTableWrite$Rewriter.getWriterTarget(BeginTableWrite.java:181)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.Collections$2.tryAdvance(Collections.java:4747)
    at java.base/java.util.Collections$2.forEachRemaining(Collections.java:4755)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at io.trino.sql.planner.optimizations.BeginTableWrite$Rewriter.getWriterTarget(BeginTableWrite.java:196)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.Collections$2.tryAdvance(Collections.java:4747)
    at java.base/java.util.Collections$2.forEachRemaining(Collections.java:4755)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at io.trino.sql.planner.optimizations.BeginTableWrite$Rewriter.getWriterTarget(BeginTableWrite.java:196)
    at io.trino.sql.planner.optimizations.BeginTableWrite$Rewriter.visitTableFinish(BeginTableWrite.java:158)
    at io.trino.sql.planner.optimizations.BeginTableWrite$Rewriter.visitTableFinish(BeginTableWrite.java:78)
    at io.trino.sql.planner.plan.TableFinishNode.accept(TableFinishNode.java:106)
    at io.trino.sql.planner.plan.SimplePlanRewriter$RewriteContext.rewrite(SimplePlanRewriter.java:84)
    at io.trino.sql.planner.plan.SimplePlanRewriter$RewriteContext.lambda$defaultRewrite$0(SimplePlanRewriter.java:73)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.Collections$2.tryAdvance(Collections.java:4747)
    at java.base/java.util.Collections$2.forEachRemaining(Collections.java:4755)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at io.trino.sql.planner.plan.SimplePlanRewriter$RewriteContext.defaultRewrite(SimplePlanRewriter.java:74)
    at io.trino.sql.planner.plan.SimplePlanRewriter.visitPlan(SimplePlanRewriter.java:38)
    at io.trino.sql.planner.plan.SimplePlanRewriter.visitPlan(SimplePlanRewriter.java:22)
    at io.trino.sql.planner.plan.PlanVisitor.visitOutput(PlanVisitor.java:49)
    at io.trino.sql.planner.plan.OutputNode.accept(OutputNode.java:83)
    at io.trino.sql.planner.plan.SimplePlanRewriter.rewriteWith(SimplePlanRewriter.java:32)
    at io.trino.sql.planner.optimizations.BeginTableWrite.optimize(BeginTableWrite.java:75)
    at io.trino.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:208)
    at io.trino.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:197)
    at io.trino.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:192)
    at io.trino.execution.SqlQueryExecution.doPlanQuery(SqlQueryExecution.java:482)
    at io.trino.execution.SqlQueryExecution.planQuery(SqlQueryExecution.java:462)
    at io.trino.execution.SqlQueryExecution.start(SqlQueryExecution.java:405)
    at io.trino.execution.SqlQueryManager.createQuery(SqlQueryManager.java:237)
    at io.trino.dispatcher.LocalDispatchQuery.lambda$startExecution$7(LocalDispatchQuery.java:143)
    at io.trino.$gen.Trino_356____20210525_100723_2.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
electrum commented 3 years ago

Did this work in previous releases?

vinay-kl commented 3 years ago

> Did this work in previous releases?

@electrum the first two of the following queries work in 347:

presto> delete from pso_test.par_and_buck_by1 where c in (select c from pso_test.par_and_buck_by_ext);
DELETE: 2 rows

Query 20210525_181545_06265_gbymr, FINISHED, 25 nodes
https://bifrostx.myntra.com/ui/query.html?20210525_181545_06265_gbymr
Splits: 428 total, 428 done (100.00%)
CPU Time: 0.2s total,   470 rows/s, 11.1KB/s, 31% active
Per Node: 0.0 parallelism,     1 rows/s,    26B/s
Parallelism: 0.1
Peak Memory: 0B
3.83 [102 rows, 2.41KB] [26 rows/s, 646B/s]

presto> delete from pso_test.par_and_buck_by1 where c in ('a','b');
DELETE

Query 20210525_181602_06266_gbymr, FINISHED, 1 node
https://bifrostx.myntra.com/ui/query.html?20210525_181602_06266_gbymr
Splits: 1 total, 1 done (100.00%)
CPU Time: 0.0s total,     0 rows/s,     0B/s, 100% active
Per Node: 0.0 parallelism,     0 rows/s,     0B/s
Parallelism: 0.0
Peak Memory: 0B
0.75 [0 rows, 0B] [0 rows/s, 0B/s]

presto> delete from pso_test.par_and_buck_by1 where c in (values 'a','b');
Query 20210525_181612_06270_gbymr failed: Invalid descendant for DeleteNode: io.prestosql.sql.planner.plan.ExchangeNode
java.lang.IllegalArgumentException: Invalid descendant for DeleteNode: io.prestosql.sql.planner.plan.ExchangeNode
    at io.prestosql.sql.planner.optimizations.BeginTableWrite$Rewriter.rewriteDeleteTableScan(BeginTableWrite.java:233)
    at io.prestosql.sql.planner.optimizations.BeginTableWrite$Rewriter.rewriteDeleteTableScan(BeginTableWrite.java:223)
    at io.prestosql.sql.planner.optimizations.BeginTableWrite$Rewriter.rewriteDeleteTableScan(BeginTableWrite.java:219)
    at io.prestosql.sql.planner.optimizations.BeginTableWrite$Rewriter.rewriteDeleteTableScan(BeginTableWrite.java:215)
    at io.prestosql.sql.planner.optimizations.BeginTableWrite$Rewriter.rewriteDeleteTableScan(BeginTableWrite.java:219)
    at io.prestosql.sql.planner.optimizations.BeginTableWrite$Rewriter.visitDelete(BeginTableWrite.java:113)
    at io.prestosql.sql.planner.optimizations.BeginTableWrite$Rewriter.visitDelete(BeginTableWrite.java:76)
    at io.prestosql.sql.planner.plan.DeleteNode.accept(DeleteNode.java:91)
    at io.prestosql.sql.planner.plan.SimplePlanRewriter$RewriteContext.rewrite(SimplePlanRewriter.java:84)
    at io.prestosql.sql.planner.plan.SimplePlanRewriter$RewriteContext.lambda$defaultRewrite$0(SimplePlanRewriter.java:73)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.Collections$2.tryAdvance(Collections.java:4747)
    at java.base/java.util.Collections$2.forEachRemaining(Collections.java:4755)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at io.prestosql.sql.planner.plan.SimplePlanRewriter$RewriteContext.defaultRewrite(SimplePlanRewriter.java:74)
    at io.prestosql.sql.planner.plan.SimplePlanRewriter.visitPlan(SimplePlanRewriter.java:38)
    at io.prestosql.sql.planner.plan.SimplePlanRewriter.visitPlan(SimplePlanRewriter.java:22)
    at io.prestosql.sql.planner.plan.PlanVisitor.visitExchange(PlanVisitor.java:189)
    at io.prestosql.sql.planner.plan.ExchangeNode.accept(ExchangeNode.java:243)
    at io.prestosql.sql.planner.plan.SimplePlanRewriter$RewriteContext.rewrite(SimplePlanRewriter.java:84)
    at io.prestosql.sql.planner.plan.SimplePlanRewriter$RewriteContext.lambda$defaultRewrite$0(SimplePlanRewriter.java:73)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.Collections$2.tryAdvance(Collections.java:4747)
    at java.base/java.util.Collections$2.forEachRemaining(Collections.java:4755)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at io.prestosql.sql.planner.plan.SimplePlanRewriter$RewriteContext.defaultRewrite(SimplePlanRewriter.java:74)
    at io.prestosql.sql.planner.plan.SimplePlanRewriter.visitPlan(SimplePlanRewriter.java:38)
    at io.prestosql.sql.planner.plan.SimplePlanRewriter.visitPlan(SimplePlanRewriter.java:22)
    at io.prestosql.sql.planner.plan.PlanVisitor.visitExchange(PlanVisitor.java:189)
    at io.prestosql.sql.planner.plan.ExchangeNode.accept(ExchangeNode.java:243)
    at io.prestosql.sql.planner.optimizations.BeginTableWrite$Rewriter.visitTableFinish(BeginTableWrite.java:146)
    at io.prestosql.sql.planner.optimizations.BeginTableWrite$Rewriter.visitTableFinish(BeginTableWrite.java:76)
    at io.prestosql.sql.planner.plan.TableFinishNode.accept(TableFinishNode.java:106)
    at io.prestosql.sql.planner.plan.SimplePlanRewriter$RewriteContext.rewrite(SimplePlanRewriter.java:84)
    at io.prestosql.sql.planner.plan.SimplePlanRewriter$RewriteContext.lambda$defaultRewrite$0(SimplePlanRewriter.java:73)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.Collections$2.tryAdvance(Collections.java:4747)
    at java.base/java.util.Collections$2.forEachRemaining(Collections.java:4755)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at io.prestosql.sql.planner.plan.SimplePlanRewriter$RewriteContext.defaultRewrite(SimplePlanRewriter.java:74)
    at io.prestosql.sql.planner.plan.SimplePlanRewriter.visitPlan(SimplePlanRewriter.java:38)
    at io.prestosql.sql.planner.plan.SimplePlanRewriter.visitPlan(SimplePlanRewriter.java:22)
    at io.prestosql.sql.planner.plan.PlanVisitor.visitOutput(PlanVisitor.java:49)
    at io.prestosql.sql.planner.plan.OutputNode.accept(OutputNode.java:83)
    at io.prestosql.sql.planner.plan.SimplePlanRewriter.rewriteWith(SimplePlanRewriter.java:32)
    at io.prestosql.sql.planner.optimizations.BeginTableWrite.optimize(BeginTableWrite.java:73)
    at io.prestosql.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:206)
    at io.prestosql.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:195)
    at io.prestosql.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:190)
    at io.prestosql.execution.SqlQueryExecution.doPlanQuery(SqlQueryExecution.java:449)
    at io.prestosql.execution.SqlQueryExecution.planQuery(SqlQueryExecution.java:429)
    at io.prestosql.execution.SqlQueryExecution.start(SqlQueryExecution.java:381)
    at io.prestosql.execution.SqlQueryManager.createQuery(SqlQueryManager.java:245)
    at io.prestosql.dispatcher.LocalDispatchQuery.lambda$startExecution$7(LocalDispatchQuery.java:139)
    at io.prestosql.$gen.Presto_347____20210522_054059_2.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
sopel39 commented 3 years ago

cc @kasiafi @martint

findepi commented 3 years ago

cc @djsstarburst

djsstarburst commented 3 years ago

BeginTableWrite.findTableScan and BeginTableWrite.rewriteModifyTableScan have an intrinsic problem: they assume that only a few kinds of nodes can appear between the top-level DeleteNode or UpdateNode and the TableScanNode.

The TableScanNode that is the target of the update or delete is marked by the boolean updateTarget, which is set only on the target scan. So it's possible to replace BeginTableWrite.findTableScan and BeginTableWrite.rewriteModifyTableScan with searches that traverse any PlanNode, as sketched below.
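
In outline, such a search could look like the following (a hedged sketch, not the code from PR #8107; PlanNode.getSources() is the standard child accessor, and isUpdateTarget() is assumed here as the getter for the updateTarget flag described above):

import java.util.Optional;

import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.TableScanNode;

private static Optional<TableScanNode> findUpdateTargetScan(PlanNode node)
{
    if (node instanceof TableScanNode) {
        TableScanNode scan = (TableScanNode) node;
        // only the scan that is the target of the UPDATE/DELETE carries the flag
        return scan.isUpdateTarget() ? Optional.of(scan) : Optional.empty();
    }
    // recurse into every child, regardless of the node type in between
    return node.getSources().stream()
            .map(source -> findUpdateTargetScan(source))
            .flatMap(Optional::stream)
            .findFirst();
}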

I just submitted PR #8107, which does exactly that, and that change fixes this bug.

[Later]

I closed PR #8107 because @electrum pointed out that the presence of EXCHANGE nodes means that the UPDATE or DELETE plan is likely to be illegal, because those implementations require all rows to be collected on a single node.

electrum commented 3 years ago

@vinay-kl Thanks for the reproduction steps. I think I understand what is happening here -- the planner is not creating a SemiJoin -- but I'm not able to reproduce it. Can you share the schema of pso_test.par_and_buck_by_ext by doing SHOW CREATE TABLE? Also, can you show all of your session properties with SHOW SESSION? I'm wondering if you have config that is affecting the planner / optimizer.

electrum commented 3 years ago

We have handling for SemiJoin that should prevent having an exchange in the middle: https://github.com/trinodb/trino/blob/685cc4e242d77800f192ac5613aae4b15f0a3c66/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/ReplicateSemiJoinInDelete.java#L72-L73
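
For reference, the shape of that handling, as a hedged sketch rather than the linked source (the class name and the Boolean-context plumbing are illustrative; SemiJoinNode.withDistributionType is assumed, as used by the distribution-type optimizer rules):

import io.trino.sql.planner.plan.DeleteNode;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.SemiJoinNode;
import io.trino.sql.planner.plan.SimplePlanRewriter;

import static io.trino.sql.planner.plan.SemiJoinNode.DistributionType.REPLICATED;

// Illustrative rewriter: once the traversal passes a DeleteNode, the Boolean
// context flags that any SemiJoin below it must use REPLICATED distribution,
// so no remote exchange separates the delete from its target table scan.
class ReplicateSemiJoinBelowDelete
        extends SimplePlanRewriter<Boolean>
{
    @Override
    public PlanNode visitDelete(DeleteNode node, RewriteContext<Boolean> context)
    {
        // rewrite children with the "below a delete" flag set
        return context.defaultRewrite(node, true);
    }

    @Override
    public PlanNode visitSemiJoin(SemiJoinNode node, RewriteContext<Boolean> context)
    {
        SemiJoinNode rewritten = (SemiJoinNode) context.defaultRewrite(node, context.get());
        return context.get() ? rewritten.withDistributionType(REPLICATED) : rewritten;
    }
}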

vinay-kl commented 3 years ago

> @vinay-kl Thanks for the reproduction steps. I think I understand what is happening here -- the planner is not creating a SemiJoin -- but I'm not able to reproduce it. Can you share the schema of pso_test.par_and_buck_by_ext by doing SHOW CREATE TABLE? Also, can you show all of your session properties with SHOW SESSION? I'm wondering if you have config that is affecting the planner / optimizer.

@electrum here are the details you asked for:

trino> show create table pso_test.par_and_buck_by_ext;
                                                                 Create Table                                                                 
----------------------------------------------------------------------------------------------------------------------------------------------
 CREATE TABLE hiveqa.pso_test.par_and_buck_by_ext (                                                                                           
    a varchar,                                                                                                                                
    b integer,                                                                                                                                
    c varchar                                                                                                                                 
 )                                                                                                                                            
 WITH (                                                                                                                                       
    external_location = 'abfs://hive-qa-bifrost@gen2hivebifros.dfs.core.windows.net/hive/warehouse/external/pso_test.db/par_and_buck_by_ext', 
    format = 'TEXTFILE'                                                                                                                       
 )

trino> select * from pso_test.par_and_buck_by_ext;
 a | b | c 
---+---+---
 c | 3 | a 
 b | 2 | a 
 a | 1 | b 
 c | 3 | b 
(4 rows)                                                                                                                                            
trino> show session;
                         Name                          |        Value        |       Default       |  Type   |                                                                   Description              
-------------------------------------------------------+---------------------+---------------------+---------+--------------------------------------------------------------------------------------------
 aggregation_operator_unspill_memory_limit             | 4MB                 | 4MB                 | varchar | How much memory should be allocated per aggregation operator in unspilling process         
 collect_plan_statistics_for_all_queries               | false               | false               | boolean | Collect plan statistics for non-EXPLAIN queries                                            
 colocated_join                                        | false               | false               | boolean | Experimental: Use a colocated join when possible                                           
 concurrent_lifespans_per_task                         | 0                   | 0                   | integer | Experimental: Run a fixed number of groups concurrently for eligible JOINs                 
 default_filter_factor_enabled                         | false               | false               | boolean | use a default filter factor for unknown filters in a filter node                           
 dictionary_aggregation                                | false               | false               | boolean | Enable optimization for aggregations on dictionaries                                       
 distributed_index_join                                | false               | false               | boolean | Distribute index joins on join keys instead of executing inline                            
 distributed_sort                                      | true                | true                | boolean | Parallelize sort across multiple nodes                                                     
 dynamic_schedule_for_grouped_execution                | false               | false               | boolean | Experimental: Use dynamic schedule for grouped execution when possible                     
 enable_dynamic_filtering                              | true                | true                | boolean | Enable dynamic filtering                                                                   
 enable_intermediate_aggregations                      | false               | false               | boolean | Enable the use of intermediate aggregations                                                
 enable_large_dynamic_filters                          | false               | false               | boolean | Enable collection of large dynamic filters                                                 
 enable_stats_calculator                               | true                | true                | boolean | Enable statistics calculator                                                               
 exchange_compression                                  | false               | false               | boolean | Enable compression in exchanges                                                            
 execution_policy                                      | all-at-once         | all-at-once         | varchar | Policy used for scheduling query tasks                                                     
 filter_and_project_min_output_page_row_count          | 256                 | 256                 | integer | Experimental: Minimum output page row count for filter and project operators               
 filter_and_project_min_output_page_size               | 500kB               | 500kB               | varchar | Experimental: Minimum output page size for filter and project operators                    
 grouped_execution                                     | false               | false               | boolean | Use grouped execution when possible                                                        
 hash_partition_count                                  | 100                 | 100                 | integer | Number of partitions for distributed joins and aggregations                                
 ignore_downstream_preferences                         | false               | false               | boolean | Ignore Parent's PreferredProperties in AddExchange optimizer                               
 ignore_stats_calculator_failures                      | true                | true                | boolean | Ignore statistics calculator failures                                                      
 initial_splits_per_node                               | 4                   | 4                   | integer | The number of splits each node will run per task, initially                                
 iterative_optimizer_timeout                           | 3.00m               | 3.00m               | varchar | Timeout for plan optimization in iterative optimizer                                       
 iterative_rule_based_column_pruning                   | true                | true                | boolean | Use iterative rules to prune unreferenced columns                                          
 join_distribution_type                                | AUTOMATIC           | AUTOMATIC           | varchar | Join distribution type. Possible values: [BROADCAST, PARTITIONED, AUTOMATIC]               
 join_max_broadcast_table_size                         | 100MB               | 100MB               | varchar | Maximum estimated size of a table that can be broadcast when using automatic join type sele
 join_reordering_strategy                              | AUTOMATIC           | AUTOMATIC           | varchar | Join reordering strategy. Possible values: [NONE, ELIMINATE_CROSS_JOINS, AUTOMATIC]        
 late_materialization                                  | false               | false               | boolean | Experimental: Use late materialization (including WorkProcessor pipelines)                 
 max_drivers_per_task                                  |                     |                     | integer | Maximum number of drivers per task                                                         
 max_recursion_depth                                   | 10                  | 10                  | integer | Maximum recursion depth for recursive common table expression                              
 max_reordered_joins                                   | 9                   | 9                   | integer | The maximum number of joins to reorder as one group in cost-based join reordering          
 max_unacknowledged_splits_per_task                    | 500                 | 500                 | integer | Maximum number of leaf splits awaiting delivery to a given task                            
 merge_project_with_values                             | true                | true                | boolean | Inline project expressions into values                                                     
 omit_datetime_type_precision                          | false               | false               | boolean | Omit precision when rendering datetime type names with default precision                   
 optimize_duplicate_insensitive_joins                  | true                | true                | boolean | Optimize duplicate insensitive joins                                                       
 optimize_hash_generation                              | true                | true                | boolean | Compute hash codes for distribution, joins, and aggregations early in query plan           
 optimize_metadata_queries                             | false               | false               | boolean | Enable optimization for metadata queries                                                   
 optimize_mixed_distinct_aggregations                  | false               | false               | boolean | Optimize mixed non-distinct and distinct aggregations                                      
 optimize_top_n_ranking                                | true                | true                | boolean | Use top N ranking optimization                                                             
 parse_decimal_literals_as_double                      | false               | false               | boolean | Parse decimal literals as DOUBLE instead of DECIMAL                                        
 predicate_pushdown_use_table_properties               | true                | true                | boolean | Use table properties in predicate pushdown                                                 
 prefer_partial_aggregation                            | true                | true                | boolean | Prefer splitting aggregations into partial and final stages                                
 prefer_streaming_operators                            | false               | false               | boolean | Prefer source table layouts that produce streaming operators                               
 preferred_write_partitioning_min_number_of_partitions | 50                  | 50                  | integer | Use preferred write partitioning when the number of written partitions exceeds the configur
 push_aggregation_through_outer_join                   | true                | true                | boolean | Allow pushing aggregations below joins                                                     
 push_partial_aggregation_through_join                 | false               | false               | boolean | Push partial aggregations below joins                                                      
 push_table_write_through_union                        | true                | true                | boolean | Parallelize writes when using UNION ALL in queries that write data                         
 query_max_cpu_time                                    | 1000000000.00d      | 1000000000.00d      | varchar | Maximum CPU time of a query                                                                
 query_max_execution_time                              | 60.00m              | 60.00m              | varchar | Maximum execution time of a query                                                          
 query_max_planning_time                               | 10.00m              | 10.00m              | varchar | Maximum planning time of a query                                                           
 query_max_run_time                                    | 60.00m              | 60.00m              | varchar | Maximum run time of a query (includes the queueing time)                                   
 query_max_scan_physical_bytes                         |                     |                     | varchar | Maximum scan physical bytes of a query                                                     
 query_priority                                        | 1                   | 1                   | integer | The priority of queries. Larger numbers are higher priority                                
 redistribute_writes                                   | true                | true                | boolean | Force parallel distributed writes                                                          
 required_workers_count                                | 1                   | 1                   | integer | Minimum number of active workers that must be available before the query will start        
 required_workers_max_wait_time                        | 5.00m               | 5.00m               | varchar | Maximum time to wait for minimum number of workers before the query is failed              
 resource_overcommit                                   | false               | false               | boolean | Use resources which are not guaranteed to be available to the query                        
 rewrite_filtering_semi_join_to_inner_join             | true                | true                | boolean | Rewrite semi join in filtering context to inner join                                       
 scale_writers                                         | false               | false               | boolean | Scale out writers based on throughput (use minimum necessary)                              
 skip_redundant_sort                                   | true                | true                | boolean | Skip redundant sort operations                                                             
 spatial_join                                          | true                | true                | boolean | Use spatial index for spatial join when possible                                           
 spatial_partitioning_table_name                       |                     |                     | varchar | Name of the table containing spatial partitioning scheme                                   
 spill_enabled                                         | false               | false               | boolean | Enable spilling                                                                            
 spill_order_by                                        | true                | true                | boolean | Spill in OrderBy if spill_enabled is also set                                              
 spill_window_operator                                 | true                | true                | boolean | Spill in WindowOperator if spill_enabled is also set                                       
 split_concurrency_adjustment_interval                 | 100.00ms            | 100.00ms            | varchar | Experimental: Interval between changes to the number of concurrent splits per node         
 statistics_cpu_timer_enabled                          | true                | true                | boolean | Experimental: Enable cpu time tracking for automatic column statistics collection on write 
 table_scan_node_partitioning_min_bucket_to_task_ratio | 0.5                 | 0.5                 | double  | Min table scan bucket to task ratio for which plan will be adopted to node pre-partitioned 
 task_concurrency                                      | 16                  | 16                  | integer | Default number of local parallel jobs per worker                                           
 task_share_index_loading                              | false               | false               | boolean | Share index join lookups and caching within a task                                         
 task_writer_count                                     | 1                   | 1                   | integer | Default number of local parallel table writer jobs per worker                              
 unwrap_casts                                          | true                | true                | boolean | Enable optimization to unwrap CAST expression                                              
 use_legacy_window_filter_pushdown                     | false               | false               | boolean | Use legacy window filter pushdown optimizer                                                
 use_mark_distinct                                     | true                | true                | boolean | Implement DISTINCT aggregations using MarkDistinct                                         
 use_preferred_write_partitioning                      | true                | true                | boolean | Use preferred write partitioning                                                           
 use_table_scan_node_partitioning                      | true                | true                | boolean | Adapt plan to node pre-partitioned tables                                                  
 writer_min_size                                       | 32MB                | 32MB                | varchar | Target minimum size of writer output when scaling writers                                  
 hive.bucket_execution_enabled                         | true                | true                | boolean | Enable bucket-aware execution: only use a single worker per bucket                         
 hive.collect_column_statistics_on_write               | true                | true                | boolean | Enables automatic column level statistics collection on write                              
 hive.compression_codec                                | GZIP                | GZIP                | varchar | Compression codec to use when writing files. Possible values: [NONE, SNAPPY, LZ4, ZSTD, GZI
 hive.create_empty_bucket_files                        | false               | false               | boolean | Create empty files for buckets that have no data                                           
 hive.dynamic_filtering_probe_blocking_timeout         | 0.00m               | 0.00m               | varchar | Duration to wait for completion of dynamic filters during split generation for probe side t
 hive.experimental_parquet_optimized_writer_enabled    | false               | false               | boolean | Experimental: Enable optimized writer                                                      
 hive.force_local_scheduling                           | false               | false               | boolean | Only schedule splits on workers colocated with data node                                   
 hive.hive_storage_format                              | ORC                 | ORC                 | varchar | Default storage format for new tables or partitions. Possible values: [ORC, PARQUET, AVRO, 
 hive.ignore_absent_partitions                         | true                | true                | boolean | Ignore partitions when the file system location does not exist rather than failing the quer
 hive.ignore_corrupted_statistics                      | false               | false               | boolean | Experimental: Ignore corrupted statistics rather than failing                              
 hive.insert_existing_partitions_behavior              | APPEND              | APPEND              | varchar | Behavior on insert existing partitions; this session property doesn't control behavior on i
 hive.legacy_hive_view_translation                     | false               | false               | boolean | Use legacy Hive view translation mechanism                                                 
 hive.optimize_mismatched_bucket_count                 | false               | false               | boolean | Experimental: Enable optimization to avoid shuffle when bucket count is compatible but not 
 hive.optimize_symlink_listing                         | true                | true                | boolean | Optimize listing for SymlinkTextFormat tables with files in a single directory             
 hive.orc_bloom_filters_enabled                        | false               | false               | boolean | ORC: Enable bloom filters for predicate pushdown                                           
 hive.orc_lazy_read_small_ranges                       | true                | true                | boolean | Experimental: ORC: Read small file segments lazily                                         
 hive.orc_max_buffer_size                              | 8MB                 | 8MB                 | varchar | ORC: Maximum size of a single read                                                         
 hive.orc_max_merge_distance                           | 1MB                 | 1MB                 | varchar | ORC: Maximum size of gap between two reads to merge into a single read                     
 hive.orc_max_read_block_size                          | 16MB                | 16MB                | varchar | ORC: Soft max size of Trino blocks produced by ORC reader                                  
 hive.orc_nested_lazy_enabled                          | true                | true                | boolean | Experimental: ORC: Lazily read nested data                                                 
 hive.orc_optimized_writer_max_dictionary_memory       | 16MB                | 16MB                | varchar | ORC: Max dictionary memory                                                                 
 hive.orc_optimized_writer_max_stripe_rows             | 10000000            | 10000000            | integer | ORC: Max stripe row count                                                                  
 hive.orc_optimized_writer_max_stripe_size             | 64MB                | 64MB                | varchar | ORC: Max stripe size                                                                       
 hive.orc_optimized_writer_min_stripe_size             | 32MB                | 32MB                | varchar | ORC: Min stripe size                                                                       
 hive.orc_optimized_writer_validate                    | false               | false               | boolean | ORC: Force all validation for files                                                        
 hive.orc_optimized_writer_validate_mode               | BOTH                | BOTH                | varchar | ORC: Level of detail in ORC validation. Possible values: [HASHED, DETAILED, BOTH]          
 hive.orc_optimized_writer_validate_percentage         | 0.0                 | 0.0                 | double  | ORC: sample percentage for validation for files                                            
 hive.orc_stream_buffer_size                           | 8MB                 | 8MB                 | varchar | ORC: Size of buffer for streaming reads                                                    
 hive.orc_string_statistics_limit                      | 64B                 | 64B                 | varchar | ORC: Maximum size of string statistics; drop if exceeding                                  
 hive.orc_tiny_stripe_threshold                        | 8MB                 | 8MB                 | varchar | ORC: Threshold below which an ORC stripe or file will read in its entirety                 
 hive.orc_use_column_names                             | false               | false               | boolean | Orc: Access ORC columns using names from the file                                          
 hive.parallel_partitioned_bucketed_writes             | true                | true                | boolean | Improve parallelism of partitioned and bucketed table writes                               
 hive.parquet_ignore_statistics                        | false               | false               | boolean | Ignore statistics from Parquet to allow querying files with corrupted or incorrect statisti
 hive.parquet_max_read_block_size                      | 16MB                | 16MB                | varchar | Parquet: Maximum size of a block to read                                                   
 hive.parquet_use_column_names                         | true                | true                | boolean | Parquet: Access Parquet columns using names from the file                                  
 hive.parquet_writer_block_size                        | 134217728B          | 134217728B          | varchar | Parquet: Writer block size                                                                 
 hive.parquet_writer_page_size                         | 1048576B            | 1048576B            | varchar | Parquet: Writer page size                                                                  
 hive.partition_statistics_sample_size                 | 100                 | 100                 | integer | Maximum sample size of the partitions column statistics                                    
 hive.projection_pushdown_enabled                      | true                | true                | boolean | Projection push down enabled for hive                                                      
 hive.query_partition_filter_required                  | false               | false               | boolean | Require filter on partition column                                                         
 hive.rcfile_optimized_writer_validate                 | false               | false               | boolean | RCFile: Validate writer files                                                              
 hive.respect_table_format                             | true                | true                | boolean | Write new partitions using table format rather than default storage format                 
 hive.s3_select_pushdown_enabled                       | false               | false               | boolean | S3 Select pushdown enabled                                                                 
 hive.sorted_writing_enabled                           | true                | true                | boolean | Enable writing to bucketed sorted tables                                                   
 hive.statistics_enabled                               | true                | true                | boolean | Expose table statistics                                                                    
 hive.temporary_staging_directory_enabled              | true                | true                | boolean | Should use temporary staging directory for write operations                                
 hive.temporary_staging_directory_path                 | /tmp/presto-${USER} | /tmp/presto-${USER} | varchar | Temporary staging directory location                                                       
 hive.timestamp_precision                              | MILLISECONDS        | MILLISECONDS        | varchar | Precision for timestamp columns in Hive tables. Possible values: [MILLISECONDS, MICROSECOND
 hive.validate_bucketing                               | true                | true                | boolean | Verify that data is bucketed correctly when reading                                        
 hiveaks.bucket_execution_enabled                      | true                | true                | boolean | Enable bucket-aware execution: only use a single worker per bucket                         
 hiveaks.collect_column_statistics_on_write            | true                | true                | boolean | Enables automatic column level statistics collection on write                              
 hiveaks.compression_codec                             | GZIP                | GZIP                | varchar | Compression codec to use when writing files. Possible values: [NONE, SNAPPY, LZ4, ZSTD, GZI
 hiveaks.create_empty_bucket_files                     | false               | false               | boolean | Create empty files for buckets that have no data                                           
 hiveaks.dynamic_filtering_probe_blocking_timeout      | 0.00m               | 0.00m               | varchar | Duration to wait for completion of dynamic filters during split generation for probe side t
 hiveaks.experimental_parquet_optimized_writer_enabled | false               | false               | boolean | Experimental: Enable optimized writer                                                      
 hiveaks.force_local_scheduling                        | false               | false               | boolean | Only schedule splits on workers colocated with data node                                   
 hiveaks.hive_storage_format                           | ORC                 | ORC                 | varchar | Default storage format for new tables or partitions. Possible values: [ORC, PARQUET, AVRO, 
 hiveaks.ignore_absent_partitions                      | true                | true                | boolean | Ignore partitions when the file system location does not exist rather than failing the quer
 hiveaks.ignore_corrupted_statistics                   | false               | false               | boolean | Experimental: Ignore corrupted statistics rather than failing                              
 hiveaks.insert_existing_partitions_behavior           | APPEND              | APPEND              | varchar | Behavior on insert existing partitions; this session property doesn't control behavior on i
 hiveaks.legacy_hive_view_translation                  | false               | false               | boolean | Use legacy Hive view translation mechanism                                                 
 hiveaks.optimize_mismatched_bucket_count              | false               | false               | boolean | Experimental: Enable optimization to avoid shuffle when bucket count is compatible but not 
 hiveaks.optimize_symlink_listing                      | true                | true                | boolean | Optimize listing for SymlinkTextFormat tables with files in a single directory             
 hiveaks.orc_bloom_filters_enabled                     | false               | false               | boolean | ORC: Enable bloom filters for predicate pushdown                                           
 hiveaks.orc_lazy_read_small_ranges                    | true                | true                | boolean | Experimental: ORC: Read small file segments lazily                                         
 hiveaks.orc_max_buffer_size                           | 8MB                 | 8MB                 | varchar | ORC: Maximum size of a single read                                                         
 hiveaks.orc_max_merge_distance                        | 1MB                 | 1MB                 | varchar | ORC: Maximum size of gap between two reads to merge into a single read                     
 hiveaks.orc_max_read_block_size                       | 16MB                | 16MB                | varchar | ORC: Soft max size of Trino blocks produced by ORC reader                                  
 hiveaks.orc_nested_lazy_enabled                       | true                | true                | boolean | Experimental: ORC: Lazily read nested data                                                 
 hiveaks.orc_optimized_writer_max_dictionary_memory    | 16MB                | 16MB                | varchar | ORC: Max dictionary memory                                                                 
 hiveaks.orc_optimized_writer_max_stripe_rows          | 10000000            | 10000000            | integer | ORC: Max stripe row count                                                                  
 hiveaks.orc_optimized_writer_max_stripe_size          | 64MB                | 64MB                | varchar | ORC: Max stripe size                                                                       
 hiveaks.orc_optimized_writer_min_stripe_size          | 32MB                | 32MB                | varchar | ORC: Min stripe size                                                                       
 hiveaks.orc_optimized_writer_validate                 | false               | false               | boolean | ORC: Force all validation for files                                                        
 hiveaks.orc_optimized_writer_validate_mode            | BOTH                | BOTH                | varchar | ORC: Level of detail in ORC validation. Possible values: [HASHED, DETAILED, BOTH]          
 hiveaks.orc_optimized_writer_validate_percentage      | 0.0                 | 0.0                 | double  | ORC: sample percentage for validation for files                                            
 hiveaks.orc_stream_buffer_size                        | 8MB                 | 8MB                 | varchar | ORC: Size of buffer for streaming reads                                                    
 hiveaks.orc_string_statistics_limit                   | 64B                 | 64B                 | varchar | ORC: Maximum size of string statistics; drop if exceeding                                  
 hiveaks.orc_tiny_stripe_threshold                     | 8MB                 | 8MB                 | varchar | ORC: Threshold below which an ORC stripe or file will read in its entirety                 
 hiveaks.orc_use_column_names                          | false               | false               | boolean | Orc: Access ORC columns using names from the file                                          
 hiveaks.parallel_partitioned_bucketed_writes          | true                | true                | boolean | Improve parallelism of partitioned and bucketed table writes                               
 hiveaks.parquet_ignore_statistics                     | false               | false               | boolean | Ignore statistics from Parquet to allow querying files with corrupted or incorrect statisti
 hiveaks.parquet_max_read_block_size                   | 16MB                | 16MB                | varchar | Parquet: Maximum size of a block to read                                                   
 hiveaks.parquet_use_column_names                      | true                | true                | boolean | Parquet: Access Parquet columns using names from the file                                  
 hiveaks.parquet_writer_block_size                     | 134217728B          | 134217728B          | varchar | Parquet: Writer block size                                                                 
 hiveaks.parquet_writer_page_size                      | 1048576B            | 1048576B            | varchar | Parquet: Writer page size                                                                  
 hiveaks.partition_statistics_sample_size              | 100                 | 100                 | integer | Maximum sample size of the partitions column statistics                                    
 hiveaks.projection_pushdown_enabled                   | true                | true                | boolean | Projection push down enabled for hive                                                      
 hiveaks.query_partition_filter_required               | false               | false               | boolean | Require filter on partition column                                                         
 hiveaks.rcfile_optimized_writer_validate              | false               | false               | boolean | RCFile: Validate writer files                                                              
 hiveaks.respect_table_format                          | true                | true                | boolean | Write new partitions using table format rather than default storage format                 
 hiveaks.s3_select_pushdown_enabled                    | false               | false               | boolean | S3 Select pushdown enabled                                                                 
 hiveaks.sorted_writing_enabled                        | true                | true                | boolean | Enable writing to bucketed sorted tables                                                   
 hiveaks.statistics_enabled                            | true                | true                | boolean | Expose table statistics                                                                    
 hiveaks.temporary_staging_directory_enabled           | true                | true                | boolean | Should use temporary staging directory for write operations                                
 hiveaks.temporary_staging_directory_path              | /tmp/presto-${USER} | /tmp/presto-${USER} | varchar | Temporary staging directory location                                                       
 hiveaks.timestamp_precision                           | MILLISECONDS        | MILLISECONDS        | varchar | Precision for timestamp columns in Hive tables. Possible values: [MILLISECONDS, MICROSECOND
 hiveaks.validate_bucketing                            | true                | true                | boolean | Verify that data is bucketed correctly when reading                                        
 hiveqa.bucket_execution_enabled                       | true                | true                | boolean | Enable bucket-aware execution: only use a single worker per bucket                         
 hiveqa.collect_column_statistics_on_write             | true                | true                | boolean | Enables automatic column level statistics collection on write                              
 hiveqa.compression_codec                              | GZIP                | GZIP                | varchar | Compression codec to use when writing files. Possible values: [NONE, SNAPPY, LZ4, ZSTD, GZI
 hiveqa.create_empty_bucket_files                      | false               | false               | boolean | Create empty files for buckets that have no data                                           
 hiveqa.dynamic_filtering_probe_blocking_timeout       | 0.00m               | 0.00m               | varchar | Duration to wait for completion of dynamic filters during split generation for probe side t
 hiveqa.experimental_parquet_optimized_writer_enabled  | false               | false               | boolean | Experimental: Enable optimized writer                                                      
 hiveqa.force_local_scheduling                         | false               | false               | boolean | Only schedule splits on workers colocated with data node                                   
 hiveqa.hive_storage_format                            | ORC                 | ORC                 | varchar | Default storage format for new tables or partitions. Possible values: [ORC, PARQUET, AVRO, 
 hiveqa.ignore_absent_partitions                       | true                | true                | boolean | Ignore partitions when the file system location does not exist rather than failing the quer
 hiveqa.ignore_corrupted_statistics                    | false               | false               | boolean | Experimental: Ignore corrupted statistics rather than failing                              
 hiveqa.insert_existing_partitions_behavior            | APPEND              | APPEND              | varchar | Behavior on insert existing partitions; this session property doesn't control behavior on i
 hiveqa.legacy_hive_view_translation                   | false               | false               | boolean | Use legacy Hive view translation mechanism                                                 
 hiveqa.optimize_mismatched_bucket_count               | false               | false               | boolean | Experimental: Enable optimization to avoid shuffle when bucket count is compatible but not 
 hiveqa.optimize_symlink_listing                       | true                | true                | boolean | Optimize listing for SymlinkTextFormat tables with files in a single directory             
 hiveqa.orc_bloom_filters_enabled                      | false               | false               | boolean | ORC: Enable bloom filters for predicate pushdown                                           
 hiveqa.orc_lazy_read_small_ranges                     | true                | true                | boolean | Experimental: ORC: Read small file segments lazily                                         
 hiveqa.orc_max_buffer_size                            | 8MB                 | 8MB                 | varchar | ORC: Maximum size of a single read                                                         
 hiveqa.orc_max_merge_distance                         | 1MB                 | 1MB                 | varchar | ORC: Maximum size of gap between two reads to merge into a single read                     
 hiveqa.orc_max_read_block_size                        | 16MB                | 16MB                | varchar | ORC: Soft max size of Trino blocks produced by ORC reader                                  
 hiveqa.orc_nested_lazy_enabled                        | true                | true                | boolean | Experimental: ORC: Lazily read nested data                                                 
 hiveqa.orc_optimized_writer_max_dictionary_memory     | 16MB                | 16MB                | varchar | ORC: Max dictionary memory                                                                 
 hiveqa.orc_optimized_writer_max_stripe_rows           | 10000000            | 10000000            | integer | ORC: Max stripe row count                                                                  
 hiveqa.orc_optimized_writer_max_stripe_size           | 64MB                | 64MB                | varchar | ORC: Max stripe size                                                                       
 hiveqa.orc_optimized_writer_min_stripe_size           | 32MB                | 32MB                | varchar | ORC: Min stripe size                                                                       
 hiveqa.orc_optimized_writer_validate                  | false               | false               | boolean | ORC: Force all validation for files                                                        
 hiveqa.orc_optimized_writer_validate_mode             | BOTH                | BOTH                | varchar | ORC: Level of detail in ORC validation. Possible values: [HASHED, DETAILED, BOTH]          
 hiveqa.orc_optimized_writer_validate_percentage       | 0.0                 | 0.0                 | double  | ORC: sample percentage for validation for files                                            
 hiveqa.orc_stream_buffer_size                         | 8MB                 | 8MB                 | varchar | ORC: Size of buffer for streaming reads                                                    
 hiveqa.orc_string_statistics_limit                    | 64B                 | 64B                 | varchar | ORC: Maximum size of string statistics; drop if exceeding                                  
 hiveqa.orc_tiny_stripe_threshold                      | 8MB                 | 8MB                 | varchar | ORC: Threshold below which an ORC stripe or file will read in its entirety                 
 hiveqa.orc_use_column_names                           | false               | false               | boolean | Orc: Access ORC columns using names from the file                                          
 hiveqa.parallel_partitioned_bucketed_writes           | true                | true                | boolean | Improve parallelism of partitioned and bucketed table writes                               
 hiveqa.parquet_ignore_statistics                      | false               | false               | boolean | Ignore statistics from Parquet to allow querying files with corrupted or incorrect statisti
 hiveqa.parquet_max_read_block_size                    | 16MB                | 16MB                | varchar | Parquet: Maximum size of a block to read                                                   
 hiveqa.parquet_use_column_names                       | true                | true                | boolean | Parquet: Access Parquet columns using names from the file                                  
 hiveqa.parquet_writer_block_size                      | 134217728B          | 134217728B          | varchar | Parquet: Writer block size                                                                 
 hiveqa.parquet_writer_page_size                       | 1048576B            | 1048576B            | varchar | Parquet: Writer page size                                                                  
 hiveqa.partition_statistics_sample_size               | 100                 | 100                 | integer | Maximum sample size of the partitions column statistics                                    
 hiveqa.projection_pushdown_enabled                    | true                | true                | boolean | Projection push down enabled for hive                                                      
 hiveqa.query_partition_filter_required                | false               | false               | boolean | Require filter on partition column                                                         
 hiveqa.rcfile_optimized_writer_validate               | false               | false               | boolean | RCFile: Validate writer files                                                              
 hiveqa.respect_table_format                           | true                | true                | boolean | Write new partitions using table format rather than default storage format                 
 hiveqa.s3_select_pushdown_enabled                     | false               | false               | boolean | S3 Select pushdown enabled                                                                 
 hiveqa.sorted_writing_enabled                         | true                | true                | boolean | Enable writing to bucketed sorted tables                                                   
 hiveqa.statistics_enabled                             | true                | true                | boolean | Expose table statistics                                                                    
 hiveqa.temporary_staging_directory_enabled            | true                | true                | boolean | Should use temporary staging directory for write operations                                
 hiveqa.temporary_staging_directory_path               | /tmp/presto-${USER} | /tmp/presto-${USER} | varchar | Temporary staging directory location                                                       
 hiveqa.timestamp_precision                            | MILLISECONDS        | MILLISECONDS        | varchar | Precision for timestamp columns in Hive tables. Possible values: [MILLISECONDS, MICROSECOND
 hiveqa.validate_bucketing                             | true                | true                | boolean | Verify that data is bucketed correctly when reading
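For reference, a dump like the one above is what SHOW SESSION prints: every session property with its current value, default, type, and description, prefixed with the catalog name (here the two Hive catalogs, hiveaks and hiveqa):

SHOW SESSION;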
electrum commented 3 years ago

@vinay-kl Thanks. Which of the Hive catalogs are you running this against (in case the session properties are different)?

vinay-kl commented 3 years ago

@electrum it's happening with the hiveqa catalog only.

martint commented 3 years ago

It could be related to https://github.com/trinodb/trino/pull/5582/commits/61330e14cfbf888c7a1c524d3128df85669c89d5, but there are protections in place to rewrite a SemiJoin into an InnerJoin when it appears under a DeleteNode. Maybe there's a case we're missing.
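One way to check whether the SemiJoin (or the ExchangeNode from the stack trace) still lands under the DeleteNode on an affected cluster is to look at the logical plan; a minimal sketch, reusing the table names from the report. EXPLAIN should print the plan even though executing the DELETE fails, since the failing rewrite in BeginTableWrite happens only when the write is actually begun:

EXPLAIN
delete from pso_test.par_and_buck_by1
where c in (select distinct c from pso_test.par_and_buck_by_ext);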

findepi commented 3 years ago

I tried to reproduce this with version 356:

git checkout 356

# compile because env launcher requires local build
./mvnw -pl '!:trino-server-rpm,!:trino-docs,!:trino-proxy,!:trino-verifier,!:trino-benchto-benchmarks' clean install -nsu -DskipTests -Dmaven.javadoc.skip=true -Dair.check.skip-all=true 

# launch env with HDP3 metastore to use transactional tables
bin/ptl env up --config config-hdp3 --environment singlenode

but I did not get a failure:

trino:default> DROP TABLE IF EXISTS par_and_buck_by1;
            -> create table par_and_buck_by1 (a varchar, b int, c varchar)
            -> WITH ( format = 'ORC', partitioned_by = ARRAY['c'],
            ->      bucketed_by = ARRAY['a'], bucket_count = 10, transactional=true);
            ->
            -> insert into par_and_buck_by1 values ('a',1,'a'), ('b',2,'b');
            ->
            -> DROP TABLE IF EXISTS par_and_buck_by_ext;
            -> CREATE TABLE par_and_buck_by_ext (a varchar, b integer, c varchar);
            ->
            -> insert INTO par_and_buck_by_ext VALUES ('c', 3, 'a'), ('b', 2, 'a'), ('a', 1, 'b'), ('c', 3, 'b');
            ->
            -> -- delete from par_and_buck_by1 where c in ('a', 'b');
            -> -- delete from par_and_buck_by1 where c in (VALUES 'a', 'b');
            -> -- delete from par_and_buck_by1 where c in (select c from par_and_buck_by_ext);
            -> delete from par_and_buck_by1 where c in (select distinct c from par_and_buck_by_ext);
            ->
DROP TABLE
CREATE TABLE
INSERT: 2 rows

Query 20210528_202203_00043_4f9wz, FINISHED, 1 node
http://localhost:8080/ui/query.html?20210528_202203_00043_4f9wz
Splits: 35 total, 35 done (100.00%)
CPU Time: 0.0s total,     0 rows/s,     0B/s, 1% active
Per Node: 0.0 parallelism,     0 rows/s,     0B/s
Parallelism: 0.0
Peak Memory: 0B
1.57 [0 rows, 0B] [0 rows/s, 0B/s]

DROP TABLE
CREATE TABLE
INSERT: 4 rows

Query 20210528_202206_00046_4f9wz, FINISHED, 1 node
http://localhost:8080/ui/query.html?20210528_202206_00046_4f9wz
Splits: 35 total, 35 done (100.00%)
CPU Time: 0.0s total,     0 rows/s,     0B/s, 3% active
Per Node: 0.0 parallelism,     0 rows/s,     0B/s
Parallelism: 0.0
Peak Memory: 0B
0.69 [0 rows, 0B] [0 rows/s, 0B/s]

DELETE: 2 rows

Query 20210528_202207_00047_4f9wz, FINISHED, 1 node
http://localhost:8080/ui/query.html?20210528_202207_00047_4f9wz
Splits: 37 total, 37 done (100.00%)
CPU Time: 0.0s total,   270 rows/s, 2.69KB/s, 2% active
Per Node: 0.0 parallelism,     6 rows/s,    64B/s
Parallelism: 0.0
Peak Memory: 86B
1.60 [10 rows, 102B] [6 rows/s, 64B/s]
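As a further check that the delete affected both partitions, the table can be queried again; a small follow-up sketch (both rows matched the IN subquery, so this should return an empty result):

select * from par_and_buck_by1;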

@vinay-kl

vinay-kl commented 3 years ago

@findepi the issue is not reproducible with 357. We are using 3 nodes in the QA environment, where the issue was persistent with 356 under the hiveqa catalog.
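Since the behavior differs between 356 and 357, confirming the version actually running on each node before retesting may help; one way, using the built-in system tables:

SELECT node_version FROM system.runtime.nodes;

The configuration used on the affected cluster was: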

config.properties: |
    coordinator=true
    query.max-memory=15GB
    query.low-memory-killer.policy=total-reservation-on-blocked-nodes
    query.max-execution-time=60m
    query.max-run-time=60m
    query.max-stage-count=240
    memory.heap-headroom-per-node=1GB
    node-scheduler.include-coordinator=false
    http-server.http.port=8080
    discovery.uri=http://localhost:8080
    http-server.https.enabled=true
    http-server.https.port=8443
    discovery-server.enabled=true
    http-server.authentication.allow-insecure-over-http=true
    web-ui.enabled=true
hiveqa.properties: |
    connector.name=hive-hadoop2
    hive.non-managed-table-writes-enabled=true
    hive.ignore-absent-partitions=true
    #hive.views-execution.enabled=true
    hive.translate-hive-views=true
    hive.allow-drop-table=true
    hive.non-managed-table-creates-enabled=true
    hive.metastore.uri=thrift://<ip>:9083,thrift://<ip>:9083
    hive.config.resources=/usr/lib/trino/default/etc/catalog/core-site.xml
    #hive.translate-hive-views=true