apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0
13.43k stars 3.69k forks source link

Fix RuntimeException during ordered scan MSQ query #16661

Open nozjkoitop opened 3 months ago

nozjkoitop commented 3 months ago

Resolved a runtime exception triggered by executing limited and ordered scan queries via the MSQ engine.

image

java.lang.RuntimeException: org.apache.druid.java.util.common.IOE: Encountered error while reading the output of stage [1], partition [1] for worker [0]
    at org.apache.druid.msq.exec.ControllerImpl.lambda$getFinalResultsYielder$18(ControllerImpl.java:1729)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
    at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at org.apache.druid.msq.exec.ControllerImpl.getFinalResultsYielder(ControllerImpl.java:1732)
    at org.apache.druid.msq.exec.ControllerImpl.runTask(ControllerImpl.java:437)
    at org.apache.druid.msq.exec.ControllerImpl.run(ControllerImpl.java:372)
    at org.apache.druid.msq.indexing.MSQControllerTask.runTask(MSQControllerTask.java:258)
    at org.apache.druid.indexing.common.task.AbstractTask.run(AbstractTask.java:179)
    at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:478)
    at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:450)
    at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
    at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:75)
    at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.druid.java.util.common.IOE: Encountered error while reading the output of stage [1], partition [1] for worker [0]
    at org.apache.druid.msq.shuffle.input.DurableStorageInputChannelFactory.openChannel(DurableStorageInputChannelFactory.java:132)
    at org.apache.druid.msq.indexing.InputChannelsImpl.openChannels(InputChannelsImpl.java:188)
    at org.apache.druid.msq.indexing.InputChannelsImpl.openUnsorted(InputChannelsImpl.java:158)
    at org.apache.druid.msq.indexing.InputChannelsImpl.openChannel(InputChannelsImpl.java:100)
    at org.apache.druid.msq.exec.ControllerImpl.lambda$getFinalResultsYielder$18(ControllerImpl.java:1720)
    ... 21 more
Caused by: org.apache.druid.java.util.common.ISE: Could not find remote outputs of stage [1] partition [1] for worker [0] at the path [query-results/controller_query-42c2151c-5655-4fc0-b7da-e5e30551f00f/stage_1/worker_0/taskId_query-42c2151c-5655-4fc0-b7da-e5e30551f00f-worker0_0/part_1]
    at org.apache.druid.msq.shuffle.input.DurableStorageInputChannelFactory.openChannel(DurableStorageInputChannelFactory.java:113)
    ... 25 more

The main problem was that the system expected multiple partitions to be made, but in reality, only one partition was created by the OffsetLimitFrameProcessorFactory. During handling process caused by ShuffleSpec == null, expected partitions were counted based on workerInputs, which were created based on slicer from the stageTracker of previous stage.

When using MixShuffleSpec at the limit stage, it results in creating just one partition for the stage's outcomes, which seems to be the behavior we expect.

This PR has:

LakshSingla commented 3 months ago

Thanks a lot for the contribution 🎉 Curious if the change in the PR solves the bug. Also, is the result correct (i.e. there are 500 rows ingested after the fix)? I think that the patch still wouldn't be sufficient because the output partitions are 2444, and the input to the limit stage must only be a single partition. Can you please add a test case as well for the same?

Funny enough, there's also https://github.com/apache/druid/pull/16643 which solves a very closely related bug, but not quite the same. I am curious if that inadvertently resolves this one as well.

nozjkoitop commented 3 months ago

Thanks a lot for the contribution 🎉 Curious if the change in the PR solves the bug. Also, is the result correct (i.e. there are 500 rows ingested after the fix)? I think that the patch still wouldn't be sufficient because the output partitions are 2444, and the input to the limit stage must only be a single partition. Can you please add a test case as well for the same?

Funny enough, there's also #16643 which solves a very closely related bug, but not quite the same. I am curious if that inadvertently resolves this one as well.

It's fixes a bug, although I'm not sure why input to the limit stage should be a single partition if it merges it anyway, thanks for pointing out another PR I'm not sure how it'll gonna work, need to test that, as we'll be writing results to a multiple partitions but at the end of the day everything will be written to one file in the durable storage.

LakshSingla commented 3 months ago

multiple partitions but at the end of the day all will be written to one file in the durable storage

That shouldn't happen - output of a query like the following in durable storage should produce multiple partitions (which it isn't doing so at the moment)

SELECT * FROM foo LIMIT 1000000
nozjkoitop commented 3 months ago

That shouldn't happen - output of a query like the following in durable storage should produce multiple partitions (which it isn't doing so at the moment)

SELECT * FROM foo LIMIT 1000000

I was thinking about that, OffsetLimitFrameProcessorFactory should be updated in this case, now the output channel is hardcoded

LakshSingla commented 2 months ago

@nozjkoitop There was a more comprehensive change for similar issues with all the queries containing a LIMIT clause. It was merged recently https://github.com/apache/druid/pull/16643 and it should potentially fix the problem that you are seeing. Can you please verify if that patch fixes the issue?

nozjkoitop commented 2 months ago

Can you please verify if that patch fixes the issue?

Hi @LakshSingla, thanks for the suggestion, it actually does fix the problem, although WDYT about the scan stage which always results to 1 partition if it hasLimitOrOffset? It could have pretty big output partition

github-actions[bot] commented 2 weeks ago

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.