apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Adding Fetch Support to CoalesceBatchesExec #9792

Open berkaysynnada opened 3 months ago

berkaysynnada commented 3 months ago

Is your feature request related to a problem or challenge?

EXPLAIN SELECT c1, c2, c3 FROM sink_table WHERE c3 > 0 LIMIT 5;
----
logical_plan
Limit: skip=0, fetch=5
--Filter: sink_table.c3 > Int16(0)
----TableScan: sink_table projection=[c1, c2, c3]
physical_plan
GlobalLimitExec: skip=0, fetch=5
--CoalescePartitionsExec
----CoalesceBatchesExec: target_batch_size=8192
------FilterExec: c3@2 > 0
--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
----------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true

The example query in repartition.slt waits until the target_batch_size of CoalesceBatchesExec is filled, which delays the query result. We could push the limit down into CoalesceBatchesExec here.

Describe the solution you'd like

There is a similar rule in logical planning. We could add a physical optimizer rule that pushes the limit count down until it meets a limit-breaking operator (joins, windows, sorts). If the limit reaches a CoalesceBatchesExec before that, the rule can set a new requested batch size on it.
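
To make the idea concrete, here is a minimal, standard-library-only sketch of a coalescing buffer that flushes early once a fetch count is reached. It is a toy model, not DataFusion's CoalesceBatchesExec: the `Coalescer` type, its `push` method, and the use of `Vec<i32>` as a stand-in for a record batch are all assumptions made for illustration.

```rust
/// Toy stand-in for a coalescing operator (hypothetical, not the DataFusion API).
/// It buffers small input batches and emits one larger batch once
/// `target_batch_size` rows are buffered. With `fetch` set, it also flushes
/// and finishes as soon as `fetch` rows in total have been accepted.
struct Coalescer {
    target_batch_size: usize,
    fetch: Option<usize>,
    buffer: Vec<i32>,
    rows_seen: usize,
    done: bool,
}

impl Coalescer {
    fn new(target_batch_size: usize, fetch: Option<usize>) -> Self {
        Self { target_batch_size, fetch, buffer: Vec::new(), rows_seen: 0, done: false }
    }

    /// Push one input batch; returns an output batch if one is ready.
    fn push(&mut self, batch: &[i32]) -> Option<Vec<i32>> {
        if self.done {
            return None;
        }
        // Never accept more rows than the remaining fetch budget.
        let remaining = self.fetch.map_or(usize::MAX, |f| f - self.rows_seen);
        let take = batch.len().min(remaining);
        self.buffer.extend_from_slice(&batch[..take]);
        self.rows_seen += take;

        let limit_reached = self.fetch.map_or(false, |f| self.rows_seen >= f);
        if limit_reached {
            self.done = true;
        }
        let should_flush = limit_reached || self.buffer.len() >= self.target_batch_size;
        if should_flush && !self.buffer.is_empty() {
            Some(std::mem::take(&mut self.buffer))
        } else {
            None
        }
    }
}
```

With `Coalescer::new(8192, None)`, two three-row batches produce no output because the buffer is far from full. With `Coalescer::new(8192, Some(5))`, the second push returns the first five rows immediately, which is the behavior a LIMIT 5 query over an infinite source needs.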

Describe alternatives you've considered

No response

Additional context

No response

Lordworms commented 3 months ago

I can do this one

alamb commented 3 months ago

Cross posting from https://github.com/apache/arrow-datafusion/pull/9815 as I am not sure about this proposal

Adding a limit to CoalesceBatches seems like somewhat of a workaround for a limit in StreamingTableExec. If we handled the limit in StreamingTableExec, then:

  1. It could be more efficient, as the StreamingTableExec could stop as soon as the limit was hit
  2. We would not need any changes to CoalesceBatches

EXPLAIN SELECT c1, c2, c3 FROM sink_table WHERE c3 > 0 LIMIT 5;
----
logical_plan
Limit: skip=0, fetch=5
--Filter: sink_table.c3 > Int16(0)
----TableScan: sink_table projection=[c1, c2, c3]      <--- Why not apply the LIMIT here?
physical_plan
GlobalLimitExec: skip=0, fetch=5
--CoalescePartitionsExec
----CoalesceBatchesExec: target_batch_size=8192
------FilterExec: c3@2 > 0
--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
----------StreamingTableExec: partition_sizes=1,      <---if the LIMIT was applied here there would be no need to apply it later?

berkaysynnada commented 3 months ago

Cross posting from #9815 as I am not sure about this proposal

Adding a limit to CoalesceBatches seems like somewhat of a workaround for a limit in StreamingTableExec. If we handled the limit in StreamingTableExec, then:

  1. It could be more efficient, as the StreamingTableExec could stop as soon as the limit was hit
  2. We would not need any changes to CoalesceBatches

We can of course tell StreamingTableExec how many rows it needs to read for a Limit query. That improves the situation, but CoalesceBatches may still need a fetch value. The CoalesceBatches fetch count is directly tied to the Limit operator above it, whereas StreamingTableExec may have to read more rows than that fetch count. Am I missing something?

Limit: fetch=5
--CoalesceBatches: target_size=1000
----Aggregate: to produce 5 rows, needs 500 rows
------StreamingTableExec

Assuming the plan above, I think CoalesceBatches must know the fetch count.

alamb commented 3 months ago

What I was suggesting is that

Limit: fetch=5
--CoalesceBatches: target_size=1000
----Aggregate: to produce 5 rows, needs 500 rows
------StreamingTableExec fetch=5      <--- if there was a fetch=5 here, there is no need to limit in CoalesceBatches
                                           as it would see end of stream after `StreamingTableExec` is done

berkaysynnada commented 3 months ago

<--- if there was a fetch=5 here, there is no need to limit in CoalesceBatches as it would see end of stream after StreamingTableExec is done

Aggregate would need 500 rows to produce 5 rows, but we don't know that until the 501st row reaches the Aggregate. So we cannot limit the StreamingTableExec here during planning, but we can limit the CoalesceBatches.
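
The 501st-row point can be illustrated with a small standard-library-only sketch. It assumes a streaming aggregation over input sorted by key, where a group is known to be complete only when a row with a different key arrives. The function name `count_groups_with_fetch` and the data layout are hypothetical and are not taken from DataFusion.

```rust
/// Toy streaming aggregation (hypothetical, not the DataFusion API).
/// `keys` is assumed to be sorted by group key. Counts rows per group and
/// stops once `fetch` complete groups have been emitted.
/// Returns (emitted (key, row_count) pairs, number of input rows consumed).
fn count_groups_with_fetch(keys: &[u32], fetch: usize) -> (Vec<(u32, usize)>, usize) {
    let mut out: Vec<(u32, usize)> = Vec::new();
    let mut current: Option<(u32, usize)> = None;
    let mut consumed = 0;

    for &key in keys {
        if out.len() >= fetch {
            break; // enough complete groups, stop pulling input
        }
        consumed += 1;
        current = match current.take() {
            // Same key: the open group grows.
            Some((k, n)) if k == key => Some((k, n + 1)),
            // Key changed: only now is the previous group known to be complete.
            Some(group) => {
                out.push(group);
                Some((key, 1))
            }
            None => Some((key, 1)),
        };
    }
    // End of input closes the last open group, if there is still room.
    if out.len() < fetch {
        if let Some(group) = current {
            out.push(group);
        }
    }
    (out, consumed)
}
```

For input with 100 rows per key, asking for 5 groups consumes 501 rows, and that number depends entirely on the data. Capping the source at 5 rows instead would yield a single group with a count of 5, which is why the fetch cannot simply be copied down to the scan at planning time in this sketch.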

Lordworms commented 3 months ago

Yeah, while implementing this I planned to pass fetch directly to StreamingTableExec, but found that the number has to be passed via CoalesceBatchesExec.

alamb commented 3 months ago

I see -- thank you @berkaysynnada https://github.com/apache/arrow-datafusion/issues/9792#issuecomment-2037425563 makes sense

Something still feels a little off about limiting in CoalesceBatches, as it seems it would always be better to do the fetch below that ExecutionPlan.

For example, in this plan it seems like it would be best to have the Aggregate stop after 5 rows:

Limit: fetch=5
--CoalesceBatches: target_size=1000
----Aggregate: to produce 5 rows, needs 500 rows <--- should stop after it has created 5 rows. 
------StreamingTableExec

It looks like there is already something similar: https://github.com/apache/arrow-datafusion/blob/63888e853b7b094f2f47f53192a94f38327f5f5a/datafusion/physical-plan/src/aggregates/row_hash.rs#L272-L276

Lordworms commented 3 months ago

so what should be a better design instead of passing fetch via CoalesceBatchesExec?

Dandandan commented 3 months ago

so what should be a better design instead of passing fetch via CoalesceBatchesExec?

Even better would be if every operator accepted fetch, like @alamb suggests for Aggregate.

I wonder whether, for the purpose of this ticket, we could also put the limit below CoalesceBatchesExec instead of above it.

CoalesceBatches: target_size=1000
--Limit fetch=5
----Aggregate:
------StreamingTableExec

berkaysynnada commented 2 months ago

so what should be a better design instead of passing fetch via CoalesceBatchesExec?

Even better would be if every operator accepted fetch, like @alamb suggests for Aggregate.

If every operator accepts fetch, I guess there will be no need for LimitExecs in the final plan. That may make plans more complicated. Only a few operators should be affected by an internal fetching mechanism, so adding support to just those could be more straightforward.

alamb commented 2 months ago

If every operator accepts fetch, I guess there will be no need for LimitExecs in the final plan. That may make plans more complicated. Only a few operators should be affected by an internal fetching mechanism, so adding support to just those could be more straightforward.

well said: I think this is exactly the tradeoff