dlt-hub / dlt

data load tool (dlt) is an open source Python library that makes data loading easy 🛠️
https://dlthub.com/docs
Apache License 2.0

Nullable fields cause `column mismatch` error #1400

Closed. hello-world-bfree closed this issue 3 months ago

hello-world-bfree commented 3 months ago

dlt version

0.4.7

Describe the problem

This tracks the same issue I raised before here, with an example that hopefully highlights the problem more clearly.

At least when using DuckDB as the destination, if a source has a schema with nullable fields, the load step of the pipeline will fail due to a column mismatch.

In the following example, the pipeline has Stripe as the source, specifically hitting the customers endpoint, with a local DuckDB database as the destination. Everything works as expected until the load step. Only one of the five files to be loaded makes it into the database. When the next file goes to load, it fails after multiple retries because of a column mismatch in the new file. You'll see here that the customers object has many nullable fields and objects.

The schemas of the files taken from .dlt/pipelines/stripe_customers_20240522/load/normalized/{pipeline_run} are shown below. The first screenshot shows the schema of the file in the completed_jobs directory, which was the first and only file to load:

[Screenshot: schema of the first and only file that loaded]

The rest of the files failed to load because their schemas differ: none of their records contain one or more of the optional fields. Below are screenshots of the schemas from each of those files:

[Screenshots: schemas of the four remaining files that failed to load]

The schema file produced - schema.json

The stacktrace from the REPL - column_mismatch.txt

Expected behavior

I'd expect the schema created in the destination to be a superset of the schemas of all source records, and all records to load into the destination table. If a particular record didn't have one or more of the optional fields, those columns would be set to null in the destination table.

Steps to reproduce

The data has to be large enough that multiple load files are created. Everything works as expected if the complete load fits in a single file.

The example below uses the Stripe customers endpoint, but the issue seems to affect any large data set that has optional fields.

import dlt
import duckdb

from dlt_pipelines.stripe_analytics import stripe_source

# STRIPE_API_KEY must be set to a valid Stripe secret key for this to run.
conn = duckdb.connect("~/stripe_customers.duckdb")

p = dlt.pipeline(
    "stripe_customers_20240522",
    dataset_name="stripe",
    destination=dlt.destinations.duckdb(conn),
    progress="log",
)

src = stripe_source(endpoints=["Customer"], stripe_secret_key=STRIPE_API_KEY)

info = p.run(src, loader_file_format="parquet")

Operating system

macOS

Runtime environment

Local

Python version

3.11

dlt data source

Stripe customers endpoint via the Stripe verified source.

dlt destination

DuckDB

Other deployment details

No response

Additional information

No response

rudolfix commented 3 months ago

@hello-world-bfree thanks for the bug report. I think it is (more or less) clear what is going on. If your data source is adding columns on the fly and the in-memory buffer for extracted data does not hold many rows (5,000 by default), then we will indeed write several parquet files with different schemas. You could increase the buffer size to 1,000,000 rows and see if that still happens: https://dlthub.com/docs/reference/performance#controlling-in-memory-buffers
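A minimal sketch of raising that buffer, assuming the buffer_max_items setting / DATA_WRITER__BUFFER_MAX_ITEMS environment-variable spelling from the linked performance docs:

import os

# Assumption per the linked performance docs: buffer_max_items controls how
# many extracted rows are held in memory before a file is written (default
# 5,000). Raising it means fewer parquet files with divergent schemas.
# The same value can be set in config.toml under [data_writer].
os.environ["DATA_WRITER__BUFFER_MAX_ITEMS"] = "1000000"

# ...then create and run the pipeline as in the repro above.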

@steinitzu let's try to fix it. Since all parquet files are loaded from local storage, we are able to get the column names and generate a COPY command per file:

with maybe_context(lock):
    with sql_client.begin_transaction():
        sql_client.execute_sql(
            f"COPY {qualified_table_name} FROM '{file_path}' ( FORMAT"
            f" {source_format} {options});"
        )

Right now we assume that duckdb will handle this itself, and apparently that is not the case.

The above will work because dlt makes sure that all schema changes are applied and the destination schema is already migrated, so all possible columns are present.
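A rough sketch of that per-file COPY, assuming pyarrow is available to read each parquet file's column names (illustrative only, not the actual loader change):

import pyarrow.parquet as pq

def copy_parquet_file(sql_client, qualified_table_name: str, file_path: str) -> None:
    # List only the columns actually present in this parquet file; the table
    # schema is already migrated, so the columns not listed should be filled
    # with NULL / their defaults by DuckDB.
    columns = pq.read_schema(file_path).names
    column_list = ", ".join(f'"{name}"' for name in columns)
    with sql_client.begin_transaction():
        sql_client.execute_sql(
            f"COPY {qualified_table_name} ({column_list}) FROM '{file_path}'"
            " (FORMAT PARQUET);"
        )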

steinitzu commented 3 months ago

@hello-world-bfree I'm pretty sure this was fixed in dlt 0.4.8; can you try updating? The latest version is 0.4.12.
I was only able to replicate the bug on 0.4.7.
Parquet files are now normalized before loading: missing columns are added and columns are re-ordered as needed.
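For reference, this is roughly the kind of normalization described above, sketched with pyarrow (illustrative, not dlt's actual code): missing columns are added as all-null arrays and the result follows the target column order.

import pyarrow as pa

def normalize_to_schema(table: pa.Table, target: pa.Schema) -> pa.Table:
    # For every field of the target schema, either take the existing column
    # (cast to the target type) or synthesize an all-null column, preserving
    # the target column order.
    arrays = []
    for field in target:
        if field.name in table.column_names:
            arrays.append(table[field.name].cast(field.type))
        else:
            arrays.append(pa.nulls(len(table), type=field.type))
    return pa.Table.from_arrays(arrays, schema=target)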