trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)

A bunch of Trino Pinot queries are failing with io.grpc.StatusRuntimeException: UNKNOWN #19679

Open nizarhejazi opened 1 year ago

nizarhejazi commented 1 year ago

A bunch of Trino Pinot queries are failing with an io.grpc.StatusRuntimeException: UNKNOWN error, but with different stack traces. Sample stack trace:

io.grpc.StatusRuntimeException: UNKNOWN
    at io.grpc.Status.asRuntimeException(Status.java:535)
    at io.grpc.stub.ClientCalls$BlockingResponseStream.hasNext(ClientCalls.java:648)
    at io.trino.plugin.pinot.client.PinotGrpcDataFetcher$PinotGrpcServerQueryClient$ResponseIterator.computeNext(PinotGrpcDataFetcher.java:273)
    at io.trino.plugin.pinot.client.PinotGrpcDataFetcher$PinotGrpcServerQueryClient$ResponseIterator.computeNext(PinotGrpcDataFetcher.java:260)
    at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
    at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
    at io.trino.plugin.pinot.client.PinotGrpcDataFetcher.endOfData(PinotGrpcDataFetcher.java:84)
    at io.trino.plugin.pinot.PinotSegmentPageSource.getNextPage(PinotSegmentPageSource.java:120)
    at io.trino.operator.ScanFilterAndProjectOperator$ConnectorPageSourceToPages.process(ScanFilterAndProjectOperator.java:386)
    at io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:412)
    at io.trino.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:261)
    at io.trino.operator.WorkProcessorUtils$YieldingProcess.process(WorkProcessorUtils.java:181)
    ...
    at io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:412)
    at io.trino.operator.WorkProcessorSourceOperatorAdapter.getOutput(WorkProcessorSourceOperatorAdapter.java:145)
    at io.trino.operator.Driver.processInternal(Driver.java:395)
    at io.trino.operator.Driver.lambda$process$8(Driver.java:298)
    at io.trino.operator.Driver.tryWithLock(Driver.java:694)
    at io.trino.operator.Driver.process(Driver.java:290)
    at io.trino.operator.Driver.processForDuration(Driver.java:261)
    at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:887)
    at io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:187)
    at io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:555)
    at io.trino.$gen.Trino_423_e_3____20231108_194630_2.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

Here are some sample queries:

The same queries are completing successfully in Presto. Here is my Pinot catalog config for reference:

  pinotcluster: |-
    connector.name=pinot
    pinot.controller-urls=https://[SOME_URL]:443
    pinot.proxy.enabled=true
    pinot.grpc.enabled=true
    pinot.grpc.port=443
    pinot.grpc.proxy-uri=[SOME_OTHER_URL]:443
    pinot.grpc.use-plain-text=false
    pinot.max-rows-for-broker-queries=100000000
    pinot.non-aggregate-limit-for-broker-queries=50000
    pinot.segments-per-split=100
    pinot.prefer-broker-queries=true
    pinot.aggregation-pushdown.enabled=true
    pinot.count-distinct-pushdown.enabled=true

elonazoulay commented 1 year ago

My hypothesis is that the ORDER BY may be pushed down into the query, but gRPC in Pinot 0.12.1 does not support ORDER BY in the result set. Workaround: switch to a "dynamic table":

SELECT company, user_cache, currentJob FROM "pinotcluster"."default"."SELECT company, user_cache, currentJob FROM role_with_company00005" rwc order by id desc limit 49000

lmk if that works. I will take a look at the order by issue in the meantime and keep you updated.
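
For readers generating SQL programmatically, the dynamic table workaround can be sketched as a small helper. This is only an illustration: the function name `dynamic_table_query` and its parameters are hypothetical, and the quote doubling follows the standard SQL rule for quoted identifiers rather than anything confirmed in this thread about how the Pinot connector parses the embedded query.

```python
def dynamic_table_query(catalog: str, schema: str, pinot_query: str,
                        select_list: str = "*") -> str:
    """Wrap a Pinot-dialect query in a Trino quoted identifier ("dynamic table").

    Hypothetical helper for illustration; not part of Trino or Pinot.
    """
    # A double quote inside a quoted SQL identifier is escaped by doubling it.
    escaped = pinot_query.replace('"', '""')
    return f'SELECT {select_list} FROM "{catalog}"."{schema}"."{escaped}"'


# Rebuild a dynamic table query with the ORDER BY and LIMIT inside the Pinot query.
print(dynamic_table_query(
    "pinotcluster",
    "default",
    "SELECT id, company FROM role_with_company00005 order by id desc limit 49000",
    select_list="id, company",
))
```

The point of the shape is that the ORDER BY and LIMIT travel inside the quoted Pinot query, so they are evaluated by Pinot rather than by Trino.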

elonazoulay commented 1 year ago

What is the stack trace for the IN QUERY example above? Does it work with the dynamic table approach above?

nizarhejazi commented 1 year ago

Hey @elonazoulay, here is the stack trace for the IN query:

io.grpc.StatusRuntimeException: UNKNOWN
    at io.grpc.Status.asRuntimeException(Status.java:535)
    at io.grpc.stub.ClientCalls$BlockingResponseStream.hasNext(ClientCalls.java:648)
    at io.trino.plugin.pinot.client.PinotGrpcDataFetcher$PinotGrpcServerQueryClient$ResponseIterator.computeNext(PinotGrpcDataFetcher.java:273)
    at io.trino.plugin.pinot.client.PinotGrpcDataFetcher$PinotGrpcServerQueryClient$ResponseIterator.computeNext(PinotGrpcDataFetcher.java:260)
    at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
    at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
    at io.trino.plugin.pinot.client.PinotGrpcDataFetcher.endOfData(PinotGrpcDataFetcher.java:84)
    at io.trino.plugin.pinot.PinotSegmentPageSource.getNextPage(PinotSegmentPageSource.java:120)
    at io.trino.operator.TableScanOperator.getOutput(TableScanOperator.java:299)
    at io.trino.operator.Driver.processInternal(Driver.java:395)
    at io.trino.operator.Driver.lambda$process$8(Driver.java:298)
    at io.trino.operator.Driver.tryWithLock(Driver.java:694)
    at io.trino.operator.Driver.process(Driver.java:290)
    at io.trino.operator.Driver.processForDuration(Driver.java:261)
    at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:887)
    at io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:187)
    at io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:555)
    at io.trino.$gen.Trino_423_e_3____20231108_194631_2.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

Workaround: switch to "dynamic table".

Unfortunately, we cannot easily switch to "dynamic tables". It will take us a long time to move traffic from another federated engine to Trino and to change our translation layer, which generates Trino SQL on the fly.

nizarhejazi commented 1 year ago

Can you elaborate on why '0.12.1 grpc does not support order by in the result set'? Is this specific to the Trino Pinot connector?

elonazoulay commented 1 year ago

I first need to verify that order by is being pushed down. Can you share a schema.json and tablespec.json? You can redact the column names, just need to reproduce.

elonazoulay commented 1 year ago

How many ids are in the IN clause? What datatype is the id column? Is it a Pinot STRING datatype? Is it a single-value or multi-value column? Can you share just the schema snippet for that column?

nizarhejazi commented 1 year ago

@elonazoulay

Here is a minimal query: SELECT id, company FROM "pinotcluster1"."default"."role_with_company_object_history_record00011" WHERE id IN ('652...', '0a5...', 'b51...')

id and company are STRING datatypes.

Same for order by query that can be simplified to: SELECT id, company FROM "pinotcluster"."default"."role_with_company00005" rwc order by id desc limit 49000

elonazoulay commented 1 year ago

Just tried to repro: tried with IN clause and order by and could not repro. Could it be a server error on the pinot side? The order by may be pulling in all the rows from the server: can you check the pinot metrics to see # of rows processed, etc. (server metrics)? If you want to try different query shapes to verify this on your end, you can clone https://github.com/trinodb/trino/pull/17582 and use the PinotQueryRunner to test different query shapes. It uses Pinot 0.12.1 and has the tpch tables populated.

It does seem like the order by is causing the issue: pulling in all the rows. For that query, if you can switch to dynamic table it will greatly reduce the number of rows sent to trino, latency, resource usage on pinot servers, etc. Is it possible to evaluate this query on your end without changing the framework you mentioned?

SELECT id, company FROM "pinotcluster"."default"."SELECT id, company FROM role_with_company00005 order by id desc limit 49000";

elonazoulay commented 1 year ago

Idea for a long term solution: support topN pushdown in the pinot connector. Once Pinot 1.0.1 is released we can push it. I can start on that now so we're ready when the time comes (should be soon).
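
To make the topN pushdown idea concrete, here is a toy model of why it reduces the rows sent to Trino. It is a sketch under stated assumptions, not the connector's implementation: the segments are plain Python lists, and the function names are hypothetical. With pushdown, each segment returns at most n rows and the coordinator merges them; without it, every row is transferred before sorting.

```python
import heapq
from typing import List, Tuple


def top_n_desc_pushed_down(segments: List[List[int]], n: int) -> Tuple[List[int], int]:
    """Each segment returns only its own top n; the caller merges the partial results."""
    partials = [heapq.nlargest(n, segment) for segment in segments]
    transferred = sum(len(partial) for partial in partials)
    merged = heapq.nlargest(n, (row for partial in partials for row in partial))
    return merged, transferred


def top_n_desc_no_pushdown(segments: List[List[int]], n: int) -> Tuple[List[int], int]:
    """All rows are transferred first, then sorted and truncated by the caller."""
    all_rows = [row for segment in segments for row in segment]
    return sorted(all_rows, reverse=True)[:n], len(all_rows)


segments = [[5, 1, 9], [7, 3], [8, 2, 6, 4]]
print(top_n_desc_pushed_down(segments, 2))
print(top_n_desc_no_pushdown(segments, 2))
```

Both functions return the same top rows; only the number of rows transferred differs, which is the effect the ORDER BY ... LIMIT pushdown is meant to achieve.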

nizarhejazi commented 1 year ago

Hey @elonazoulay, thanks for taking a look. A few notes:

  1. I am using Starburst Enterprise (SEP) version 423.3.0 (Trino 423). I don't think SEP is the source of the issues here. Please note that all of these queries complete successfully when run from Presto.
  2. I am planning to upgrade to SEP 427 (Trino 427), and am also waiting for a newer SEP release.
  3. Adding LIMIT explicitly to the query with the IN clause allows the query to succeed. SEP fails with StatusRuntimeException if the number of returned rows exceeds the value defined in pinot.non-aggregate-limit-for-broker-queries, and this should be fixed.
  4. For ORDER BY, the problem is not data size. I added more filters, double-checked that there is a LIMIT, and re-ran the queries. Here is a sample of successful and failed queries:

Successful Query: SELECT company FROM "pinotcluster"."default"."role_with_company" rwc WHERE company = 'company_id' limit 1000;

Query Plan:

Trino version: 423-e.3
Queued: 160.82us, Analysis: 3.30ms, Planning: 28.10ms, Execution: 387.95ms
Fragment 0 [SINGLE]
    CPU: 1.46ms, Scheduled: 1.50ms, Blocked 5.45s (Input: 5.13s, Output: 0.00ns), Input: 1000 rows (28.32kB); per task: avg.: 1000.00 std.dev.: 0.00, Output: 1000 rows (28.32kB)
    Output layout: [company]
    Output partitioning: SINGLE []
    Output[columnNames = [company]]
    │   Layout: [company:varchar]
    │   CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 0.00ns (0.00%), Output: 1000 rows (28.32kB)
    │   Input avg.: 1000.00 rows, Input std.dev.: 0.00%
    └─ Limit[count = 1000]
       │   Layout: [company:varchar]
       │   CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 0.00ns (0.00%), Output: 1000 rows (28.32kB)
       │   Input avg.: 1000.00 rows, Input std.dev.: 0.00%
       └─ LocalExchange[partitioning = SINGLE]
          │   Layout: [company:varchar]
          │   CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 320.00ms (5.87%), Output: 1000 rows (28.32kB)
          │   Input avg.: 62.50 rows, Input std.dev.: 387.30%
          └─ RemoteSource[sourceFragmentIds = [1]]
                 Layout: [company:varchar]
                 CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 5.13s (94.13%), Output: 1000 rows (28.32kB)
                 Input avg.: 62.50 rows, Input std.dev.: 387.30%

Fragment 1 [SOURCE]
    CPU: 125.79ms, Scheduled: 318.92ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 1000 rows (28.32kB); per task: avg.: 1000.00 std.dev.: 0.00, Output: 1000 rows (28.32kB)
    Output layout: [company]
    Output partitioning: SINGLE []
    LimitPartial[count = 1000]
    │   Layout: [company:varchar]
    │   CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Blocked: 0.00ns (0.00%), Output: 1000 rows (28.32kB)
    │   Input avg.: 1000.00 rows, Input std.dev.: 0.00%
    └─ TableScan[table = pinotcluster:PinotTableHandle{schemaName=default, tableName=role_with_company, constraint={PinotColumnHandle{columnName=company, dataType=varchar, expression=company, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}=[ SortedRangeSet[type=varchar, ranges=1, {[company_id]}] ]}, limit=OptionalLong[1000], query=Optional.empty}]
           Layout: [company:varchar]
           CPU: 125.00ms (100.00%), Scheduled: 319.00ms (100.00%), Blocked: 0.00ns (0.00%), Output: 1000 rows (28.32kB)
           Input avg.: 1000.00 rows, Input std.dev.: 0.00%
           company := PinotColumnHandle{columnName=company, dataType=varchar, expression=company, aliased=false, aggregate=false, returnNullOnEmptyGroup=true, pushedDownAggregateFunctionName=Optional.empty, pushedDownAggregateFunctionArgument=Optional.empty}
           Input: 1000 rows (28.32kB), Physical input: 28.32kB, Physical input time: 313430000.00ns

Changing the query to the following results in io.grpc.StatusRuntimeException: UNKNOWN:

SELECT company 
FROM "pinotcluster"."default"."role_with_company" rwc 
WHERE company = 'company_id' order by id desc limit 1000;

  1. Simple join queries (with LIMIT enforced and a filtering condition) are failing with StatusRuntimeException. Sample query: SELECT rwc.company FROM role_with_company rwc LEFT JOIN job j ON rwc.currentJob = j.id WHERE rwc.company = 'company_id' LIMIT 1000;

  2. I shared the Pinot connector configuration above.

  3. Re: dynamic tables: thanks for the suggestion. Unfortunately, this does not work for us.

    • I don’t think we should expect Trino users to write most of their queries in a SQL dialect that differs in so many ways from Trino ANSI SQL. This in fact decreases the value that a federated query engine brings to the table.
    • Our queries are generated on the fly, they are function-rich with many joins, and we have a complex translation and optimization layer behind the scenes.
    • The current translation layer generates Presto/Trino ANSI SQL and uses syntax (functions, operators, etc.) that is exclusive to Presto/Trino (e.g. lambda expressions, array functions and operators, etc.)
    • We employ many optimization tricks that are specific to Presto/Trino (including correlated query de-correlation and dynamic filtering).
    • This is to say that using “dynamic tables” would require a huge refactor on our side and does not address the problem of long-tail queries with edge cases.
    • Moreover, if the goal were to run Pinot-only queries using the Pinot dialect of SQL to maximize performance, we would run queries against Pinot directly. We have federated queries across Pinot and Snowflake and will soon add more connectors to the mix (DocumentDB/MongoDB, PostgreSQL), so we want to stick to Trino syntax for any federated query.
  4. Isn't topN pushdown already supported by the Presto Pinot connector? Why do we need to wait for the 1.0.1 release? Can we get it done (along with other critical pushdowns) in a shorter timeframe?
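
The observation above about explicit LIMIT and pinot.non-aggregate-limit-for-broker-queries suggests a simple pre-flight check in a translation layer. The sketch below assumes that observation holds (rows beyond the configured value lead to a failure); the helper names are hypothetical, and the property values are copied from the catalog config shared earlier in this thread.

```python
from typing import Dict

# Subset of the catalog properties shared earlier in the thread.
CATALOG_PROPERTIES = """
connector.name=pinot
pinot.grpc.enabled=true
pinot.non-aggregate-limit-for-broker-queries=50000
pinot.prefer-broker-queries=true
"""

LIMIT_KEY = "pinot.non-aggregate-limit-for-broker-queries"


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blanks and '#' comments."""
    props: Dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def exceeds_non_aggregate_limit(props: Dict[str, str], expected_rows: int) -> bool:
    """True if the expected row count is above the configured broker limit."""
    return expected_rows > int(props[LIMIT_KEY])


props = parse_properties(CATALOG_PROPERTIES)
for rows in (49000, 60000):
    print(rows, exceeds_non_aggregate_limit(props, rows))
```

A generator of Trino SQL could use a check like this to decide when to add an explicit LIMIT, though whether that is the right long-term fix is exactly what this issue is asking.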

nizarhejazi commented 1 year ago

Here are some of the pushdowns that I think are missing from the Trino Pinot connector:

nizarhejazi commented 12 months ago

@elonazoulay Many of our queries served over the gRPC protocol are failing (I am still verifying whether all gRPC-based queries are failing). The error returned by Trino is cryptic and is most likely the result of failed communication between Trino and Pinot’s gRPC server.

Pinot’s gRPC server might be internal and not resolvable externally. In other words, even if the SEP deployment is in the same VPC but in a different k8s cluster, Trino might not be able to communicate with Pinot using gRPC. The failure is most likely happening in responseIterator.next() in the getNextDataTable() method.

I will try to address this to ensure Trino can communicate successfully with Pinot’s gRPC server in addition to the HTTP server. Once I verify that SEP is communicating with the Pinot gRPC server, there might or might not be other gRPC-related issues.
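
A first step in verifying the connectivity hypothesis above is a plain TCP reachability check, run from a host or pod in the same network as the Trino workers. The sketch below is deliberately minimal and the function name `can_reach` is hypothetical. It only shows that the host resolves and the port accepts connections; it does not prove that TLS or the gRPC protocol itself works.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within the timeout."""
    try:
        # create_connection resolves the host name and connects in one step.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS resolution failures, refused connections and timeouts.
        return False
```

For example, calling `can_reach` with the host and port configured in pinot.grpc.proxy-uri (or a Pinot server's gRPC endpoint) distinguishes "name does not resolve or port is blocked" from failures that happen later in the gRPC exchange.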

elonazoulay commented 12 months ago

Thanks for the context @nizarhejazi ! For joins - do you use the Pinot v2 engine or still v1? lmk if I can help with the grpc stuff as well.

nizarhejazi commented 12 months ago

Hey @elonazoulay, for now we use Trino for joins (Pinot v1), but we have started benchmarking non-federated queries against the Pinot v2 engine directly. We will be supporting both modes, one for federated and one for non-federated queries.