trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

Query fails with valid partition filter in v448 but succeeds in v437 #22268

Open abhinavdangi opened 3 weeks ago

abhinavdangi commented 3 weeks ago

Trino 448 is set up with the following property:

delta.query-partition-filter-required=true

When querying a partitioned table with a cast condition on the partition column:

SELECT test_partition_table_1.b
  FROM test_partition_table_1 
    LEFT JOIN test_partition_table_2  ON test_partition_table_1.merge_col = test_partition_table_2.merge_col
  WHERE
  test_partition_table_2.c IS NOT NULL and 
  cast(test_partition_table_1.part_col as BIGINT) BETWEEN 20240201 and 20240229; -- not working
  -- part_col BETWEEN '20240201' and '20240229'; -- working

It fails with error:

io.trino.spi.TrinoException: Filter required on dev_7.test_partition_table_1 for at least one partition column: part_col
    at io.trino.plugin.deltalake.DeltaLakeMetadata.validateScan(DeltaLakeMetadata.java:3258)
    at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.validateScan(ClassLoaderSafeConnectorMetadata.java:1127)
    at io.trino.tracing.TracingConnectorMetadata.validateScan(TracingConnectorMetadata.java:1289)
    at io.trino.metadata.MetadataManager.validateScan(MetadataManager.java:2122)
    at io.trino.tracing.TracingMetadata.validateScan(TracingMetadata.java:1052)

With the plain condition on the partition column (no cast), the query succeeds. Table definitions are as follows:

CREATE TABLE test_partition_table_1 (
   merge_col varchar,
   b varchar,
   part_col varchar
)
WITH (
   partitioned_by = ARRAY['part_col']
);
insert into test_partition_table_1 values ('spark','col1','20240201'),('trino','col2','20240202'), ('druid','col3','20240203');
select * from test_partition_table_1 where part_col is not null;
CREATE TABLE test_partition_table_2 (
   merge_col varchar,
   c varchar
);
insert into test_partition_table_2 values ('spark','c3'), ('trino','c1'), ('druid', 'c2');
select * from test_partition_table_2;

The query fails on versions 447 and 448 but succeeds on 437. Please help.
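A workaround that appears to avoid the error (my own sketch, not confirmed by the maintainers) is to keep a direct, non-cast predicate on the partition column alongside the cast, since the plain varchar comparison is the form that satisfies the partition-filter check. This only gives equivalent results here because part_col holds fixed-width yyyymmdd strings, so lexicographic order matches numeric order:

```sql
-- Hypothetical workaround: add a redundant non-cast predicate on the
-- partition column so delta.query-partition-filter-required is satisfied.
SELECT test_partition_table_1.b
  FROM test_partition_table_1
    LEFT JOIN test_partition_table_2
      ON test_partition_table_1.merge_col = test_partition_table_2.merge_col
  WHERE
  test_partition_table_2.c IS NOT NULL
  AND test_partition_table_1.part_col BETWEEN '20240201' AND '20240229'
  AND cast(test_partition_table_1.part_col as BIGINT) BETWEEN 20240201 AND 20240229;
```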

ebyhr commented 3 weeks ago

cc: @marcinsbd

marcinsbd commented 3 weeks ago

@abhinavdangi can you share the query plans for the success (version 437) and the failure (version 447 or 448)?

abhinavdangi commented 3 weeks ago

Version 448


trino:dev_7> explain SELECT test_partition_table_1.b
          ->   FROM test_partition_table_1
          ->     LEFT JOIN test_partition_table_2  ON test_partition_table_1.merge_col = test_partition_table_2.merge_col
          ->   WHERE
          ->   test_partition_table_2.c IS NOT NULL and
          ->   cast(test_partition_table_1.part_col as BIGINT) BETWEEN 20240201 and 20240229;
Query 20240605_180616_00228_d4737 failed: Filter required on dev_7.test_partition_table_1 for at least one partition column: part_col
io.trino.spi.TrinoException: Filter required on dev_7.test_partition_table_1 for at least one partition column: part_col
    at io.trino.plugin.deltalake.DeltaLakeMetadata.validateScan(DeltaLakeMetadata.java:3258)
    at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.validateScan(ClassLoaderSafeConnectorMetadata.java:1127)
    at io.trino.tracing.TracingConnectorMetadata.validateScan(TracingConnectorMetadata.java:1289)
    at io.trino.metadata.MetadataManager.validateScan(MetadataManager.java:2122)
    at io.trino.tracing.TracingMetadata.validateScan(TracingMetadata.java:1052)
    at io.trino.sql.planner.sanity.TableScanValidator$1.visitTableScan(TableScanValidator.java:37)
    at io.trino.sql.planner.sanity.TableScanValidator$1.visitTableScan(TableScanValidator.java:33)
    at io.trino.sql.planner.plan.TableScanNode.accept(TableScanNode.java:236)
    at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:26)
    at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:19)
    at io.trino.sql.planner.plan.PlanVisitor.visitFilter(PlanVisitor.java:34)
    at io.trino.sql.planner.plan.FilterNode.accept(FilterNode.java:74)
    at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:26)
    at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:19)
    at io.trino.sql.planner.plan.PlanVisitor.visitJoin(PlanVisitor.java:99)
    at io.trino.sql.planner.plan.JoinNode.accept(JoinNode.java:295)
    at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:26)
    at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:19)
    at io.trino.sql.planner.plan.PlanVisitor.visitOutput(PlanVisitor.java:49)
    at io.trino.sql.planner.plan.OutputNode.accept(OutputNode.java:82)
    at io.trino.sql.planner.sanity.TableScanValidator.validate(TableScanValidator.java:32)
    at io.trino.sql.planner.sanity.PlanSanityChecker.lambda$validate$0(PlanSanityChecker.java:107)
    at com.google.common.collect.ImmutableList.forEach(ImmutableList.java:423)
    at io.trino.sql.planner.sanity.PlanSanityChecker.validate(PlanSanityChecker.java:107)
    at io.trino.sql.planner.sanity.PlanSanityChecker.validateFinalPlan(PlanSanityChecker.java:78)
    at io.trino.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:272)
    at io.trino.sql.analyzer.QueryExplainer.getLogicalPlan(QueryExplainer.java:176)
    at io.trino.sql.analyzer.QueryExplainer.getDistributedPlan(QueryExplainer.java:187)
    at io.trino.sql.analyzer.QueryExplainer.getPlan(QueryExplainer.java:106)
    at io.trino.sql.rewrite.ExplainRewrite$Visitor.getQueryPlan(ExplainRewrite.java:145)
    at io.trino.sql.rewrite.ExplainRewrite$Visitor.visitExplain(ExplainRewrite.java:129)
    at io.trino.sql.rewrite.ExplainRewrite$Visitor.visitExplain(ExplainRewrite.java:74)
    at io.trino.sql.tree.Explain.accept(Explain.java:61)
    at io.trino.sql.tree.AstVisitor.process(AstVisitor.java:27)
    at io.trino.sql.rewrite.ExplainRewrite.rewrite(ExplainRewrite.java:71)
    at io.trino.sql.rewrite.StatementRewrite.rewrite(StatementRewrite.java:54)
    at io.trino.sql.analyzer.Analyzer.analyze(Analyzer.java:92)
    at io.trino.sql.analyzer.Analyzer.analyze(Analyzer.java:86)
    at io.trino.execution.SqlQueryExecution.analyze(SqlQueryExecution.java:285)
    at io.trino.execution.SqlQueryExecution.<init>(SqlQueryExecution.java:218)
    at io.trino.execution.SqlQueryExecution$SqlQueryExecutionFactory.createQueryExecution(SqlQueryExecution.java:884)
    at io.trino.dispatcher.LocalDispatchQueryFactory.lambda$createDispatchQuery$0(LocalDispatchQueryFactory.java:153)
    at io.trino.$gen.Trino_448____20240531_114046_2.call(Unknown Source)
    at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
    at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
    at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1570)
    Suppressed: java.lang.Exception: Current plan:
                Output[columnNames = [b]]
                │   Layout: [b:varchar]
                └─ InnerJoin[criteria = (merge_col = merge_col_0), filter = (CAST(part_col AS bigint) BETWEEN bigint '20240201' AND bigint '20240229'), distribution = REPLICATED]
                   │   Layout: [b:varchar]
                   │   Distribution: REPLICATED
                   │   dynamicFilterAssignments = {merge_col_0 -> #df_383}
                   ├─ ScanFilter[table = delta_prod:dev_7.test_partition_table_1, dynamicFilters = {merge_col = #df_383}]
                   │      Layout: [merge_col:varchar, b:varchar, part_col:varchar]
                   │      part_col := part_col:varchar:PARTITION_KEY
                   │      merge_col := merge_col:varchar:REGULAR
                   │      b := b:varchar:REGULAR
                   └─ LocalExchange[partitioning = SINGLE]
                      │   Layout: [merge_col_0:varchar]
                      └─ RemoteExchange[type = REPLICATE]
                         │   Layout: [merge_col_0:varchar]
                         └─ ScanFilterProject[table = delta_prod:dev_7.test_partition_table_2, filterPredicate = (NOT (c IS NULL))]
                                Layout: [merge_col_0:varchar]
                                c := c:varchar:REGULAR
                                merge_col_0 := merge_col:varchar:REGULAR

        at io.trino.sql.planner.sanity.PlanSanityChecker.validate(PlanSanityChecker.java:120)
        ... 25 more
abhinavdangi commented 3 weeks ago

Version 437


trino:dev_7> explain SELECT test_partition_table_1.b
          ->   FROM test_partition_table_1
          ->     LEFT JOIN test_partition_table_2  ON test_partition_table_1.merge_col = test_partition_table_2.merge_col
          ->   WHERE
          ->   test_partition_table_2.c IS NOT NULL and
          ->   cast(test_partition_table_1.part_col as BIGINT) BETWEEN 20240201 and 20240229;
                                                                                                                     Query Plan                                                                                          >
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
 Trino version: 437                                                                                                                                                                                                      >
 Fragment 0 [SOURCE]                                                                                                                                                                                                     >
     Output layout: [b]                                                                                                                                                                                                  >
     Output partitioning: SINGLE []                                                                                                                                                                                      >
     Output[columnNames = [b]]                                                                                                                                                                                           >
     │   Layout: [b:varchar]                                                                                                                                                                                             >
     │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}                                                                                                                                                       >
     └─ InnerJoin[criteria = ("merge_col" = "merge_col_0"), distribution = REPLICATED]                                                                                                                                   >
        │   Layout: [b:varchar]                                                                                                                                                                                          >
        │   Estimates: {rows: ? (?), cpu: ?, memory: 450B, network: 0B}                                                                                                                                                  >
        │   Distribution: REPLICATED                                                                                                                                                                                     >
        │   dynamicFilterAssignments = {merge_col_0 -> #df_461}                                                                                                                                                          >
        ├─ ScanFilterProject[table = delta_prod:dev_7.test_partition_table_1, filterPredicate = ((CAST("part_col" AS BIGINT) >= BIGINT '20240201') AND (CAST("part_col" AS BIGINT) <= BIGINT '20240229')), dynamicFilters>
        │      Layout: [merge_col:varchar, b:varchar]                                                                                                                                                                    >
        │      Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                                   >
        │      b := b:varchar:REGULAR                                                                                                                                                                                    >
        │      part_col := part_col:varchar:PARTITION_KEY                                                                                                                                                                >
        │      merge_col := merge_col:varchar:REGULAR                                                                                                                                                                    >
        └─ LocalExchange[partitioning = SINGLE]                                                                                                                                                                          >
           │   Layout: [merge_col_0:varchar]                                                                                                                                                                             >
           │   Estimates: {rows: 3 (30B), cpu: 0, memory: 0B, network: 0B}                                                                                                                                               >
           └─ RemoteSource[sourceFragmentIds = [1]]                                                                                                                                                                      >
                  Layout: [merge_col_0:varchar]                                                                                                                                                                          >
                                                                                                                                                                                                                         >
 Fragment 1 [SOURCE]                                                                                                                                                                                                     >
     Output layout: [merge_col_0]                                                                                                                                                                                        >
     Output partitioning: BROADCAST []                                                                                                                                                                                   >
     ScanFilterProject[table = delta_prod:dev_7.test_partition_table_2, filterPredicate = (NOT ("c" IS NULL))]                                                                                                           >
         Layout: [merge_col_0:varchar]                                                                                                                                                                                   >
         Estimates: {rows: 3 (30B), cpu: 51, memory: 0B, network: 0B}/{rows: 3 (30B), cpu: 51, memory: 0B, network: 0B}/{rows: 3 (30B), cpu: 30, memory: 0B, network: 0B}                                                >
         c := c:varchar:REGULAR                                                                                                                                                                                          >
         merge_col_0 := merge_col:varchar:REGULAR                                                                                                                                                                        >
                                                                                                                                                                                                                         >
                                                                                                                                                                                                                         >
(1 row)
marcinsbd commented 2 weeks ago

@abhinavdangi Can you provide the type of the column test_partition_table_1.part_col?

abhinavdangi commented 2 weeks ago

I mentioned the table definitions above; part_col is varchar.

CREATE TABLE test_partition_table_1 (
   merge_col varchar,
   b varchar,
   part_col varchar
)
WITH (
   partitioned_by = ARRAY['part_col']
);
insert into test_partition_table_1 values ('spark','col1','20240201'),('trino','col2','20240202'), ('druid','col3','20240203');
marcinsbd commented 2 weeks ago

I ran a test where the partition column part_col is of type integer instead of varchar (as in the example above), so the cast is integer -> bigint, and it works:

trino:tpch> CREATE TABLE t1 (b varchar, merge_col integer, part_col integer) WITH (partitioned_by = ARRAY['part_col']);
CREATE TABLE
trino:tpch> INSERT INTO t1(b, merge_col, part_col)
         -> VALUES ('a', 1, 1),
         ->        ('b', 1, 2),
         ->        ('c', 1, 3),
         ->        ('d', 1, 4),
         ->        ('e', 2, 1),
         ->        ('f', 2, 2),
         ->        ('g', 2, 3),
         ->        ('h', 2, 4);
INSERT: 8 rows

trino:tpch> CREATE TABLE t2 (c varchar, merge_col integer);
CREATE TABLE
trino:tpch> INSERT INTO t2(c, merge_col)
         -> VALUES ('x', 1),
         ->        (null, 1),
         ->        ('y', 2),
         ->        ('Z', 3),
         ->        ('w', 4);
INSERT: 5 rows

trino:tpch> SELECT t1.b
         ->   FROM t1
         ->     LEFT JOIN t2  ON t1.merge_col = t2.merge_col
         ->   WHERE
         ->   t2.c IS NOT NULL and
         ->   cast(t1.part_col as BIGINT) BETWEEN 0 AND 1;
 b
---
 e
 a
(2 rows)

It seems to me that the issue is connected to how we handle different casts, as it started failing with this commit: https://github.com/trinodb/trino/commit/0f9adc635d9c605fd2e7777c192d985d3be28563. @martint, could you PTAL? It appears that the cast varchar -> bigint is marked as a "may fail" expression and is not pushed down, whereas the cast integer -> bigint is marked as safe and is pushed down. Should we provide a different implementation of the check that verifies whether a partition column was used within the query? Thanks
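To illustrate why the two casts are classified differently (my own sketch of the distinction): a varchar -> bigint cast can fail at runtime on non-numeric input, while a widening integer -> bigint cast cannot fail, which is presumably why only the latter is considered safe to push into the table scan:

```sql
SELECT CAST('20240201' AS bigint);       -- succeeds
SELECT CAST('not-a-number' AS bigint);   -- fails at runtime (invalid cast)
SELECT TRY_CAST('not-a-number' AS bigint); -- returns NULL instead of failing
```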

martint commented 1 week ago

@marcinsbd, your analysis seems correct.

We're going to have to look into how to perform speculative pushdown of expressions that may fail. I have some very rough ideas on how to go about this. I'll post later.