apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Regression: `Invalid comparison operation: Utf8 == Utf8View` error during LEFT ANTI JOIN #13510

Open sergiimk opened 1 day ago

sergiimk commented 1 day ago

Describe the bug

There appears to be a regression between 42.2.0 and 43.0.0 that introduced the following error:

External(ArrowError(InvalidArgumentError("Invalid comparison operation: Utf8 == Utf8View"), None))

Note that the error occurs during plan execution; plan validation passes successfully.

Because Arrow / DataFusion errors carry no trace information, I can't tell which operator this error originates from. Next, I will patch Arrow to dump a backtrace and try to produce minimal reproduction steps.

To Reproduce

I'm still struggling to isolate a minimal repro case. For now, I can only share the full optimized physical plan:

DataSinkExec: sink=ParquetSink(file_groups=[])
  CoalescePartitionsExec
    ProjectionExec: expr=[CAST(row_number() ORDER BY [date ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 AS Int64) + 2 as offset, op@0 as op, system_time@4 as system_time, date@1 as date, city@2 as city, population@3 as population]
      RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
        BoundedWindowAggExec: wdw=[row_number() ORDER BY [date ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "row_number() ORDER BY [date ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow, is_causal: true }], mode=[Sorted]
          SortPreservingMergeExec: [date@1 ASC]
            SortExec: expr=[date@1 ASC], preserve_partitioning=[true]
              ProjectionExec: expr=[0 as op, CASE WHEN date@0 IS NULL THEN 2524651200000 ELSE date@0 END as date, city@1 as city, population@2 as population, 2524651200000 as system_time]
                CoalesceBatchesExec: target_batch_size=8192
                  HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(date@0, date@0), (city@1, city@1)]
                    CoalesceBatchesExec: target_batch_size=8192
                      RepartitionExec: partitioning=Hash([date@0, city@1], 16), input_partitions=16
                        ProjectionExec: expr=[CAST(date@0 AS Timestamp(Millisecond, Some("UTC"))) as date, city@1 as city, population@2 as population]
                          RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                            CsvExec: file_groups={1 group: [[tmp/.tmpBsbmOs/data.csv]]}, projection=[date, city, population], has_header=true
                    CoalesceBatchesExec: target_batch_size=8192
                      RepartitionExec: partitioning=Hash([date@0, city@1], 16), input_partitions=1
                        ParquetExec: file_groups={1 group: [[tmp/.tmpBsbmOs/datasets/foo.bar/data/f1620e5d7c1414a901e3c1e2324870a568f07ca633701de7358c4c50b2785be745060]]}, projection=[date, city]

The error happens deep in our system when merging two DataFrames: one read from Parquet (with Utf8View columns) and another from CSV (with Utf8 columns). The key operation is de-duplicating the CSV records against those already stored in Parquet using a LEFT ANTI JOIN on `(date, city)`.
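To make the intended semantics concrete, here is a minimal, standard-library-only Rust sketch of that de-duplication step. It is not DataFusion's implementation: the `Row` struct and `left_anti_join` function are hypothetical, and both sides use plain Rust strings, so it deliberately does not model the Utf8 vs Utf8View physical representations where the reported error arises. It keeps every CSV row whose `(date, city)` key has no match among the Parquet keys.

```rust
use std::collections::HashSet;

/// Hypothetical row type mirroring the CSV columns in the plan above.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    date: i64, // timestamp in milliseconds since the epoch (UTC)
    city: String,
    population: i64,
}

/// LEFT ANTI JOIN on (date, city): keep each `left` row whose key
/// does not appear anywhere in `right`.
fn left_anti_join(left: &[Row], right: &[(i64, String)]) -> Vec<Row> {
    // Build a hash set of the right side's join keys.
    let keys: HashSet<(i64, &str)> = right.iter().map(|(d, c)| (*d, c.as_str())).collect();
    // Probe with each left row and keep only the rows with no match.
    left.iter()
        .filter(|r| !keys.contains(&(r.date, r.city.as_str())))
        .cloned()
        .collect()
}

fn main() {
    // Hypothetical sample data: CSV rows to ingest and keys already stored in Parquet.
    let csv_rows = vec![
        Row { date: 1, city: "A".to_string(), population: 10 },
        Row { date: 2, city: "B".to_string(), population: 20 },
    ];
    let parquet_keys = vec![(1, "A".to_string())];
    // Only the (2, "B") row is new.
    println!("{:?}", left_anti_join(&csv_rows, &parquet_keys));
}
```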

Expected behavior

No error, or, if some operation is invalid, an error during planning rather than during execution.
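As a hedged illustration of this expectation, the sketch below resolves join key types before any data is read: each key pair is either mapped to a common type or rejected with an error. The `KeyType` enum, `common_type`, and `plan_join_keys` are hypothetical, and the rule that Utf8 and Utf8View coerce to Utf8View is an assumption made for this sketch, not a statement of DataFusion's actual coercion rules. It also assumes both `date` keys already share a timestamp type, as the CAST in the plan suggests.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum KeyType {
    Utf8,
    Utf8View,
    TimestampMs,
}

/// Hypothetical coercion rule (an assumption for this sketch): identical types
/// are compatible, the two string representations share Utf8View, and any
/// other mismatch has no common type.
fn common_type(l: KeyType, r: KeyType) -> Option<KeyType> {
    use KeyType::*;
    match (l, r) {
        (a, b) if a == b => Some(a),
        (Utf8, Utf8View) | (Utf8View, Utf8) => Some(Utf8View),
        _ => None,
    }
}

/// Check every join key pair at planning time, returning the type each pair
/// would be compared as, or an error naming the first incompatible pair.
fn plan_join_keys(left: &[KeyType], right: &[KeyType]) -> Result<Vec<KeyType>, String> {
    if left.len() != right.len() {
        return Err(format!(
            "expected the same number of join keys, got {} and {}",
            left.len(),
            right.len()
        ));
    }
    left.iter()
        .zip(right)
        .map(|(&l, &r)| {
            common_type(l, r).ok_or_else(|| format!("incompatible join key types: {:?} and {:?}", l, r))
        })
        .collect()
}

fn main() {
    // Keys from the plan above: (date, city); CSV city is Utf8, Parquet city is Utf8View.
    let left = [KeyType::TimestampMs, KeyType::Utf8];
    let right = [KeyType::TimestampMs, KeyType::Utf8View];
    println!("{:?}", plan_join_keys(&left, &right));
}
```

Either outcome matches the expectation above: a common type lets the join run without error, while an `Err` surfaces the problem during planning instead of deep inside execution.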

Additional context

No response