duckdb / duckdb_delta

DuckDB extension for Delta Lake
MIT License

DeltaKernel FFI error on delta_scan with columnMapping = "name" #11

Closed kmatt closed 4 months ago

kmatt commented 4 months ago

delta_scan fails on tables created with spark.conf.set("spark.databricks.delta.properties.defaults.columnMapping.mode", "name"):

$ ./duckdb
v0.10.3 70fd6a8a24
Enter ".help" for usage hints.
Connected to a transient in-memory database.
Use ".open FILENAME" to reopen on a persistent database.

D select * from delta_scan('file:///tank/delta/TEST/dbo/Candidate/') limit 1;

IO Error: Hit DeltaKernel FFI error (from: kernel_scan_data_next in DeltaSnapshot GetFile): Hit error: 2 (ArrowError) with message (Invalid argument error: Incorrect datatype for StructArray field "partitionValues", expected Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, false) got Map(Field { name: "key_value", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false))

Changing the path to lowercase as in #7 results in a different error:

D select * from delta_scan('file:///tank/delta/test/dbo/candidate/') limit 1;
IO Error: Hit DeltaKernel FFI error (from: snapshot in DeltaScanScanBind): Hit error: 15 (MissingVersionError) with message (No table version found.)

Delta tables written with:

io.delta#delta-core_2.12 added as a dependency                                                                                             
:: resolving dependencies :: org.apache.spark#spark-submit-parent-29dd794c-e4f3-47d8-82a7-fa5f1114dfe8;1.0
        confs: [default]
        found io.delta#delta-core_2.12;2.4.0 in central
        found io.delta#delta-storage;2.4.0 in central
        found org.antlr#antlr4-runtime;4.9.3 in central
$ uname -a
Linux vm-etl-01 6.2.0-1014-azure #14~22.04.1-Ubuntu SMP Wed Sep 13 16:15:26 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux
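
For completeness, the writer setup was roughly the following. A minimal PySpark sketch, assuming delta-core 2.4.0 on the classpath; the schema and path here are illustrative, not the actual table:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Every table created in this session defaults to column mapping by name:
    .config("spark.databricks.delta.properties.defaults.columnMapping.mode", "name")
    .getOrCreate()
)

df = spark.createDataFrame([(151, "ECOE")], ["bc_RECORD_NBR", "bc_Branch_Code"])
df.write.format("delta").save("/tank/delta/TEST/dbo/Candidate")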
kmatt commented 4 months ago

spark-sql (default)> SELECT * FROM delta.`/tank/delta/test` limit 1; succeeds

D select * from delta_scan('file:///tank/delta/test/') limit 1;

IO Error: Hit DeltaKernel FFI error (from: kernel_scan_data_next in DeltaSnapshot GetFile): Hit error: 2 (ArrowError) with message (Invalid argument error: Incorrect datatype for StructArray field "partitionValues", expected Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, false) got Map(Field { name: "key_value", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false))
mrjsj commented 4 months ago

Try changing only the table name in your path to lower-case, not the entire path.

kmatt commented 4 months ago

Try changing only the table name in your path to lower-case, not the entire path.

Similar errors:

IO Error: Hit DeltaKernel FFI error (from: snapshot in DeltaScanScanBind): Hit error: 15 (MissingVersionError) with message (No table version found.)

mrjsj commented 4 months ago

It seems your first path works fine (it finds the table), but it has issues dealing with the complex data types of your delta table. Can you try selecting only a single column (non-complex data type)?

kmatt commented 4 months ago

It seems your first path works fine (it finds the table), but it has issues dealing with the complex data types of your delta table. Can you try selecting only a single column (non-complex data type)?

That brings me to a new error:

D select bc_RECORD_NBR from delta_scan('file:///tank/delta/test/dbo/branch_codes/') limit 1;

IO Error: Failed to read file "/tank/delta/test/dbo/branch_codes//part-00000-dac9ffff-2d18-41dd-90e2-c00163bba729-c000.snappy.parquet": schema mismatch in glob: column "bc_RECORD_NBR" was read from the original file "/tank/delta/test/TRMSQL/dbo/branch_codes//part-00000-dac9ffff-2d18-41dd-90e2-c00163bba729-c000.snappy.parquet", but could not be found in file "/tank/delta/test/TRMSQL/dbo/branch_codes//part-00000-dac9ffff-2d18-41dd-90e2-c00163bba729-c000.snappy.parquet".
Candidate names: col-a553c046-6a4e-4e34-8717-43c3156f66d2, col-cdcdd91e-4c7a-4636-94ca-12fd6a53be8e, col-830c96c7-57c9-453a-9720-b6443512758b, col-1794286b-4cf9-4d53-97fd-bf51f77e4f56, col-69c5d36c-f892-4d82-8b94-a94f9e160654, col-2eee13ba-777c-4062-988e-5789861285f2, col-362848d8-cad7-4b56-bdad-3f1a333c694a, col-925ac107-7ef4-4f62-8433-3e87a24de19c, col-20e7666d-0fce-45ea-97fe-c388c39a625e, col-ce9e4a98-e7b7-49da-94f4-d093b48a8000, col-cf52f10d-0748-4452-a0c0-75862157342e, col-c2103c5d-fcc5-4799-b908-4f0f69cacaf3, col-1b8177f8-e2f8-4348-ba3f-c7b824a3d019, col-41c065de-0f19-4942-8edd-4aa3b867b027, col-f8c42b5a-a638-4c4c-8160-45e25a2d6548, col-7da4091e-cefb-45a0-be74-2c92572e235d, col-0685b2fc-0be1-4967-9d85-c01e40617594, col-0b1ba16f-7932-4f2d-9797-63c9d391abe9, col-5bea3349-ffc3-4494-8750-7d1dcc4f9fd6, col-c107496d-1277-4592-8be8-fb76ede88ff0, col-ccfa624a-c7d1-438f-b8b0-b4bc15b8d2da, col-678f7e9c-4658-49e2-9c0e-bc88b9900eb8, col-bc1e83e9-0b11-44ce-b20c-4dac1095a9a8, col-4f4146ea-f60e-4e45-a8b7-917888d2fbd0, col-f8dff630-1a04-45bb-b354-1c1043f36fef, file_row_number
If you are trying to read files with different schemas, try setting union_by_name=True

Schema as seen by Spark:

spark-sql (default)> describe table delta.`/tank/delta/test/dbo/branch_codes`;
bc_RECORD_NBR           int
bc_Branch_Code          string
bc_Region_Code          string
bc_Office_Name          string
bc_address              string
bc_city                 string
bc_state                string
bc_zip                  string
bc_phone                string
bc_fax                  string
bc_toll_free            string
bc_branch_type          string
bc_webname              string
bc_TimeZone             string
bc_Acting_RVP           string
bc_Acting_DM            string
bc_payroll_fax          string
bc_email                string
branch_id               int
CostCenter              string
SearchArea              string
BranchAccountingTypeId  int
CC_Rep                  string
OpenDate                timestamp
CloseDate               timestamp
Time taken: 0.058 seconds, Fetched 25 row(s)
D select bc_address from delta_scan('file:///tank/delta/test/dbo/branch_codes', union_by_name=True) limit 1;

IO Error: Failed to read file "/tank/delta/test/dbo/branch_codes/part-00000-dac9ffff-2d18-41dd-90e2-c00163bba729-c000.snappy.parquet": schema mismatch in glob: column "bc_address" was read from the original file "/tank/delta/test/dbo/branch_codes/part-00000-dac9ffff-2d18-41dd-90e2-c00163bba729-c000.snappy.parquet", but could not be found in file "/tank/delta/test/dbo/branch_codes/part-00000-dac9ffff-2d18-41dd-90e2-c00163bba729-c000.snappy.parquet".
Candidate names: col-a553c046-6a4e-4e34-8717-43c3156f66d2, col-cdcdd91e-4c7a-4636-94ca-12fd6a53be8e, col-830c96c7-57c9-453a-9720-b6443512758b, col-1794286b-4cf9-4d53-97fd-bf51f77e4f56, col-69c5d36c-f892-4d82-8b94-a94f9e160654, col-2eee13ba-777c-4062-988e-5789861285f2, col-362848d8-cad7-4b56-bdad-3f1a333c694a, col-925ac107-7ef4-4f62-8433-3e87a24de19c, col-20e7666d-0fce-45ea-97fe-c388c39a625e, col-ce9e4a98-e7b7-49da-94f4-d093b48a8000, col-cf52f10d-0748-4452-a0c0-75862157342e, col-c2103c5d-fcc5-4799-b908-4f0f69cacaf3, col-1b8177f8-e2f8-4348-ba3f-c7b824a3d019, col-41c065de-0f19-4942-8edd-4aa3b867b027, col-f8c42b5a-a638-4c4c-8160-45e25a2d6548, col-7da4091e-cefb-45a0-be74-2c92572e235d, col-0685b2fc-0be1-4967-9d85-c01e40617594, col-0b1ba16f-7932-4f2d-9797-63c9d391abe9, col-5bea3349-ffc3-4494-8750-7d1dcc4f9fd6, col-c107496d-1277-4592-8be8-fb76ede88ff0, col-ccfa624a-c7d1-438f-b8b0-b4bc15b8d2da, col-678f7e9c-4658-49e2-9c0e-bc88b9900eb8, col-bc1e83e9-0b11-44ce-b20c-4dac1095a9a8, col-4f4146ea-f60e-4e45-a8b7-917888d2fbd0, col-f8dff630-1a04-45bb-b354-1c1043f36fef, file_row_number
If you are trying to read files with different schemas, try setting union_by_name=True
mrjsj commented 4 months ago

So I assume read_parquet('.../**/*.parquet', union_by_name=true) works OK?

kmatt commented 4 months ago

So I assume read_parquet('.../**/*.parquet', union_by_name=true) works OK?

Yes, but it does not resolve the column names correctly. That may just be a consequence of reading the Delta table's files as plain Parquet?

The column names below are mapped correctly in _delta_log/00000000000000000000.json:

D select * from read_parquet('/tank/delta/test/dbo/branch_codes/*.parquet', union_by_name=True) limit 1;
┌──────────────────────┬──────────────────────┬───┬──────────────────────┬──────────────────────┬──────────────────────┐
│ col-14946dbd-f0a4-…  │ col-59fc3a4f-b087-…  │ … │ col-3701717e-3efe-…  │ col-f3a0a96b-ac14-…  │ col-7f127cb4-ac58-…  │
│        int32         │       varchar        │   │       varchar        │      timestamp       │      timestamp       │
├──────────────────────┼──────────────────────┼───┼──────────────────────┼──────────────────────┼──────────────────────┤
│                  151 │ ECOE                 │ … │                      │ 2019-07-02 00:00:00  │                      │
├──────────────────────┴──────────────────────┴───┴──────────────────────┴──────────────────────┴──────────────────────┤
│ 1 rows                                                                                          25 columns (5 shown) │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
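
The mapping itself can be inspected directly. A short sketch, assuming the metaData action sits in the first log file (per the Delta protocol, each field's metadata carries delta.columnMapping.physicalName); the path is the one from above:

import json

log_file = "/tank/delta/test/dbo/branch_codes/_delta_log/00000000000000000000.json"
with open(log_file) as f:
    for line in f:
        action = json.loads(line)
        if "metaData" not in action:
            continue
        # schemaString is itself JSON; each field's metadata holds the
        # physical "col-<uuid>" name that appears in the Parquet files.
        schema = json.loads(action["metaData"]["schemaString"])
        for field in schema["fields"]:
            physical = field["metadata"].get("delta.columnMapping.physicalName")
            print(f"{field['name']} -> {physical}")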
mrjsj commented 4 months ago

Yes, so from this I suppose the delta_scan extension (and specifically the delta-kernel-rs dependency) does not yet support column mapping: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#column-mapping

kmatt commented 4 months ago

Yes, so from this I suppose the delta_scan extension (and specifically the delta-kernel-rs dependency) does not yet support column mapping: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#column-mapping

That is the issue. If I change spark.conf.set("spark.databricks.delta.properties.defaults.columnMapping.mode", "name") to spark.conf.set("spark.databricks.delta.properties.defaults.columnMapping.mode", "none"), the issue is resolved.

I'm using "name" mode for this reason, so I will have to see if it's a requirement for my project:

"Delta can use column mapping to avoid any column naming restrictions, and to support the renaming and dropping of columns without having to rewrite all the data."
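
For context, this is the kind of operation "name" mode unlocks. A hedged sketch using the session from the writer snippet above (table path illustrative): with column mapping enabled, a rename is a metadata-only change and no Parquet files are rewritten.

# Rename a column in place; requires columnMapping.mode = "name" or "id".
spark.sql(
    "ALTER TABLE delta.`/tank/delta/test/dbo/branch_codes` "
    "RENAME COLUMN bc_RECORD_NBR TO record_nbr"
)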

samansmink commented 4 months ago

Column mapping is indeed not yet supported, it's on the wish list :)

kmatt commented 4 months ago

Column mapping via spark.databricks.delta.properties.defaults.columnMapping.mode = "name" is not supported as of DuckDB v0.10.3.
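
Until support lands, one possible stopgap is to read the Parquet files directly and alias the physical names back to the logical ones recovered from the log (see the mapping sketch above). A sketch using the duckdb Python package; the "col-<uuid>" name is copied from the error output and whether it maps to bc_RECORD_NBR would need to be confirmed against the log. Note this bypasses the transaction log entirely, so files removed by DELETE or OPTIMIZE would still be read:

import duckdb

duckdb.sql("""
    SELECT "col-a553c046-6a4e-4e34-8717-43c3156f66d2" AS bc_RECORD_NBR
    FROM read_parquet('/tank/delta/test/dbo/branch_codes/*.parquet')
    LIMIT 1
""").show()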