dlt-hub / dlt

data load tool (dlt) is an open source Python library that makes data loading easy 🛠️
https://dlthub.com/docs
Apache License 2.0

Proper pipeline `refresh` with `delta` table format #1777

Open: jorritsandbrink opened this issue 3 weeks ago


Feature description

Pipeline `refresh` has two flaws for the `delta` table format on the `filesystem` destination:

  1. Incomplete "DROP" ➜ with `refresh="drop_sources"`, an empty table folder remains.
  2. "TRUNCATE" behaves like "DROP" ➜ with `refresh="drop_data"`, the table's schema and commit history get deleted instead of only its data.

Repro (1):

```python
import dlt
from dlt.destinations import filesystem
from tests.pipeline.utils import airtable_emojis

source = airtable_emojis().with_resources("📆 Schedule", "🦚Peacock")
for resource in source.selected_resources.values():
    resource.apply_hints(table_format="delta")

pipe = dlt.pipeline(
    pipeline_name="refresh_repro",
    pipelines_dir="_storage",
    destination=filesystem("_storage")
)

pipe.run(source)
pipe.run(source.with_resources("🦚Peacock"), refresh="drop_sources")
# actual: empty folder `/_schedule/_delta_log` remains
# expected: `/_schedule/_delta_log` no longer exists
```
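To check the expected outcome of Repro (1) without inspecting the folder by hand, a small stdlib-only helper can list whatever survives under a dropped table's folder. This sketch assumes a local `filesystem` destination. `leftover_paths` is a hypothetical helper and is not part of dlt.

```python
from pathlib import Path
from typing import List, Union


def leftover_paths(table_dir: Union[str, Path]) -> List[Path]:
    """List everything that still exists under a supposedly dropped table.

    After a complete DROP the table folder itself should be gone, so the
    expected result is an empty list. Empty directories are reported too,
    because an empty `_delta_log` folder is the leftover in this issue.
    """
    root = Path(table_dir)
    if not root.exists():
        return []
    # Include the root so that an empty table folder is also reported.
    return [root, *sorted(root.rglob("*"))]
```

After a complete DROP, calling `leftover_paths` on the `_schedule` table folder should return an empty list. With the current behavior it reports `_schedule` and the empty `_delta_log` folder inside it.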

Repro (2):

```python
import dlt
from dlt.destinations import filesystem
from tests.pipeline.utils import airtable_emojis

source = airtable_emojis().with_resources("📆 Schedule", "🦚Peacock")
for resource in source.selected_resources.values():
    resource.apply_hints(table_format="delta")

pipe = dlt.pipeline(
    pipeline_name="refresh_repro",
    pipelines_dir="_storage",
    destination=filesystem("_storage")
)

pipe.run(source)
pipe.run(source.with_resources("📆 Schedule"), refresh="drop_data")
# actual: _schedule table has a single commit (/_schedule/_delta_log/00000000000000000000.json) (in SQL terms: the table got DROPped)
# expected: _schedule table has two commits (in SQL terms: the table got TRUNCATEd)
```
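You can check the commit count in Repro (2) by listing the commit files in `_delta_log`. These files are named with a zero-padded 20-digit version number, for example `00000000000000000000.json` for version 0. The following minimal stdlib sketch assumes a local table folder, and `commit_versions` is a hypothetical helper. It only counts commit files still on disk, which matches the number of commits for a fresh table like the one in the repro. Alternatively, the `deltalake` package's `DeltaTable(path).version()` returns the latest version directly.

```python
import re
from pathlib import Path
from typing import List, Union

# Commit files only: checkpoint files (*.checkpoint.parquet) and
# `_last_checkpoint` do not match this pattern.
_COMMIT_FILE = re.compile(r"^\d{20}\.json$")


def commit_versions(table_dir: Union[str, Path]) -> List[int]:
    """Return the sorted commit versions found in a local table's `_delta_log`."""
    log_dir = Path(table_dir) / "_delta_log"
    if not log_dir.is_dir():
        return []
    return sorted(
        int(p.stem) for p in log_dir.iterdir() if _COMMIT_FILE.match(p.name)
    )
```

For Repro (2), the current behavior yields `[0]`, while the issue expects two commits, i.e. `[0, 1]`.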

Are you a dlt user?

Yes, I'm already a dlt user.

Use case

No response

Proposed solution

Implement delta-specific versions of `drop_tables` and `truncate_tables`. Currently, the generic `filesystem` implementations are applied to delta tables as well.
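The following rough sketch shows one way to split delta tables off from the generic path. It assumes a local filesystem path and uses a hypothetical `TableInfo` record. The function names mirror `drop_tables` and `truncate_tables` from the proposal, but the signatures are illustrative and are not dlt's actual client API. Truncation assumes the `deltalake` package, whose `DeltaTable.delete()` without a predicate deletes all rows in a new commit. For remote buckets, the recursive delete would go through fsspec (e.g. `fs.rm(path, recursive=True)`) instead of `shutil.rmtree`.

```python
import shutil
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Iterable, Optional


@dataclass
class TableInfo:
    # Hypothetical, minimal stand-in for a table entry in a dlt schema.
    name: str
    path: Path  # local folder that holds the table's files
    table_format: Optional[str] = None


TableHandler = Callable[[TableInfo], None]


def drop_delta_table(table: TableInfo) -> None:
    # A delta table is a folder of data files plus `_delta_log`. Removing the
    # folder recursively also removes `_delta_log`, which a file-by-file
    # delete leaves behind as an empty folder (flaw 1).
    shutil.rmtree(table.path, ignore_errors=True)


def truncate_delta_table(table: TableInfo) -> None:
    # Assumes the `deltalake` package is installed. delete() without a
    # predicate removes all rows in a new commit, so the schema and earlier
    # commits are kept: TRUNCATE rather than DROP (flaw 2).
    from deltalake import DeltaTable

    DeltaTable(str(table.path)).delete()


def drop_tables(
    tables: Iterable[TableInfo],
    generic_drop: TableHandler,
    delta_drop: TableHandler = drop_delta_table,
) -> None:
    # Route delta tables to the delta-specific handler and everything else
    # to the existing generic filesystem behavior.
    for table in tables:
        (delta_drop if table.table_format == "delta" else generic_drop)(table)


def truncate_tables(
    tables: Iterable[TableInfo],
    generic_truncate: TableHandler,
    delta_truncate: TableHandler = truncate_delta_table,
) -> None:
    for table in tables:
        handler = delta_truncate if table.table_format == "delta" else generic_truncate
        handler(table)
```

The dispatch keeps the generic behavior untouched for non-delta tables. The delta handlers are injectable, so a remote-storage variant can be swapped in without changing the routing.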

Related issues

https://github.com/dlt-hub/dlt/pull/1742#issuecomment-2310481736