This error occurs in version 462 as well.
cc @losipiuk
Thanks @ndrluis . I will look into that.
As a workaround, use
SET SESSION fault_tolerant_execution_adaptive_query_planning_enabled=false
or set the fault-tolerant-execution-adaptive-query-planning-enabled config property to false.
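The config-property form of the same workaround would go in the Trino config file; a minimal sketch, assuming the conventional etc/config.properties location (the location is an assumption, not something stated in this thread):
fault-tolerant-execution-adaptive-query-planning-enabled=false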
cc: @gaurav8297
@ndrluis So the example query does not really reproduce the problem, right? At least it does not for me. Is the query you see like this one? Would you be fine with sharing the exact query and query plan with column/table names obfuscated?
@losipiuk No, this is the exact query with the column and table names obfuscated. I will provide the query plan for you. Which type and format do you need?
Hmm. The EXPLAIN on the CREATE OR REPLACE statement should do. Also maybe the data types of the columns; it should not matter, but ...
Also, what are the approximate sizes of the tables involved?
My question is about the EXPLAIN format and type, for example:
FORMAT { TEXT | GRAPHVIZ | JSON }
TYPE { LOGICAL | DISTRIBUTED | VALIDATE | IO }
https://trino.io/docs/current/sql/explain.html
Just EXPLAIN ..., no extra options.
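For example, a plain EXPLAIN over the full statement (the table and column names here are hypothetical placeholders):
EXPLAIN
CREATE OR REPLACE TABLE target_table AS
SELECT a.id_col, b.value_col
FROM source_table_a a
INNER JOIN source_table_b b ON a.id_col = b.id_col;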
@ndrluis can you run on 463? There is no fix there, but the exception message should be more descriptive.
@ndrluis any chance you could test it out on 463? Could help find the root cause.
We've seen this occurring on some queries during cluster scale-down only, in case that gives you any hints.
@losipiuk I'm sorry about the delay.
Stacktrace:
java.lang.IllegalStateException: Source subPlans not found for exchange node 975; sourceIds: [853]; filteredSubPlans: []; allSubPlans: [7->33, 8->32, 9->1363, 4->1335, 5->1348, 6->1343, 3->1060]
at io.trino.sql.planner.AdaptivePlanner$ExchangeSourceIdToSubPlanCollector.visitExchange(AdaptivePlanner.java:399)
at io.trino.sql.planner.AdaptivePlanner$ExchangeSourceIdToSubPlanCollector.visitExchange(AdaptivePlanner.java:375)
at io.trino.sql.planner.plan.ExchangeNode.accept(ExchangeNode.java:278)
at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:26)
at io.trino.sql.planner.AdaptivePlanner$ExchangeSourceIdToSubPlanCollector.visitExchange(AdaptivePlanner.java:384)
at io.trino.sql.planner.AdaptivePlanner$ExchangeSourceIdToSubPlanCollector.visitExchange(AdaptivePlanner.java:375)
at io.trino.sql.planner.plan.ExchangeNode.accept(ExchangeNode.java:278)
at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:26)
at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:19)
at io.trino.sql.planner.plan.PlanVisitor.visitTableWriter(PlanVisitor.java:134)
at io.trino.sql.planner.plan.TableWriterNode.accept(TableWriterNode.java:173)
at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:26)
at io.trino.sql.planner.AdaptivePlanner$ExchangeSourceIdToSubPlanCollector.visitExchange(AdaptivePlanner.java:384)
at io.trino.sql.planner.AdaptivePlanner$ExchangeSourceIdToSubPlanCollector.visitExchange(AdaptivePlanner.java:375)
at io.trino.sql.planner.plan.ExchangeNode.accept(ExchangeNode.java:278)
at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:26)
at io.trino.sql.planner.AdaptivePlanner$ExchangeSourceIdToSubPlanCollector.visitExchange(AdaptivePlanner.java:384)
at io.trino.sql.planner.AdaptivePlanner$ExchangeSourceIdToSubPlanCollector.visitExchange(AdaptivePlanner.java:375)
at io.trino.sql.planner.plan.ExchangeNode.accept(ExchangeNode.java:278)
at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:26)
at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:19)
at io.trino.sql.planner.plan.PlanVisitor.visitTableFinish(PlanVisitor.java:169)
at io.trino.sql.planner.plan.TableFinishNode.accept(TableFinishNode.java:105)
at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:26)
at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:19)
at io.trino.sql.planner.plan.PlanVisitor.visitOutput(PlanVisitor.java:49)
at io.trino.sql.planner.plan.OutputNode.accept(OutputNode.java:82)
at io.trino.sql.planner.AdaptivePlanner.optimize(AdaptivePlanner.java:143)
at io.trino.execution.scheduler.faulttolerant.EventDrivenFaultTolerantQueryScheduler$Scheduler.optimizePlan(EventDrivenFaultTolerantQueryScheduler.java:1130)
at io.trino.execution.scheduler.faulttolerant.EventDrivenFaultTolerantQueryScheduler$Scheduler.optimize(EventDrivenFaultTolerantQueryScheduler.java:1089)
at io.trino.execution.scheduler.faulttolerant.EventDrivenFaultTolerantQueryScheduler$Scheduler.schedule(EventDrivenFaultTolerantQueryScheduler.java:1021)
at io.trino.execution.scheduler.faulttolerant.EventDrivenFaultTolerantQueryScheduler$Scheduler.run(EventDrivenFaultTolerantQueryScheduler.java:847)
at io.trino.$gen.Trino_463____20241101_144658_2.run(Unknown Source)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1575)
Explain
Trino version: 463
Fragment 0 [COORDINATOR_ONLY]
Output layout: [rows]
Output partitioning: SINGLE []
Output[columnNames = [rows]]
│ Layout: [rows:bigint]
│ Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
└─ TableCommit[target = schema_placeholder:db_placeholder.target_table]
│ Layout: [rows:bigint]
│ Collected statistics:
│ aggregations =>
│ $iceberg_theta_stat[id_col1] => [iceberg_theta_stat_id_col1 := $iceberg_theta_stat(iceberg_theta_stat)]
│ $iceberg_theta_stat[id_col2] => [iceberg_theta_stat_id_col2 := $iceberg_theta_stat(iceberg_theta_stat_20)]
│ $iceberg_theta_stat[id_col3] => [iceberg_theta_stat_id_col3 := $iceberg_theta_stat(iceberg_theta_stat_21)]
│ $iceberg_theta_stat[id_col4] => [iceberg_theta_stat_id_col4 := $iceberg_theta_stat(iceberg_theta_stat_22)]
│ $iceberg_theta_stat[description_col5] => [iceberg_theta_stat_description_col5 := $iceberg_theta_stat(iceberg_theta_stat_23)]
│ $iceberg_theta_stat[value_col6] => [iceberg_theta_stat_value_col6 := $iceberg_theta_stat(iceberg_theta_stat_24)]
│ $iceberg_theta_stat[datetime_col7] => [iceberg_theta_stat_datetime_col7 := $iceberg_theta_stat(iceberg_theta_stat_25)]
│ grouped by => []
└─ LocalExchange[partitioning = SINGLE]
│ Layout: [partialrows:bigint, fragment:varbinary, iceberg_theta_stat:varbinary, iceberg_theta_stat_20:varbinary, iceberg_theta_stat_21:varbinary, iceberg_theta_stat_22:varbinary, iceberg_theta_stat_23:varbinary, iceberg_theta_stat_24:varbinary, iceberg_theta_stat_25:varbinary]
│ Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
└─ RemoteSource[sourceFragmentIds = [1]]
Layout: [partialrows:bigint, fragment:varbinary, iceberg_theta_stat:varbinary, iceberg_theta_stat_20:varbinary, iceberg_theta_stat_21:varbinary, iceberg_theta_stat_22:varbinary, iceberg_theta_stat_23:varbinary, iceberg_theta_stat_24:varbinary, iceberg_theta_stat_25:varbinary]
Fragment 1 [ROUND_ROBIN (scale writers)]
Output layout: [partialrows, fragment, iceberg_theta_stat, iceberg_theta_stat_20, iceberg_theta_stat_21, iceberg_theta_stat_22, iceberg_theta_stat_23, iceberg_theta_stat_24, iceberg_theta_stat_25]
Output partitioning: SINGLE []
TableWriter[]
│ Layout: [partialrows:bigint, fragment:varbinary, iceberg_theta_stat:varbinary, iceberg_theta_stat_20:varbinary, iceberg_theta_stat_21:varbinary, iceberg_theta_stat_22:varbinary, iceberg_theta_stat_23:varbinary, iceberg_theta_stat_24:varbinary, iceberg_theta_stat_25:varbinary]
│ id_col1 := id_col1_0
│ id_col2 := id_col2
│ id_col3 := id_col3
│ id_col4 := id_col4
│ description_col5 := description_col5
│ value_col6 := value_col6
│ datetime_col7 := datetime_col7
│ Collected statistics:
│ aggregations =>
│ $iceberg_theta_stat[id_col1] => [iceberg_theta_stat := $iceberg_theta_stat(id_col1_0)]
│ $iceberg_theta_stat[id_col2] => [iceberg_theta_stat_20 := $iceberg_theta_stat(id_col2)]
│ $iceberg_theta_stat[id_col3] => [iceberg_theta_stat_21 := $iceberg_theta_stat(id_col3)]
│ $iceberg_theta_stat[id_col4] => [iceberg_theta_stat_22 := $iceberg_theta_stat(id_col4)]
│ $iceberg_theta_stat[description_col5] => [iceberg_theta_stat_23 := $iceberg_theta_stat(description_col5)]
│ $iceberg_theta_stat[value_col6] => [iceberg_theta_stat_24 := $iceberg_theta_stat(value_col6)]
│ $iceberg_theta_stat[datetime_col7] => [iceberg_theta_stat_25 := $iceberg_theta_stat(datetime_col7)]
│ grouped by => []
└─ LocalExchange[partitioning = ROUND_ROBIN (scale writers)]
│ Layout: [value_col6:bigint, id_col1_0:integer, id_col2:integer, id_col3:integer, id_col4:integer, description_col5:varchar, datetime_col7:timestamp(6)]
│ Estimates: {rows: 79,092,455 (3.05GB), cpu: 3.05G, memory: 0B, network: 0B}
└─ RemoteSource[sourceFragmentIds = [2]]
Layout: [value_col6:bigint, id_col1_0:integer, id_col2:integer, id_col3:integer, id_col4:integer, description_col5:varchar, datetime_col7:timestamp(6)]
Fragment 2 [HASH]
Output layout: [value_col6, id_col1_0, id_col2, id_col3, id_col4, description_col5, datetime_col7]
Output partitioning: ROUND_ROBIN (scale writers) []
InnerJoin[criteria = (id_col1_0 = id_col1), distribution = PARTITIONED]
│ Layout: [id_col1_0:integer, value_col6:bigint, id_col2:integer, id_col3:integer, id_col4:integer, description_col5:varchar, datetime_col7:timestamp(6)]
│ Estimates: {rows: 79,092,455 (3.05GB), cpu: 11.72G, memory: 2.38GB, network: 0B}
│ Distribution: PARTITIONED
├─ RemoteSource[sourceFragmentIds = [3]]
│ Layout: [id_col1_0:integer, value_col6:bigint]
└─ LocalExchange[partitioning = HASH, arguments = [id_col1::integer]]
│ Layout: [id_col1:integer, id_col2:integer, id_col3:integer, id_col4:integer, description_col5:varchar, datetime_col7:timestamp(6)]
│ Estimates: {rows: 79,092,455 (2.38GB), cpu: 2.38G, memory: 0B, network: 0B}
└─ RemoteSource[sourceFragmentIds = [4]]
Layout: [id_col1:integer, id_col2:integer, id_col3:integer, id_col4:integer, description_col5:varchar, datetime_col7:timestamp(6)]
Fragment 3 [SOURCE]
Output layout: [id_col1_0, value_col6]
Output partitioning: HASH [id_col1_0]
ScanFilter[table = schema2:db2.joined_table1$data@408428557136201187, dynamicFilters = {id_col1_0 = #df_1059}]
Layout: [id_col1_0:integer, value_col6:bigint]
Estimates: {rows: 481,923,980 (6.28GB), cpu: 6.28G, memory: 0B, network: 0B}
id_col1_0 := 1:id_col1:integer
value_col6 := 4:value_col6:bigint
Fragment 4 [SOURCE]
Output layout: [id_col1, id_col2, id_col3, id_col4, description_col5, datetime_col7]
Output partitioning: HASH [id_col1]
DynamicFilterSource[dynamicFilterAssignments = {id_col1 -> #df_1059}]
│ Layout: [id_col1:integer, id_col2:integer, id_col3:integer, id_col4:integer, description_col5:varchar, datetime_col7:timestamp(6)]
│ Estimates: {rows: 79,092,455 (2.38GB), cpu: ?, memory: ?, network: ?}
└─ InnerJoin[criteria = (expr = id_col4_8), distribution = REPLICATED]
│ Layout: [id_col1:integer, id_col2:integer, id_col3:integer, id_col4:integer, description_col5:varchar, datetime_col7:timestamp(6)]
│ Estimates: {rows: 79,092,455 (2.38GB), cpu: 5.43G, memory: 90.84kB, network: 0B}
│ Distribution: REPLICATED
├─ InnerJoin[criteria = (id_col3 = id_col3_4), distribution = REPLICATED]
│ │ Layout: [id_col1:integer, id_col2:integer, id_col3:integer, id_col4:integer, description_col5:varchar, expr:bigint, datetime_col7:timestamp(6)]
│ │ Estimates: {rows: 79,092,455 (3.05GB), cpu: 8.32G, memory: 24.57MB, network: 0B}
│ │ Distribution: REPLICATED
│ ├─ ScanFilterProject[table = schema1:db1.main_table$data@2894234669994434375, dynamicFilters = {CAST(id_col4 AS bigint) = #df_1061, id_col3 = #df_1062}]
│ │ Layout: [id_col1:integer, id_col2:integer, id_col3:integer, id_col4:integer, description_col5:varchar, expr:bigint]
│ │ Estimates: {rows: 163,791,633 (5.23GB), cpu: 3.86G, memory: 0B, network: 0B}
│ │ expr := CAST(id_col4 AS bigint)
│ │ id_col1 := 1:id_col1:integer
│ │ id_col4 := 4:id_col4:integer
│ │ id_col3 := 3:id_col3:integer
│ │ id_col2 := 2:id_col2:integer
│ │ description_col5 := 5:description_col5:varchar
│ └─ LocalExchange[partitioning = HASH, arguments = [id_col3_4::integer]]
│ │ Layout: [datetime_col7:timestamp(6), id_col3_4:integer]
│ │ Estimates: {rows: 533,566 (6.14MB), cpu: 6.14M, memory: 0B, network: 0B}
│ └─ RemoteSource[sourceFragmentIds = [5]]
│ Layout: [datetime_col7:timestamp(6), id_col3_4:integer]
└─ LocalExchange[partitioning = SINGLE]
│ Layout: [id_col4_8:bigint]
│ Estimates: {rows: 2,584 (22.71kB), cpu: 0, memory: 0B, network: 0B}
└─ RemoteSource[sourceFragmentIds = [6]]
Layout: [id_col4_8:bigint]
Fragment 5 [SOURCE]
Output layout: [datetime_col7, id_col3_4]
Output partitioning: BROADCAST []
DynamicFilterSource[dynamicFilterAssignments = {id_col3_4 -> #df_1062}]
│ Layout: [datetime_col7:timestamp(6), id_col3_4:integer]
│ Estimates: {rows: 533,566 (6.14MB), cpu: ?, memory: ?, network: ?}
└─ ScanFilterProject[table = schema3:db3.joined_table2$data@1028980736129464491, filterPredicate = (status_col = varchar 'closed')]
Layout: [datetime_col7:timestamp(6), id_col3_4:integer]
Estimates: {rows: 4,802,096 (46.46MB), cpu: 72.96M, memory: 0B, network: 0B}
id_col3_4 := 1:id_col3:integer
datetime_col7 := 12:datetime_col7:timestamp(6)
status_col := 8:status_col:varchar
Fragment 6 [SOURCE]
Output layout: [id_col4_8]
Output partitioning: BROADCAST []
DynamicFilterSource[dynamicFilterAssignments = {id_col4_8 -> #df_1061}]
│ Layout: [id_col4_8:bigint]
│ Estimates: {rows: 2,584 (22.71kB), cpu: ?, memory: ?, network: ?}
└─ ScanFilterProject[table = schema4:db4.joined_table3$data@6530872271903205519, filterPredicate = (status_col_10 = varchar 'active')]
Layout: [id_col4_8:bigint]
Estimates: {rows: 5,169 (45.43kB), cpu: 72.69k, memory: 0B, network: 0B}
id_col4_8 := 1:id_col4:bigint
status_col_10 := 14:status_col:varchar
Just ran into this with a query that is join-order dependent. The user had this:
select ...
from t1 as order_header
inner join t2 as order_item
on order_header.order_intake_order_id = order_item.order_intake_order_id
inner join t3 as order_schedule_line
on order_item.order_intake_order_item_id = order_schedule_line.order_intake_order_item_id
inner join t4 as delivery_item
on order_schedule_line.order_intake_schedule_line_id = delivery_item.order_intake_schedule_line_id
inner join invoice_item
on delivery_item.order_intake_delivery_item_id = invoice_item.order_intake_delivery_item_id
left join t5 as invoice_item_product
on invoice_item.product_id = invoice_item_product.product_id
left join t5 as order_item_product
on order_item.product_id = order_item_product.product_id
left join t5 as schedule_line_product
on order_schedule_line.product_id = schedule_line_product.product_id
This was a CTE used in the table creation downstream. I've materialized it to a table instead and the error was gone. That got me thinking something must be off with this CTE, so I tried re-ordering the joins and realised it's join-order dependent. So I ran the original query with fault_tolerant_execution_adaptive_join_reordering_enabled and the problem was fixed, but it ran a bit slower.
It seems the issue occurs when we have a join-order dependent CTE used in downstream joins. LMK if you want EXPLAIN for this :)
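Roughly, the pattern is a multi-join CTE consumed by downstream table creation; a hypothetical sketch of that shape (downstream_table and product_name are made up, the rest mirrors the obfuscated names above):
create table downstream_table as
with order_cte as (
    select oh.order_intake_order_id, oi.product_id
    from t1 as oh
    inner join t2 as oi
        on oh.order_intake_order_id = oi.order_intake_order_id
)
select c.order_intake_order_id, p.product_name
from order_cte as c
left join t5 as p
    on c.product_id = p.product_id;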
Ok - I have reproduction - working on a fix.
I'm on 461 and I just enabled fault_tolerant_execution_adaptive_join_reordering_enabled, and I'm seeing this.
It's an intermittent failure and it seems like it's also only occurring in join-order dependent CTEs.
@losipiuk how did you reproduce this?
https://gist.github.com/losipiuk/ede59b1b120388d5bfad2906458804eb
Run the above in IcebergQueryRunnerWithTaskRetries.
I updated my Trino cluster from version 455 to 461, and I'm receiving the following error: 'Source subPlans not found for exchange node.' I'm using dbt, and in a scenario with more than 500 models, this error occurs only in a specific query that, as far as I can tell, doesn't have anything unusual. It's not intermittent; it happens in every run.
My Trino cluster is configured with fault tolerance.
query example
Stacktrace
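For context, "configured with fault tolerance" here presumably means task-level retries; a minimal sketch of what that setup typically looks like, assuming the standard properties (the exchange location is a placeholder, none of this is taken from the report itself):
# etc/config.properties
retry-policy=TASK
# etc/exchange-manager.properties (spooling storage is required for task retries; the bucket is a placeholder)
exchange-manager.name=filesystem
exchange.base-directories=s3://example-exchange-spooling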