dlt-hub / dlt

data load tool (dlt) is an open source Python library that makes data loading easy 🛠️
https://dlthub.com/docs
Apache License 2.0

bug when using custom naming on table with child tables #1754

Closed sh-rp closed 1 month ago

sh-rp commented 1 month ago

What's happening?

When using a custom NamingConvention, loading twice into a table that has child tables yields a SchemaIdentifierNormalizationCollision. I assume certain internal identifiers are not normalized before being compared to the already-normalized identifiers in the schema; in this case the collision happens on _dlt_parent_id.
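The suspected mechanism can be sketched without dlt: the stored schema already contains the normalized name, while a raw internal identifier, compared without normalization, looks like a second, colliding column. The `normalize` function below is a hypothetical stand-in for the upper-casing convention from the repro, not dlt's actual code:

```python
def normalize(name: str) -> str:
    # hypothetical stand-in for the custom upper-casing naming convention
    return name.upper()

stored_name = normalize("_dlt_parent_id")  # "_DLT_PARENT_ID" lives in the schema
raw_internal = "_dlt_parent_id"            # hardcoded identifier, not normalized

# the raw name does not match the stored column ...
print(raw_internal == stored_name)             # False
# ... yet its normalized form does -> reported as a collision
print(normalize(raw_internal) == stored_name)  # True
```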

How to reproduce

  1. Create a naming convention that changes identifiers to something other than the default (snake_case):

# file: upper.py
from dlt.common.normalizers.naming.naming import NamingConvention as BaseNamingConvention


class NamingConvention(BaseNamingConvention):
    PATH_SEPARATOR = "__"

    _CLEANUP_TABLE = str.maketrans(".\n\r'\"▶", "______")

    @property
    def is_case_sensitive(self) -> bool:
        return True

    def normalize_identifier(self, identifier: str) -> str:
        identifier = super().normalize_identifier(identifier)
        norm_identifier = identifier.translate(self._CLEANUP_TABLE).upper()
        return self.shorten_identifier(norm_identifier, identifier, self.max_length)
  2. Create a simple pipeline that loads the same data twice into a table that has a child table:
# file: pipeline.py
import dlt
import os

# fixtures
customers = [
    {"id": 1, "name": "dave", "orders": [1,2,3]},
]

if __name__ == "__main__":
    os.environ["SCHEMA__NAMING"] = "upper"

    duck_pipeline = dlt.pipeline(
        pipeline_name="test", destination="duckdb"
    )

    @dlt.resource()
    def c():
        yield from customers

    # run and print result
    print(duck_pipeline.run([c()]))
    print(duck_pipeline.run([c()]))
    print(duck_pipeline.default_schema.to_pretty_yaml())
  3. Run the pipeline; you will see the error. If you do not change the env SCHEMA__NAMING, thus keeping the standard naming convention, this pipeline will load fine.

  4. Observe the stack trace of the exception

2024-08-27 22:27:45,709|[ERROR]|29427|8629324800|dlt|reference.py|_verify_schema:507|In schema: test: A column name VALUE in table C__ORDERS collides with value after normalization with upper naming convention. Destination is case insensitive. Please clean up your data before loading so the entities have different name. You can also change to case insensitive naming convention. Note that in that case data from both columns will be merged into one.
2024-08-27 22:27:45,709|[ERROR]|29427|8629324800|dlt|reference.py|_verify_schema:507|In schema: test: A column name _DLT_PARENT_ID in table C__ORDERS collides with _dlt_parent_id after normalization with upper naming convention. Destination is case insensitive. Please clean up your data before loading so the entities have different name. You can also change to case insensitive naming convention. Note that in that case data from both columns will be merged into one.
2024-08-27 22:27:45,709|[ERROR]|29427|8629324800|dlt|reference.py|_verify_schema:507|In schema: test: A column name _DLT_LIST_IDX in table C__ORDERS collides with _dlt_list_idx after normalization with upper naming convention. Destination is case insensitive. Please clean up your data before loading so the entities have different name. You can also change to case insensitive naming convention. Note that in that case data from both columns will be merged into one.
Traceback (most recent call last):
  File "/Volumes/dlthub/dlt/dlt/pipeline/pipeline.py", line 586, in load
    runner.run_pool(load_step.config, load_step)
  File "/Volumes/dlthub/dlt/dlt/common/runners/pool_runner.py", line 89, in run_pool
    while _run_func():
  File "/Volumes/dlthub/dlt/dlt/common/runners/pool_runner.py", line 82, in _run_func
    run_metrics = run_f.run(cast(TExecutor, pool))
  File "/Volumes/dlthub/dlt/dlt/load/load.py", line 611, in run
    self.load_single_package(load_id, schema)
  File "/Volumes/dlthub/dlt/dlt/load/load.py", line 501, in load_single_package
    applied_update = init_client(
  File "/Volumes/dlthub/dlt/dlt/load/utils.py", line 116, in init_client
    applied_update = _init_dataset_and_update_schema(
  File "/Volumes/dlthub/dlt/dlt/load/utils.py", line 179, in _init_dataset_and_update_schema
    applied_update = job_client.update_stored_schema(
  File "/Volumes/dlthub/dlt/dlt/destinations/job_client_impl.py", line 171, in update_stored_schema
    super().update_stored_schema(only_tables, expected_update)
  File "/Volumes/dlthub/dlt/dlt/common/destination/reference.py", line 453, in update_stored_schema
    self._verify_schema()
  File "/Volumes/dlthub/dlt/dlt/destinations/job_client_impl.py", line 651, in _verify_schema
    super()._verify_schema()
  File "/Volumes/dlthub/dlt/dlt/common/destination/reference.py", line 508, in _verify_schema
    raise exceptions[0]
dlt.common.schema.exceptions.SchemaIdentifierNormalizationCollision: In schema: test: A column name VALUE in table C__ORDERS collides with value after normalization with upper naming convention. Destination is case insensitive. Please clean up your data before loading so the entities have different name. You can also change to case insensitive naming convention. Note that in that case data from both columns will be merged into one.
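For reference, the custom convention from step 1 boils down to a translate-then-upper transform. A standalone sketch of that transform (no dlt dependency; identifier shortening omitted for brevity) shows how the dlt-internal names end up upper-cased:

```python
# mimic of the translate-then-upper logic from upper.py above
_CLEANUP_TABLE = str.maketrans(".\n\r'\"▶", "______")

def to_upper(identifier: str) -> str:
    # replace forbidden characters, then upper-case the identifier
    return identifier.translate(_CLEANUP_TABLE).upper()

print(to_upper("order.items"))     # ORDER_ITEMS
print(to_upper("_dlt_parent_id"))  # _DLT_PARENT_ID
```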
rudolfix commented 1 month ago

Confirmed! Two hardcoded columns got fixed; `value` is still hardcoded. I'll ping you with the fix PR so you can add a test to it. Thx!
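The note above suggests the fix is to run hardcoded internal names through the active naming convention before comparing them to stored schema columns. A hypothetical sketch of that principle (the set, list, and `normalize` stand-in below are illustrative, not dlt's actual code):

```python
def normalize(name: str) -> str:
    # stand-in for the active upper-casing naming convention
    return name.upper()

stored = {"_DLT_PARENT_ID", "_DLT_LIST_IDX", "VALUE"}  # already-normalized schema
internal = ["_dlt_parent_id", "_dlt_list_idx", "value"]  # hardcoded raw names

# Raw comparison: each internal name looks new, yet its normalized form
# collides with an existing column -> the reported error.
false_collisions = [n for n in internal if n not in stored and normalize(n) in stored]
print(false_collisions)  # ['_dlt_parent_id', '_dlt_list_idx', 'value']

# Normalizing first makes both sides agree -> no spurious collision.
print([n for n in map(normalize, internal) if n not in stored])  # []
```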