trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

Druid `__time` filter is not pushed down #8404

Closed jerryleooo closed 2 years ago

jerryleooo commented 3 years ago

In Druid queries, `__time` is a very important filter: applying it narrows the scan range. Otherwise, Druid does a full table scan, which is slow and resource-consuming.

Currently, I find the `__time` filter is not pushed down; I am not sure whether I am using it incorrectly or whether the pushdown is simply not supported yet:

Trino SQL: select sum(col1) from <table name> where __time between date'2021-06-01' and date'2021-06-30';

SQL sent to Druid: SELECT "__time", "col1" FROM "druid"."<table name>"

I noticed there is ongoing work (#4109, #4313) on implementing Druid aggregation pushdown; I am not sure whether this is covered by that effort.

jerryleooo commented 3 years ago

The result of EXPLAIN:

explain select sum(col1) from <table name> where __time between timestamp'2021-06-01' and timestamp'2021-06-30';

Fragment 0 [SINGLE]
    Output layout: [sum]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    Output[_col0]
    │   Layout: [sum:bigint]
    │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
    │   _col0 := sum
    └─ Aggregate(FINAL)
       │   Layout: [sum:bigint]
       │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
       │   sum := sum("sum_0")
       └─ LocalExchange[SINGLE] ()
          │   Layout: [sum_0:row(bigint, boolean, bigint, boolean)]
          │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
          └─ RemoteSource[1]
                 Layout: [sum_0:row(bigint, boolean, bigint, boolean)]

Fragment 1 [SOURCE]
    Output layout: [sum_0]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    Aggregate(PARTIAL)
    │   Layout: [sum_0:row(bigint, boolean, bigint, boolean)]
    │   sum_0 := sum("col1")
    └─ ScanFilterProject[table = druid:druid.<table name> druid.druid.<table name> columns=[__time:timestamp(3):TIMESTAMP, col1:bigint:BIGINT], grouped = false, filterPredicate = ("__time" BETWEEN TIMESTAMP '2021-06-01 00:00:00.000' AND TIMESTAMP '2021-06-30 00:00:00.000')]
           Layout: [col1:bigint]
           Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           __time := __time:timestamp(3):TIMESTAMP
           col1 := col1:bigint:BIGINT

The plan seems to show the filter being pushed down, but from what we observed the scan still reads the full table, so perhaps there is an issue in buildSql. (The filterPredicate on ScanFilterProject means Trino applies the filter itself after scanning; actual pushdown would show up in the SQL sent to Druid.)
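
If the predicate were actually pushed into the generated SQL, the query sent to Druid would be expected to carry the time bound, roughly:

SELECT "__time", "col1" FROM "druid"."<table name>" WHERE "__time" BETWEEN TIMESTAMP '2021-06-01 00:00:00' AND TIMESTAMP '2021-06-30 00:00:00'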

sumannewton commented 3 years ago

Looks like only varchar types are handled explicitly in the Druid connector. https://github.com/trinodb/trino/blob/60f66d8f2c8cd440b0a301d1e988213254f7dc8d/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java#L140-L153

I am not an expert in Druid. @findepi @martint Please take a look.

hashhar commented 3 years ago

@sumannewton Correct.

There are no explicit type mappings defined for the write path (toWriteMapping), so no pushdown is possible. There's a reason why legacyToWriteMapping and legacyColumnMapping are deprecated.

There should be type mappings defined explicitly for data types in both the read (toColumnMapping) and write (toWriteMapping) paths.
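
For illustration, a minimal sketch of what explicit read mappings in DruidJdbcClient could look like, modeled on other BaseJdbcClient subclasses such as the PostgreSQL connector (the selection of cases and helpers here is an assumption, not the final implementation):

    import java.sql.Connection;
    import java.sql.Types;
    import java.util.Optional;

    import io.trino.plugin.jdbc.ColumnMapping;
    import io.trino.plugin.jdbc.JdbcTypeHandle;
    import io.trino.spi.connector.ConnectorSession;

    import static io.trino.plugin.jdbc.StandardColumnMappings.bigintColumnMapping;
    import static io.trino.plugin.jdbc.StandardColumnMappings.doubleColumnMapping;
    import static io.trino.plugin.jdbc.StandardColumnMappings.timestampColumnMapping;
    import static io.trino.spi.type.TimestampType.createTimestampType;

    // Hypothetical override inside DruidJdbcClient
    @Override
    public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle)
    {
        switch (typeHandle.getJdbcType()) {
            case Types.BIGINT:
                return Optional.of(bigintColumnMapping());
            case Types.DOUBLE:
                return Optional.of(doubleColumnMapping());
            case Types.TIMESTAMP:
                // Druid's __time column has millisecond precision
                return Optional.of(timestampColumnMapping(createTimestampType(3)));
        }
        // Remaining Druid types (VARCHAR, FLOAT, ...) still need explicit cases
        return Optional.empty();
    }

With explicit mappings in place, the engine knows the column semantics precisely and the connector can push compatible predicates into the generated SQL.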

jerryleooo commented 3 years ago

Hi @hashhar, so is there any plan to improve this part?

hashhar commented 3 years ago

@jerryleooo I don't know of anyone actively working on this at the moment, but we can guide anyone who wants to work on it through the changes.

Looking at the mentioned methods in the PostgreSQL connector should be all that's needed.

cc: @dheerajkulakarni thought you might be interested.

jerryleooo commented 3 years ago

@hashhar Understood. We are interested in improving this part and will look at the methods you mentioned. Thanks.

jerryleooo commented 3 years ago

Hi @hashhar @sumannewton I am trying to fix this as you suggested: https://github.com/trinodb/trino/pull/8474 but there is an error. In https://github.com/trinodb/trino/blob/c647c317405565ebd84599953dbf3cfca42affec/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/StandardColumnMappings.java#L499 I found that AvaticaResultSet raises an error on getObject(columnIndex, LocalDateTime.class). The full error log:

2021-07-06T02:28:34.895-0500 INFO TIMELINE: Query 20210706_072834_00243_h68ar :: Transaction:[4f5bdbb7-41c5-4b56-aa2c-47fe5593812c] :: elapsed 157ms :: planning 8ms :: waiting 103ms :: scheduling 105ms :: running 42ms :: finishing 2ms :: begin 2021-07-06T02:28:34.735-05:00 :: end 2021-07-06T02:28:34.892-05:00
2021-07-06T02:28:35.071-0500 INFO SELECT "__time", "clerk", "comment", "custkey", "orderdate", "orderkey", "orderpriority", "orderstatus", "shippriority", "totalprice" FROM "druid"."orders"
2021-07-06T02:28:35.274-0500 SEVERE Error processing Split 20210706_072834_00244_h68ar.1.0-0 io.trino.plugin.jdbc.JdbcSplit@29ef096 (start = 1.37692158049932E8, wall = 215 ms, cpu = 0 ms, wait = 0 ms, calls = 1): JDBC_ERROR: cannot convert to Object (with type) (org.apache.calcite.avatica.util.AbstractCursor$TimestampFromNumberAccessor@55f43d1d)
io.trino.spi.TrinoException: cannot convert to Object (with type) (org.apache.calcite.avatica.util.AbstractCursor$TimestampFromNumberAccessor@55f43d1d)
    at io.trino.plugin.jdbc.JdbcRecordCursor.handleSqlException(JdbcRecordCursor.java:299)
    at io.trino.plugin.jdbc.JdbcRecordCursor.getLong(JdbcRecordCursor.java:199)
    at io.trino.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:106)
    at io.trino.operator.TableScanOperator.getOutput(TableScanOperator.java:301)
    at io.trino.operator.Driver.processInternal(Driver.java:387)
    at io.trino.operator.Driver.lambda$processFor$9(Driver.java:291)
    at io.trino.operator.Driver.tryWithLock(Driver.java:683)
    at io.trino.operator.Driver.processFor(Driver.java:284)
    at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
    at io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
    at io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
    at io.trino.$gen.Trino_testversion____20210706_071946_3.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLDataException: cannot convert to Object (with type) (org.apache.calcite.avatica.util.AbstractCursor$TimestampFromNumberAccessor@55f43d1d)
    at org.apache.calcite.avatica.util.AbstractCursor$AccessorImpl.cannotConvert(AbstractCursor.java:363)
    at org.apache.calcite.avatica.util.AbstractCursor$AccessorImpl.getObject(AbstractCursor.java:424)
    at org.apache.calcite.avatica.AvaticaResultSet.getObject(AvaticaResultSet.java:1049)
    at io.trino.plugin.jdbc.StandardColumnMappings.lambda$timestampReadFunction$23(StandardColumnMappings.java:499)
    at io.trino.plugin.jdbc.JdbcRecordCursor.getLong(JdbcRecordCursor.java:196)
    ... 13 more

2021-07-06T02:28:35.283-0500 SEVERE Stage 20210706_072834_00244_h68ar.1 failed
io.trino.spi.TrinoException: cannot convert to Object (with type) (org.apache.calcite.avatica.util.AbstractCursor$TimestampFromNumberAccessor@55f43d1d)
    at io.trino.plugin.jdbc.JdbcRecordCursor.handleSqlException(JdbcRecordCursor.java:299)
    at io.trino.plugin.jdbc.JdbcRecordCursor.getLong(JdbcRecordCursor.java:199)
    at io.trino.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:106)
    at io.trino.operator.TableScanOperator.getOutput(TableScanOperator.java:301)
    at io.trino.operator.Driver.processInternal(Driver.java:387)
    at io.trino.operator.Driver.lambda$processFor$9(Driver.java:291)
    at io.trino.operator.Driver.tryWithLock(Driver.java:683)
    at io.trino.operator.Driver.processFor(Driver.java:284)
    at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
    at io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
    at io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
    at io.trino.$gen.Trino_testversion____20210706_071946_3.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLDataException: cannot convert to Object (with type) (org.apache.calcite.avatica.util.AbstractCursor$TimestampFromNumberAccessor@55f43d1d)
    at org.apache.calcite.avatica.util.AbstractCursor$AccessorImpl.cannotConvert(AbstractCursor.java:363)
    at org.apache.calcite.avatica.util.AbstractCursor$AccessorImpl.getObject(AbstractCursor.java:424)
    at org.apache.calcite.avatica.AvaticaResultSet.getObject(AvaticaResultSet.java:1049)
    at io.trino.plugin.jdbc.StandardColumnMappings.lambda$timestampReadFunction$23(StandardColumnMappings.java:499)
    at io.trino.plugin.jdbc.JdbcRecordCursor.getLong(JdbcRecordCursor.java:196)
    ... 13 more

2021-07-06T02:28:35.288-0500 INFO FlakyTestRetryAnalyzer not enabled: CONTINUOUS_INTEGRATION environment is not detected or system property 'io.trino.testng.services.FlakyTestRetryAnalyzer.enabled' is not set to 'true' (actual: <not set>)
2021-07-06T02:28:35.308-0500 INFO TIMELINE: Query 20210706_072834_00244_h68ar :: Transaction:[afbebe63-cdc4-4031-b24b-a3655037a957] :: elapsed 385ms :: planning 11ms :: waiting 95ms :: scheduling 148ms :: running 216ms :: finishing 10ms :: begin 2021-07-06T02:28:34.899-05:00 :: end 2021-07-06T02:28:35.284-05:00

java.lang.RuntimeException: cannot convert to Object (with type) (org.apache.calcite.avatica.util.AbstractCursor$TimestampFromNumberAccessor@55f43d1d)

    at io.trino.testing.AbstractTestingTrinoClient.execute(AbstractTestingTrinoClient.java:120)
    at io.trino.testing.DistributedQueryRunner.execute(DistributedQueryRunner.java:476)
    at io.trino.testing.AbstractTestQueryFramework.computeActual(AbstractTestQueryFramework.java:137)
    at io.trino.testing.AbstractTestQueryFramework.assertExplainAnalyze(AbstractTestQueryFramework.java:363)
    at io.trino.testing.AbstractTestQueryFramework.assertExplainAnalyze(AbstractTestQueryFramework.java:354)
    at io.trino.testing.AbstractTestQueryFramework.assertExplainAnalyze(AbstractTestQueryFramework.java:349)
    at io.trino.testing.BaseConnectorTest.testExplainAnalyze(BaseConnectorTest.java:523)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:104)
    at org.testng.internal.Invoker.invokeMethod(Invoker.java:645)
    at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:851)
    at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1177)
    at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:129)
    at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:112)
    at org.testng.TestRunner.privateRun(TestRunner.java:756)
    at org.testng.TestRunner.run(TestRunner.java:610)
    at org.testng.SuiteRunner.runTest(SuiteRunner.java:387)
    at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:382)
    at org.testng.SuiteRunner.privateRun(SuiteRunner.java:340)
    at org.testng.SuiteRunner.run(SuiteRunner.java:289)
    at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
    at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
    at org.testng.TestNG.runSuitesSequentially(TestNG.java:1293)
    at org.testng.TestNG.runSuitesLocally(TestNG.java:1218)
    at org.testng.TestNG.runSuites(TestNG.java:1133)
    at org.testng.TestNG.run(TestNG.java:1104)
    at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
    at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
    Suppressed: java.lang.Exception: SQL: EXPLAIN ANALYZE SELECT * FROM orders
        at io.trino.testing.DistributedQueryRunner.execute(DistributedQueryRunner.java:479)
        ... 29 more
Caused by: io.trino.spi.TrinoException: cannot convert to Object (with type) (org.apache.calcite.avatica.util.AbstractCursor$TimestampFromNumberAccessor@55f43d1d)
    at io.trino.plugin.jdbc.JdbcRecordCursor.handleSqlException(JdbcRecordCursor.java:299)
    at io.trino.plugin.jdbc.JdbcRecordCursor.getLong(JdbcRecordCursor.java:199)
    at io.trino.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:106)
    at io.trino.operator.TableScanOperator.getOutput(TableScanOperator.java:301)
    at io.trino.operator.Driver.processInternal(Driver.java:387)
    at io.trino.operator.Driver.lambda$processFor$9(Driver.java:291)
    at io.trino.operator.Driver.tryWithLock(Driver.java:683)
    at io.trino.operator.Driver.processFor(Driver.java:284)
    at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
    at io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
    at io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
    at io.trino.$gen.Trino_testversion____20210706_071946_3.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLDataException: cannot convert to Object (with type) (org.apache.calcite.avatica.util.AbstractCursor$TimestampFromNumberAccessor@55f43d1d)
    at org.apache.calcite.avatica.util.AbstractCursor$AccessorImpl.cannotConvert(AbstractCursor.java:363)
    at org.apache.calcite.avatica.util.AbstractCursor$AccessorImpl.getObject(AbstractCursor.java:424)
    at org.apache.calcite.avatica.AvaticaResultSet.getObject(AvaticaResultSet.java:1049)
    at io.trino.plugin.jdbc.StandardColumnMappings.lambda$timestampReadFunction$23(StandardColumnMappings.java:499)
    at io.trino.plugin.jdbc.JdbcRecordCursor.getLong(JdbcRecordCursor.java:196)
    ... 13 more

I guess we may need to report this to Avatica or have a workaround here.

jerryleooo commented 3 years ago

Seems https://issues.apache.org/jira/browse/CALCITE-1630 is related

hashhar commented 3 years ago

Seems https://issues.apache.org/jira/browse/CALCITE-1630 is related

Take a look at StandardColumnMappings#timestampWriteFunctionUsingSqlTimestamp and StandardColumnMappings#timeColumnMappingUsingSqlTime. They use the java.sql types instead, and hence don't work correctly during DST gaps. So please also add a TODO in the code about https://issues.apache.org/jira/browse/CALCITE-1630
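
A minimal sketch of such a workaround read function, going through java.sql.Timestamp (the helpers are the StandardColumnMappings ones mentioned in this thread; treat the exact signatures as assumptions):

    import java.sql.Timestamp;

    import io.trino.plugin.jdbc.LongReadFunction;

    import static io.trino.plugin.jdbc.StandardColumnMappings.toTrinoTimestamp;
    import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;

    // Avatica's cursor cannot handle getObject(..., LocalDateTime.class), so read a
    // java.sql.Timestamp and convert.
    // TODO: revert to reading LocalDateTime directly once
    // https://issues.apache.org/jira/browse/CALCITE-1630 is fixed; java.sql.Timestamp
    // round-trips through the JVM time zone and misbehaves during DST gaps.
    LongReadFunction timestampReadFunction = (resultSet, columnIndex) -> {
        Timestamp timestamp = resultSet.getTimestamp(columnIndex);
        return toTrinoTimestamp(TIMESTAMP_MILLIS, timestamp.toLocalDateTime());
    };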

jerryleooo commented 3 years ago

Thanks @hashhar, timestampColumnMappingUsingSqlTimestampWithRounding works and I will update the code later.

Since Druid doesn't support createTable and addColumn, I guess we may not need a toWriteMapping?

hashhar commented 3 years ago

toWriteMapping is still used to generate the expressions and statements used when pushing down queries. It's not an "INSERT" mapping; it's about how to write Trino types as Druid types.
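
For illustration, a hedged sketch of what a Druid toWriteMapping could look like (the type coverage is an assumption; the write function is what binds pushed-down predicate values, such as the __time bounds, as JDBC parameters):

    import io.trino.plugin.jdbc.WriteMapping;
    import io.trino.spi.TrinoException;
    import io.trino.spi.connector.ConnectorSession;
    import io.trino.spi.type.TimestampType;
    import io.trino.spi.type.Type;

    import static io.trino.plugin.jdbc.StandardColumnMappings.bigintWriteFunction;
    import static io.trino.plugin.jdbc.StandardColumnMappings.timestampWriteFunctionUsingSqlTimestamp;
    import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
    import static io.trino.spi.type.BigintType.BIGINT;

    // Hypothetical override inside DruidJdbcClient
    @Override
    public WriteMapping toWriteMapping(ConnectorSession session, Type type)
    {
        if (type == BIGINT) {
            return WriteMapping.longMapping("bigint", bigintWriteFunction());
        }
        if (type instanceof TimestampType) {
            // java.sql.Timestamp avoids Avatica's LocalDateTime serialization issue
            return WriteMapping.longMapping("timestamp", timestampWriteFunctionUsingSqlTimestamp((TimestampType) type));
        }
        throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
    }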

findepi commented 3 years ago

The lack of pushdown is because Druid column mapping has never been implemented:

https://github.com/trinodb/trino/blob/825e6d82cfd3dee98038f14b38a55e0d1798ae35/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java#L152

hence we end up calling timestampColumnMappingUsingSqlTimestampWithRounding, which disables pushdown for correctness reasons (because of rounding on read).

We should implement proper type mapping for Druid. See other connectors extending BaseJdbcClient for reference, and see TestPostgreSqlTypeMapping for the test class to follow.

jerryleooo commented 3 years ago

Thanks @findepi, I am working on this as you suggested.

jerryleooo commented 3 years ago

The problem is that inside TestXXXTypeMapping we need to use DataTypeTest or SqlDataTypeTest, which rely on TestTable, whose constructor runs CREATE TABLE and INSERT statements -- this is not suitable for Druid.

I guess I may need to implement DruidDataTypeTest, DruidTestTable, and DruidCreateAndInsertDataSetup, but I am not sure whether I am going too far off track. Any ideas? @findepi @hashhar

jibinpt commented 3 years ago

            case Types.TIMESTAMP:
                // map Druid TIMESTAMP (e.g. __time) to Trino timestamp(3)
                TimestampType timestampType = createTimestampType(3);
                return Optional.of(timestampColumnMapping(timestampType));

@findepi Adding the above code for type mapping, similar to other JDBC clients, resulted in the following error:

trino:druid> explain analyze select __time from orders where orders.__time >= date '1995-01-01';

Query 20210713_192755_00001_7ib4c, FAILED, 1 node
Splits: 18 total, 0 done (0.00%)
3.07 [0 rows, 0B] [0 rows/s, 0B/s]

Query 20210713_192755_00001_7ib4c failed: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Java 8 date/time type `java.time.LocalDateTime` not supported by default: add Module "com.fasterxml.jackson.datatype:jackson-datatype-jsr310" to enable handling (through reference chain: org.apache.calcite.avatica.remote.Service$ExecuteRequest["parameterValues"]->java.util.Arrays$ArrayList[0]->org.apache.calcite.avatica.remote.TypedValue["value"])
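
The failure appears to come from parameter binding for the pushed-down predicate rather than from result reading. A hedged repro sketch (connection is assumed to be an open Avatica JDBC connection to the Druid broker; the exact Trino call site is an assumption, but the reference chain in the error points at Avatica's request serialization):

    import java.sql.PreparedStatement;
    import java.time.LocalDateTime;

    // Effectively what the pushdown write path does when binding the __time bound
    // (compare StandardColumnMappings.timestampWriteFunction):
    PreparedStatement statement = connection.prepareStatement(
            "SELECT \"__time\" FROM \"druid\".\"orders\" WHERE \"__time\" >= ?");
    statement.setObject(1, LocalDateTime.of(1995, 1, 1, 0, 0));
    // Avatica's remote driver wraps the parameter in a TypedValue inside
    // Service$ExecuteRequest and serializes it with Jackson, which rejects
    // java.time.LocalDateTime unless jackson-datatype-jsr310 is registered.
    // Binding a java.sql.Timestamp (timestampWriteFunctionUsingSqlTimestamp)
    // sidesteps this.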

@jerryleooo did you face a similar issue?

jerryleooo commented 3 years ago

@jibinpt You can see my latest commit.

findepi commented 3 years ago

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Java 8 date/time type `java.time.LocalDateTime` not supported by default: add Module "com.fasterxml.jackson.datatype:jackson-datatype-jsr310"

@jibinpt did you try to add that module?

jibinpt commented 3 years ago

@findepi The error message says (through reference chain: org.apache.calcite.avatica.remote.Service$ExecuteRequest["parameterValues"]->java.util.Arrays$ArrayList[0]->org.apache.calcite.avatica.remote.TypedValue["value"]). So won't this be happening on the Druid side, meaning that including this module in the Trino pom.xml won't help, right?

maudrid commented 2 years ago

This fix is sorely needed! I compiled the current state of this branch, but I had to make a number of cosmetic fixes to get it to work. I will fork the project and open a pull request for those.

maudrid commented 2 years ago

Created the pull request for @jerryleooo here: https://github.com/jerryleooo/trino/pull/2

Praveen2112 commented 2 years ago

https://github.com/trinodb/trino/pull/13335 fixes this issue.