datafusion-contrib / datafusion-federation

Allow DataFusion to resolve queries across remote query engines while pushing down as much compute as possible.
Apache License 2.0
77 stars, 18 forks

Preserve record batch order when SchemaCastScanExec is involved #70

Closed: phillipleblanc closed this 1 month ago

phillipleblanc commented 1 month ago

🗣 Description

The EnforceDistribution PhysicalOptimizerRule aggressively splits inputs so they can be processed in parallel, which leads to reordering of record batches that contain only a few records.

Setting benefits_from_input_partitioning to false for SchemaCastScanExec enforces a 1:1 relationship between SchemaCastScanExec and its input, which prevents the reordering. This does not block partitioning or parallel processing in general; it only ensures a 1:1 mapping between SchemaCastScanExec and the input it wraps.
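The effect of that flag can be illustrated with a toy model. In DataFusion the knob is the ExecutionPlan method `benefits_from_input_partitioning(&self) -> Vec<bool>` (one flag per child), but the sketch below does not use DataFusion at all: the `Node` enum, `enforce_distribution`, and the single rule it applies are hypothetical simplifications of EnforceDistribution, which weighs more conditions and configuration than shown here.

```rust
/// Toy plan tree; variants loosely mirror the operators in this PR.
/// Hypothetical model, NOT DataFusion's ExecutionPlan API.
#[derive(Debug, Clone, PartialEq)]
enum Node {
    /// Leaf producing a single partition (stands in for VirtualExecutionPlan).
    Source,
    /// Round-robin repartition (stands in for RepartitionExec).
    RoundRobin { partitions: usize, input: Box<Node> },
    /// Wrapping operator (stands in for SchemaCastScanExec).
    Wrapper { benefits_from_input_partitioning: bool, input: Box<Node> },
}

impl Node {
    /// Number of output partitions this node produces.
    fn output_partitions(&self) -> usize {
        match self {
            Node::Source => 1,
            Node::RoundRobin { partitions, .. } => *partitions,
            Node::Wrapper { input, .. } => input.output_partitions(),
        }
    }
}

/// Simplified EnforceDistribution-style pass (assumption): if a wrapper says it
/// benefits from input partitioning and its input has fewer partitions than
/// the target, insert a round-robin repartition beneath it.
fn enforce_distribution(node: Node, target_partitions: usize) -> Node {
    match node {
        Node::Wrapper { benefits_from_input_partitioning, input } => {
            // Rewrite the child first (bottom-up).
            let mut input = enforce_distribution(*input, target_partitions);
            if benefits_from_input_partitioning && input.output_partitions() < target_partitions {
                input = Node::RoundRobin { partitions: target_partitions, input: Box::new(input) };
            }
            // Re-create the wrapper around its (possibly new) input.
            Node::Wrapper { benefits_from_input_partitioning, input: Box::new(input) }
        }
        Node::RoundRobin { partitions, input } => Node::RoundRobin {
            partitions,
            input: Box::new(enforce_distribution(*input, target_partitions)),
        },
        Node::Source => Node::Source,
    }
}

fn main() {
    for benefits in [true, false] {
        let plan = Node::Wrapper {
            benefits_from_input_partitioning: benefits,
            input: Box::new(Node::Source),
        };
        let optimized = enforce_distribution(plan, 4);
        println!(
            "benefits_from_input_partitioning={benefits}: {optimized:?} ({} partition(s))",
            optimized.output_partitions()
        );
    }
}
```

With the flag set to false, the wrapper stays on its single input partition, which is the 1:1 mapping described above; in a real plan, other operators remain free to be repartitioned.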

Example plan: RepartitionExec is automatically inserted by the optimizer, so the order of the resulting record batches varies as the query's records are processed in parallel.

+---------------+----------------------------------------------------------------
| plan_type     | plan
+---------------+----------------------------------------------------------------
| logical_plan  | BytesProcessedNode
|               |   Federated
|               |  Projection: CAST(sum(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly
|               |   Aggregate: groupBy=[[]], aggr=[[sum(lineitem.l_extendedprice)]]
|               |     Filter: part.p_partkey = lineitem.l_partkey AND part.p_brand = Utf8("Brand#23") AND part.p_container = Utf8("MED BOX") AND CAST(lineitem.l_quantity AS Decimal128(30, 15)) < (<subquery>)
|               |       Subquery:
|               |         Projection: CAST(Float64(0.2) * CAST(avg(lineitem.l_quantity) AS Float64) AS Decimal128(30, 15))
|               |           Aggregate: groupBy=[[]], aggr=[[avg(lineitem.l_quantity)]]
|               |             Filter: lineitem.l_partkey = outer_ref(part.p_partkey)
|               |               TableScan: lineitem
|               |       CrossJoin:
|               |         TableScan: lineitem
|               |         TableScan: part
| physical_plan | BytesProcessedExec
|               |   SchemaCastScanExec
|               |     RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
|               |       VirtualExecutionPlan name=duckdb compute_context=/Users/sg/spice/spiceai/crates/runtime/.spice/data/accelerated_duckdb.db sql=SELECT (CAST(sum(lineitem.l_extendedprice) AS DOUBLE) / CAST(7 AS DOUBLE)) AS avg_yearly FROM lineitem JOIN "part" ON true WHERE (((("part".p_partkey = lineitem.l_partkey) AND ("part".p_brand = 'Brand#23')) AND ("part".p_container = 'MED BOX')) AND (CAST(lineitem.l_quantity AS DECIMAL(30,15)) < (SELECT CAST((CAST(0.2 AS DOUBLE) * CAST(avg(lineitem.l_quantity) AS DOUBLE)) AS DECIMAL(30,15)) FROM lineitem WHERE (lineitem.l_partkey = "part".p_partkey)))) rewritten_sql=SELECT (CAST(sum(lineitem.l_extendedprice) AS DOUBLE) / CAST(7 AS DOUBLE)) AS avg_yearly FROM lineitem JOIN "part" ON true WHERE (((("part".p_partkey = lineitem.l_partkey) AND ("part".p_brand = 'Brand#23')) AND ("part".p_container = 'MED BOX')) AND (CAST(lineitem.l_quantity AS DECIMAL(30,15)) < (SELECT CAST((CAST(0.2 AS DOUBLE) * CAST(avg(lineitem.l_quantity) AS DOUBLE)) AS DECIMAL(30,15)) FROM lineitem WHERE (lineitem.l_partkey = "part".p_partkey))))
|               |
+---------------+----------------------------------------------------------------
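A minimal sketch of why RoundRobinBatch(4) over a single input partition can scramble batch order, assuming batches are numbered in the order the remote engine returns them. `round_robin` and `collect_in_completion_order` are hypothetical helpers, not DataFusion code; real partitions run concurrently and may interleave batch by batch, so this fixes one possible schedule to keep the result deterministic.

```rust
/// Distribute batch ids across `n` output partitions in round-robin fashion
/// (a simplified model of RepartitionExec with RoundRobinBatch(n)).
fn round_robin(batches: &[u32], n: usize) -> Vec<Vec<u32>> {
    let mut partitions = vec![Vec::new(); n];
    for (i, &batch) in batches.iter().enumerate() {
        partitions[i % n].push(batch);
    }
    partitions
}

/// Concatenate partitions in the order they happen to finish. Each partition
/// keeps its internal order, but nothing orders batches across partitions.
fn collect_in_completion_order(partitions: &[Vec<u32>], finish_order: &[usize]) -> Vec<u32> {
    finish_order
        .iter()
        .flat_map(|&p| partitions[p].iter().copied())
        .collect()
}

fn main() {
    let batches: Vec<u32> = (0..8).collect();

    // Four partitions: the result depends on which partition finishes first.
    let parts = round_robin(&batches, 4);
    let reordered = collect_in_completion_order(&parts, &[2, 0, 3, 1]);
    println!("4 partitions: {reordered:?}");

    // A single partition (1:1 mapping): the input order is preserved.
    let single = round_robin(&batches, 1);
    let preserved = collect_in_completion_order(&single, &[0]);
    println!("1 partition:  {preserved:?}");
}
```

With the finish order [2, 0, 3, 1], the four-partition run yields [2, 6, 0, 4, 3, 7, 1, 5], while the single-partition run returns the batches unchanged. The particular permutation is arbitrary; the point is that only the 1:1 case is order-stable for every schedule.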

🤔 Other approaches considered

  1. Specifying an ordering in output_partitioning for VirtualExecutionPlan to indicate that its output is ordered: same behavior.
  2. Using CoalescePartitionsExec: it does not guarantee ordering.
  3. Trying [prefer_existing_sort](https://docs.rs/datafusion/latest/datafusion/config/struct.OptimizerOptions.html#structfield.prefer_existing_sort) and several other DataFusion configuration options: same behavior.
  4. Verifying that all execution plans involved in the query correctly specify a single output partition: the EnforceDistribution optimizer still rewrites this before execution, injecting a RepartitionExec and re-initializing SchemaCastScanExec around the new input.