apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Internal error: Invalid HashJoinExec partition count mismatch 1!=2 when constructing merge plan with 1 CPU #9928

Closed: echai58 closed this issue 3 months ago

echai58 commented 3 months ago

Describe the bug

In delta-rs, when constructing a merge plan via DataFusion, we've seen the following error when running in an environment with only 1 CPU (https://github.com/delta-io/delta-rs/issues/2188):

    Internal error: Invalid HashJoinExec partition count mismatch 1!=2

After increasing to 2 CPUs, the issue is resolved.

Why is this the case?

To Reproduce

No response

Expected behavior

No response

Additional context

No response

alamb commented 3 months ago

Thanks for the report @echai58 -- this certainly looks like a bug in DataFusion

I think the next step for this issue would be to create a self-contained (DataFusion-only) reproducer.

mustafasrepo commented 3 months ago

@echai58 can you post the PhysicalPlan for the failing test, if possible? It might help to produce a DataFusion-only reproducer.

echai58 commented 3 months ago

@alamb @mustafasrepo here's the physical plan output when running with RUST_LOG=DEBUG. This is a delta table with columns ticker, valuation_date, value, _month, merging on source.ticker = target.ticker (I simplified the merge predicate to keep the plan readable).

[2024-04-05T18:00:00Z DEBUG datafusion::physical_planner] Input physical plan:
    ProjectionExec: expr=[__delta_rs_c_ticker@0 as ticker, __delta_rs_c_valuation_date@1 as valuation_date, __delta_rs_c_value@2 as value, __delta_rs_c__month@3 as _month]
      FilterExec: __delta_rs_delete@4 IS NOT DISTINCT FROM false
        ProjectionExec: expr=[__delta_rs_c_ticker@12 as __delta_rs_c_ticker, __delta_rs_c_valuation_date@13 as __delta_rs_c_valuation_date, __delta_rs_c_value@14 as __delta_rs_c_value, __delta_rs_c__month@15 as __delta_rs_c__month, __delta_rs_delete@16 as __delta_rs_delete]
          MetricObserverExec id=merge_output_count
            MergeBarrier
              ProjectionExec: expr=[ticker@0 as ticker, valuation_date@1 as valuation_date, value@2 as value, _month@3 as _month, __delta_rs_source@4 as __delta_rs_source, ticker@5 as ticker, valuation_date@6 as valuation_date, value@7 as value, _month@8 as _month, __delta_rs_path@9 as __delta_rs_path, __delta_rs_target@10 as __delta_rs_target, __delta_rs_operation@11 as __delta_rs_operation, CASE __delta_rs_operation@11 WHEN 0 THEN ticker@0 WHEN 1 THEN ticker@0 WHEN 2 THEN CAST(ticker@5 AS LargeUtf8) WHEN 3 THEN CAST(ticker@5 AS LargeUtf8) WHEN 4 THEN CAST(ticker@5 AS LargeUtf8) END as __delta_rs_c_ticker, CASE __delta_rs_operation@11 WHEN 0 THEN valuation_date@1 WHEN 1 THEN valuation_date@1 WHEN 2 THEN valuation_date@6 WHEN 3 THEN valuation_date@6 WHEN 4 THEN valuation_date@6 END as __delta_rs_c_valuation_date, CASE __delta_rs_operation@11 WHEN 0 THEN value@2 WHEN 1 THEN value@2 WHEN 2 THEN value@7 WHEN 3 THEN value@7 WHEN 4 THEN value@7 END as __delta_rs_c_value, CASE __delta_rs_operation@11 WHEN 0 THEN _month@3 WHEN 1 THEN _month@3 WHEN 2 THEN CAST(_month@8 AS LargeUtf8) WHEN 3 THEN CAST(_month@8 AS LargeUtf8) WHEN 4 THEN CAST(_month@8 AS LargeUtf8) END as __delta_rs_c__month, CASE __delta_rs_operation@11 WHEN 0 THEN false WHEN 1 THEN false WHEN 2 THEN false WHEN 3 THEN true WHEN 4 THEN false ELSE false END as __delta_rs_delete, CASE __delta_rs_operation@11 WHEN 0 THEN false WHEN 1 THEN NULL WHEN 2 THEN false WHEN 3 THEN false WHEN 4 THEN false ELSE false END as __delta_rs_target_insert, CASE __delta_rs_operation@11 WHEN 0 THEN NULL WHEN 1 THEN false WHEN 2 THEN false WHEN 3 THEN false WHEN 4 THEN false ELSE false END as __delta_rs_target_update, CASE __delta_rs_operation@11 WHEN 0 THEN false WHEN 1 THEN false WHEN 2 THEN false WHEN 3 THEN false WHEN 4 THEN false ELSE false END as __delta_rs_target_delete, CASE __delta_rs_operation@11 WHEN 0 THEN false WHEN 1 THEN false WHEN 2 THEN NULL WHEN 3 THEN false WHEN 4 THEN NULL ELSE false END as __delta_rs_target_copy]
                ProjectionExec: expr=[ticker@0 as ticker, valuation_date@1 as valuation_date, value@2 as value, _month@3 as _month, __delta_rs_source@4 as __delta_rs_source, ticker@5 as ticker, valuation_date@6 as valuation_date, value@7 as value, _month@8 as _month, __delta_rs_path@9 as __delta_rs_path, __delta_rs_target@10 as __delta_rs_target, CASE WHEN (__delta_rs_source@4 IS NOT DISTINCT FROM true) AND (__delta_rs_target@10 IS NOT DISTINCT FROM true) THEN 0 WHEN (__delta_rs_source@4 IS NOT DISTINCT FROM true) AND __delta_rs_target@10 IS NULL THEN 1 WHEN (__delta_rs_source@4 IS NOT DISTINCT FROM true) AND (__delta_rs_target@10 IS NOT DISTINCT FROM true) THEN 2 WHEN (__delta_rs_source@4 IS NOT DISTINCT FROM true) AND __delta_rs_target@10 IS NULL THEN 3 WHEN __delta_rs_source@4 IS NULL AND __delta_rs_target@10 IS NOT DISTINCT FROM true THEN 4 END as __delta_rs_operation]
                  ProjectionExec: expr=[ticker@0 as ticker, valuation_date@1 as valuation_date, value@2 as value, _month@3 as _month, __delta_rs_source@4 as __delta_rs_source, ticker@5 as ticker, valuation_date@6 as valuation_date, value@7 as value, _month@8 as _month, __delta_rs_path@9 as __delta_rs_path, __delta_rs_target@10 as __delta_rs_target]
                    HashJoinExec: mode=CollectLeft, join_type=Full, on=[(ticker@0, CAST(t.ticker AS LargeUtf8)@6)]
                      ProjectionExec: expr=[ticker@0 as ticker, valuation_date@1 as valuation_date, value@2 as value, _month@3 as _month, true as __delta_rs_source]
                        MetricObserverExec id=merge_source_count
                          ProjectionExec: expr=[ticker@0 as ticker, valuation_date@1 as valuation_date, value@2 as value, _month@3 as _month]
                            MemoryExec: partitions=1, partition_sizes=[1]
                      ProjectionExec: expr=[ticker@0 as ticker, valuation_date@1 as valuation_date, value@2 as value, _month@3 as _month, __delta_rs_path@4 as __delta_rs_path, __delta_rs_target@5 as __delta_rs_target, CAST(ticker@0 AS LargeUtf8) as CAST(t.ticker AS LargeUtf8)]
                        ProjectionExec: expr=[ticker@0 as ticker, valuation_date@1 as valuation_date, value@2 as value, _month@3 as _month, __delta_rs_path@4 as __delta_rs_path, true as __delta_rs_target]
                          MetricObserverExec id=merge_target_count
                            DeltaScan
                              ParquetExec: file_groups={146 groups: [[_month=2018-08/part-00001-2196a828-b7ac-4406-8b21-63edd7072c0e-c000.snappy.parquet], [_month=2011-10/part-00001-08fb1ac4-3fc0-43e1-af5f-14765704e60c-c000.snappy.parquet], [_month=2018-10/part-00001-50d537b7-b59b-415e-a13d-176526552b52-c000.snappy.parquet], [_month=2011-05/part-00001-2bcb1fa9-c396-4c38-901b-d73cf5443f28-c000.snappy.parquet], [_month=2016-03/part-00001-5a4f5659-43ee-4c90-931c-0dd0d15d9e4e-c000.snappy.parquet], ...]}, projection=[ticker, valuation_date, value, _month, __delta_rs_path]

[2024-04-05T18:00:00Z DEBUG datafusion::physical_planner] Optimized physical plan:
    ProjectionExec: expr=[__delta_rs_c_ticker@0 as ticker, __delta_rs_c_valuation_date@1 as valuation_date, __delta_rs_c_value@2 as value, __delta_rs_c__month@3 as _month]
      CoalesceBatchesExec: target_batch_size=8192
        FilterExec: __delta_rs_delete@4 IS NOT DISTINCT FROM false
          ProjectionExec: expr=[__delta_rs_c_ticker@12 as __delta_rs_c_ticker, __delta_rs_c_valuation_date@13 as __delta_rs_c_valuation_date, __delta_rs_c_value@14 as __delta_rs_c_value, __delta_rs_c__month@15 as __delta_rs_c__month, __delta_rs_delete@16 as __delta_rs_delete]
            MetricObserverExec id=merge_output_count
              MergeBarrier
                ProjectionExec: expr=[ticker@0 as ticker, valuation_date@1 as valuation_date, value@2 as value, _month@3 as _month, __delta_rs_source@4 as __delta_rs_source, ticker@5 as ticker, valuation_date@6 as valuation_date, value@7 as value, _month@8 as _month, __delta_rs_path@9 as __delta_rs_path, __delta_rs_target@10 as __delta_rs_target, __delta_rs_operation@11 as __delta_rs_operation, CASE __delta_rs_operation@11 WHEN 0 THEN ticker@0 WHEN 1 THEN ticker@0 WHEN 2 THEN CAST(ticker@5 AS LargeUtf8) WHEN 3 THEN CAST(ticker@5 AS LargeUtf8) WHEN 4 THEN CAST(ticker@5 AS LargeUtf8) END as __delta_rs_c_ticker, CASE __delta_rs_operation@11 WHEN 0 THEN valuation_date@1 WHEN 1 THEN valuation_date@1 WHEN 2 THEN valuation_date@6 WHEN 3 THEN valuation_date@6 WHEN 4 THEN valuation_date@6 END as __delta_rs_c_valuation_date, CASE __delta_rs_operation@11 WHEN 0 THEN value@2 WHEN 1 THEN value@2 WHEN 2 THEN value@7 WHEN 3 THEN value@7 WHEN 4 THEN value@7 END as __delta_rs_c_value, CASE __delta_rs_operation@11 WHEN 0 THEN _month@3 WHEN 1 THEN _month@3 WHEN 2 THEN CAST(_month@8 AS LargeUtf8) WHEN 3 THEN CAST(_month@8 AS LargeUtf8) WHEN 4 THEN CAST(_month@8 AS LargeUtf8) END as __delta_rs_c__month, CASE __delta_rs_operation@11 WHEN 0 THEN false WHEN 1 THEN false WHEN 2 THEN false WHEN 3 THEN true WHEN 4 THEN false ELSE false END as __delta_rs_delete, CASE __delta_rs_operation@11 WHEN 0 THEN false WHEN 1 THEN NULL WHEN 2 THEN false WHEN 3 THEN false WHEN 4 THEN false ELSE false END as __delta_rs_target_insert, CASE __delta_rs_operation@11 WHEN 0 THEN NULL WHEN 1 THEN false WHEN 2 THEN false WHEN 3 THEN false WHEN 4 THEN false ELSE false END as __delta_rs_target_update, CASE __delta_rs_operation@11 WHEN 0 THEN false WHEN 1 THEN false WHEN 2 THEN false WHEN 3 THEN false WHEN 4 THEN false ELSE false END as __delta_rs_target_delete, CASE __delta_rs_operation@11 WHEN 0 THEN false WHEN 1 THEN false WHEN 2 THEN NULL WHEN 3 THEN false WHEN 4 THEN NULL ELSE false END as __delta_rs_target_copy]
                  ProjectionExec: expr=[ticker@7 as ticker, valuation_date@8 as valuation_date, value@9 as value, _month@10 as _month, __delta_rs_source@11 as __delta_rs_source, ticker@0 as ticker, valuation_date@1 as valuation_date, value@2 as value, _month@3 as _month, __delta_rs_path@4 as __delta_rs_path, __delta_rs_target@5 as __delta_rs_target, CASE WHEN (__delta_rs_source@11 IS NOT DISTINCT FROM true) AND (__delta_rs_target@5 IS NOT DISTINCT FROM true) THEN 0 WHEN (__delta_rs_source@11 IS NOT DISTINCT FROM true) AND __delta_rs_target@5 IS NULL THEN 1 WHEN (__delta_rs_source@11 IS NOT DISTINCT FROM true) AND (__delta_rs_target@5 IS NOT DISTINCT FROM true) THEN 2 WHEN (__delta_rs_source@11 IS NOT DISTINCT FROM true) AND __delta_rs_target@5 IS NULL THEN 3 WHEN __delta_rs_source@11 IS NULL AND __delta_rs_target@5 IS NOT DISTINCT FROM true THEN 4 END as __delta_rs_operation]
                    CoalesceBatchesExec: target_batch_size=8192
                      HashJoinExec: mode=Partitioned, join_type=Full, on=[(CAST(t.ticker AS LargeUtf8)@6, ticker@0)]
                        ProjectionExec: expr=[ticker@0 as ticker, valuation_date@1 as valuation_date, value@2 as value, _month@3 as _month, __delta_rs_path@4 as __delta_rs_path, true as __delta_rs_target, CAST(ticker@0 AS LargeUtf8) as CAST(t.ticker AS LargeUtf8)]
                          MetricObserverExec id=merge_target_count
                            ParquetExec: file_groups={146 groups: [[_month=2018-08/part-00001-2196a828-b7ac-4406-8b21-63edd7072c0e-c000.snappy.parquet], [_month=2011-10/part-00001-08fb1ac4-3fc0-43e1-af5f-14765704e60c-c000.snappy.parquet], [_month=2018-10/part-00001-50d537b7-b59b-415e-a13d-176526552b52-c000.snappy.parquet], [_month=2011-05/part-00001-2bcb1fa9-c396-4c38-901b-d73cf5443f28-c000.snappy.parquet], [_month=2016-03/part-00001-5a4f5659-43ee-4c90-931c-0dd0d15d9e4e-c000.snappy.parquet], ...]}, projection=[ticker, valuation_date, value, _month, __delta_rs_path]
                        ProjectionExec: expr=[ticker@0 as ticker, valuation_date@1 as valuation_date, value@2 as value, _month@3 as _month, true as __delta_rs_source]
                          MetricObserverExec id=merge_source_count
                            MemoryExec: partitions=1, partition_sizes=[1]

Let me know if there is any other log output that would be helpful, thanks!

korowa commented 3 months ago

According to the input plan, the join is planned as:

HashJoinExec: mode=>>>CollectLeft<<<, join_type=Full

The target_partitions value is 1 (single core), which works as expected -- CollectLeft is planned by default when there is a single target partition; otherwise it would be Auto/Partitioned.

At the same time, ParquetExec is planned as if there were 146 partitions:

ParquetExec: file_groups={146 groups: [[...]]}

Normally DataFusion plans 1 file group per target partition -- so this plan is already a bit inconsistent, as the ParquetExec planned through delta-rs doesn't seem to respect the partition limit (if I'm not mistaken).

After initial planning there is an optimization phase. Currently DF has some issues with FULL (and some other) joins in CollectLeft mode (should be fixed by #9757), so the join_selection optimizer rule converts CollectLeft to Partitioned. After that, enforce_distribution normally adds RepartitionExec operators, but it doesn't do so for a single target partition (since a single partition already satisfies the hash distribution, and repartitioning N -> 1 won't increase execution parallelism). So there are no repartitions in the resulting plan, and as a result we have a Partitioned HashJoinExec with inputs of 1 and 146 partitions, which is an incorrect operator configuration.

At the same time, increasing the number of target partitions allows enforce_distribution to add RepartitionExecs (since the scan distribution is unknown and DF needs to enforce hash partitioning), and the resulting plan is fine.

To sum up:

1) It looks like the DeltaScan -> ParquetExec planning in delta-rs is not aligned with how DF plans file scans -- according to this function, the DeltaScan builder creates a ParquetExec with file_groups grouped by partition-column values and does not take the target_partitions config into account, while the DF listing table normally splits all files into N chunks where N = config.target_partitions (see the sketch after this list).
2) Maybe the physical plan creation function for ParquetExec (and other format scans) should check whether the passed file_groups violate the target_partitions config parameter -- thread overcommitting, at first glance, looks unacceptable, but I'm not 100% sure about it.
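For illustration, here's a minimal, self-contained sketch of the grouping difference described in 1). The function names and the group-by-prefix heuristic are illustrative only (neither DataFusion's nor delta-rs's actual code); they just show why a listing-style scan ends up with N = target_partitions groups while grouping by partition-column value ends up with one group per value.

    use std::collections::BTreeMap;

    /// Listing-table style: split all files round-robin into exactly
    /// `target_partitions` groups.
    fn group_by_target_partitions(files: &[String], target_partitions: usize) -> Vec<Vec<String>> {
        let n = target_partitions.max(1);
        let mut groups = vec![Vec::new(); n];
        for (i, file) in files.iter().enumerate() {
            groups[i % n].push(file.clone());
        }
        groups
    }

    /// DeltaScan-builder style (as described above): one group per distinct
    /// partition-column value, ignoring `target_partitions` entirely.
    fn group_by_partition_value(files: &[String]) -> Vec<Vec<String>> {
        let mut groups: BTreeMap<String, Vec<String>> = BTreeMap::new();
        for file in files {
            // e.g. "_month=2018-08/part-00001-...parquet" -> key "_month=2018-08"
            let key = file.split('/').next().unwrap_or(file.as_str()).to_string();
            groups.entry(key).or_default().push(file.clone());
        }
        groups.into_values().collect()
    }

    fn main() {
        let files: Vec<String> = (0..146)
            .map(|i| format!("_month={}/part-00001.parquet", i))
            .collect();

        // 1 group when target_partitions = 1 ...
        assert_eq!(group_by_target_partitions(&files, 1).len(), 1);
        // ... versus 146 groups when grouping by partition value -- the count
        // the Partitioned HashJoinExec later disagrees with.
        assert_eq!(group_by_partition_value(&files).len(), 146);
    }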

Any other ideas and comments are welcome.

mustafasrepo commented 3 months ago

Thanks @korowa and @echai58 for the detailed analysis. Sorry for the late reply. I will try to generate a DataFusion-only reproducer for this use case from your findings; then we can try to fix the problem there. The root cause seems to be that we allow the source to feed in data with more partitions than the target partition count, which causes some inconsistency internally. Adding a RepartitionExec immediately after the source to bring it to the desired partitioning might solve the issue. I am not sure though; I will post my findings as I progress.
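To make that idea concrete, here is a minimal sketch (not delta-rs's or DataFusion's actual fix). RepartitionExec and Partitioning are real DataFusion types; the helper function itself is hypothetical:

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::physical_plan::repartition::RepartitionExec;
    use datafusion::physical_plan::{ExecutionPlan, Partitioning};

    /// Hypothetical helper: wrap `source` in a RepartitionExec so its output
    /// partition count matches `target_partitions`, regardless of how many
    /// partitions the underlying scan produced.
    fn align_source_partitioning(
        source: Arc<dyn ExecutionPlan>,
        target_partitions: usize,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Round-robin is enough to reconcile the partition counts; a later
        // enforce_distribution pass can still add hash partitioning where a
        // join requires it.
        let repartitioned =
            RepartitionExec::try_new(source, Partitioning::RoundRobinBatch(target_partitions))?;
        Ok(Arc::new(repartitioned))
    }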

mustafasrepo commented 3 months ago

I was able to generate a DataFusion-only reproducer. The following query triggers the same error when the target partition count is 1.

SELECT * FROM (
    SELECT 1 as c, 2 as d
    UNION ALL
    SELECT 1 as c, 3 AS d
) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e;

To set the target partition count to 1, the following statement can be used (shown in sqllogictest syntax):

statement ok
set datafusion.execution.target_partitions = 1;
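For anyone reproducing this outside sqllogictest, here's a minimal Rust sketch of the same scenario. It assumes the tokio runtime and the SessionConfig::with_target_partitions / SessionContext::new_with_config APIs of recent DataFusion releases (the constructor name has varied slightly between versions):

    use datafusion::error::Result;
    use datafusion::prelude::{SessionConfig, SessionContext};

    #[tokio::main]
    async fn main() -> Result<()> {
        // Force a single target partition, mimicking the 1-CPU environment.
        let config = SessionConfig::new().with_target_partitions(1);
        let ctx = SessionContext::new_with_config(config);

        // The reproducer query from above: a two-partition UNION ALL on one
        // side of a FULL JOIN against a single-row subquery.
        let df = ctx
            .sql(
                "SELECT * FROM (
                     SELECT 1 as c, 2 as d
                     UNION ALL
                     SELECT 1 as c, 3 AS d
                 ) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c = rhs.e",
            )
            .await?;

        // On affected versions, executing the plan is where the
        // `Invalid HashJoinExec partition count mismatch` error surfaced.
        df.show().await?;
        Ok(())
    }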

I understand the problem now. It seems that while satisfying the hash requirement we do not add a hash repartition when the target partition count is 1 (on the assumption that the source has a single partition, so a Repartition(n_input=1, n_output=1) would be unnecessary). However, the source may not always have a single partition, as in the example above. I will file a fix for this bug shortly.
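Purely as an illustration of the flawed shortcut (the names and structure below are mine, not DataFusion's actual enforce_distribution code):

    /// Illustrative only: decide whether a child that must be hash-partitioned
    /// needs a RepartitionExec inserted above it.
    fn needs_hash_repartition(child_partition_count: usize, target_partitions: usize) -> bool {
        // Buggy shortcut: when target_partitions == 1, assume the child already
        // has a single partition and skip the repartition entirely:
        //     if target_partitions == 1 { return false; }
        //
        // Corrected check: look at the child's actual partition count. A child
        // with 2 (or 146) partitions still has to be brought to a single
        // partition so both sides of a Partitioned HashJoinExec agree.
        child_partition_count != target_partitions
    }

    fn main() {
        // The reproducer: the UNION ALL source has 2 partitions, target is 1.
        assert!(needs_hash_repartition(2, 1));
        // The case the shortcut was written for: nothing to do.
        assert!(!needs_hash_repartition(1, 1));
    }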