dlt-hub / verified-sources

Contribute to dlt verified sources 🔥
https://dlthub.com/docs/walkthroughs/add-a-verified-source
Apache License 2.0

import_schema_path yaml.constructor.ConstructorError #517

Open MarkrJames opened 1 week ago

MarkrJames commented 1 week ago

dlt version

0.4.12

Describe the problem

Setting import_schema_path seems to generate an incorrectly formatted schema/import/.yaml file.

I'm getting the following error:

Traceback (most recent call last):
  File "/workspaces/dbt-duckdb-deltalake/dlt_newtest/filesystem_pipeline copy 5.py", line 23, in <module>
    load_info = pipeline.run(sql_table_source, table_name='Hazards_dlt', write_disposition="append")
  File "/usr/local/lib/python3.9/site-packages/dlt/pipeline/pipeline.py", line 222, in _wrap
    step_info = f(self, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/dlt/pipeline/pipeline.py", line 267, in _wrap
    return f(self, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/dlt/pipeline/pipeline.py", line 683, in run
    self.normalize(loader_file_format=loader_file_format)
  File "/usr/local/lib/python3.9/site-packages/dlt/pipeline/pipeline.py", line 222, in _wrap
    step_info = f(self, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/dlt/pipeline/pipeline.py", line 182, in _wrap
    schema = self._schema_storage.load_schema(name)
  File "/usr/local/lib/python3.9/site-packages/dlt/common/storages/schema_storage.py", line 54, in load_schema
    return self._maybe_import_schema(name, storage_schema)
  File "/usr/local/lib/python3.9/site-packages/dlt/common/storages/schema_storage.py", line 107, in _maybe_import_schema
    imported_schema = self._load_import_schema(name)
  File "/usr/local/lib/python3.9/site-packages/dlt/common/storages/schema_storage.py", line 155, in _load_import_schema
    return self._parse_schema_str(
  File "/usr/local/lib/python3.9/site-packages/dlt/common/storages/schema_storage.py", line 207, in _parse_schema_str
    imported_schema = yaml.safe_load(schema_str)
  File "/usr/local/lib/python3.9/site-packages/yaml/__init__.py", line 125, in safe_load
    return load(stream, SafeLoader)
  File "/usr/local/lib/python3.9/site-packages/yaml/__init__.py", line 81, in load
    return loader.get_single_data()
  File "/usr/local/lib/python3.9/site-packages/yaml/constructor.py", line 51, in get_single_data
    return self.construct_document(node)
  File "/usr/local/lib/python3.9/site-packages/yaml/constructor.py", line 60, in construct_document
    for dummy in generator:
  File "/usr/local/lib/python3.9/site-packages/yaml/constructor.py", line 413, in construct_yaml_map
    value = self.construct_mapping(node)
  File "/usr/local/lib/python3.9/site-packages/yaml/constructor.py", line 218, in construct_mapping
    return super().construct_mapping(node, deep=deep)
  File "/usr/local/lib/python3.9/site-packages/yaml/constructor.py", line 143, in construct_mapping
    value = self.construct_object(value_node, deep=deep)
  File "/usr/local/lib/python3.9/site-packages/yaml/constructor.py", line 100, in construct_object
    data = constructor(self, node)
  File "/usr/local/lib/python3.9/site-packages/yaml/constructor.py", line 427, in construct_undefined
    raise ConstructorError(None, None,
yaml.constructor.ConstructorError: could not determine a constructor for the tag 'tag:yaml.org,2002:python/object/apply:sqlalchemy.sql.elements.quoted_name'
  in "<unicode string>", line 81, column 15:
        resource: !!python/object/apply:sqlalchemy ..
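
The ConstructorError itself is expected PyYAML behavior: yaml.safe_load deliberately has no constructor for python/* tags, since building arbitrary Python objects from YAML is unsafe. The failure can be reproduced in isolation, without dlt or SQLAlchemy installed (the document below mimics the offending line of the exported schema):

```python
import yaml

# A document carrying a Python-specific tag, like the one in the
# exported schema (the class path is only illustrative here):
doc = (
    "resource: !!python/object/apply:sqlalchemy.sql.elements.quoted_name\n"
    "- Hazards\n"
    "- null\n"
)

try:
    yaml.safe_load(doc)
except yaml.constructor.ConstructorError as e:
    # safe_load refuses any tag it has no registered constructor for
    print("rejected:", e.problem)
```

So the bug is not in the loading side; the import schema should never have been written with a python/* tag in the first place.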

Part of the yaml generated is below:

      updated_by:
        data_type: text
        nullable: true
        precision: 50
      sys_start_time:
        data_type: timestamp
        nullable: true
      sys_end_time:
        data_type: timestamp
        nullable: true
    write_disposition: append
    resource: !!python/object/apply:sqlalchemy.sql.elements.quoted_name
    - Hazards
    - null
  _dlt_pipeline_state:
    columns:
      version:
        data_type: bigint
        nullable: false
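
The resource line is the root cause: sqlalchemy.sql.elements.quoted_name is a str subclass, and PyYAML's default dumper has no representer for the subclass, so it falls back to __reduce__-based serialization and emits a python/object/apply tag. A sketch with a stand-in str subclass (to avoid depending on SQLAlchemy; the class here is hypothetical, mimicking quoted_name's shape) shows the behavior, and that an explicit str() cast avoids it:

```python
import yaml

class QuotedName(str):
    """Stand-in for sqlalchemy.sql.elements.quoted_name: a str subclass
    carrying a 'quote' flag, reduced to (class, (value, quote))."""
    def __new__(cls, value, quote=None):
        self = super().__new__(cls, value)
        self.quote = quote
        return self
    def __reduce__(self):
        return (QuotedName, (str(self), self.quote))

name = QuotedName("Hazards", None)

# The default dumper serializes the unknown str subclass via __reduce__,
# producing the python/object/apply tag seen in the exported schema:
tagged = yaml.dump({"resource": name})
print(tagged)  # contains "!!python/object/apply:..."

# Casting to builtin str first yields a plain, safe_load-compatible scalar:
plain = yaml.dump({"resource": str(name)})
print(plain)   # resource: Hazards
```

The two-item sequence under the tag ("- Hazards" / "- null") matches the (value, quote) arguments of the reduce tuple.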

Script:

import dlt
import sql_database

pipeline = dlt.pipeline(
    pipeline_name="sql_pipeline",
    destination="mssql",
    dataset_name="sink",  # schema at destination
    progress="log",
    import_schema_path="schemas/import",
    export_schema_path="schemas/export",
    full_refresh=True,
)

sql_table_source = sql_database.sql_table(
    credentials=<my_connection_string>,
    table="Hazards",
    schema="sink",
    detect_precision_hints=True,
)

load_info = pipeline.run(sql_table_source, table_name='Hazards_dlt', write_disposition="append")
print(load_info)

Expected behavior

No response

Steps to reproduce

Errors:

    import_schema_path="schemas/import",
    export_schema_path="schemas/export",

Also errors:

    import_schema_path="schemas/import",
    #export_schema_path="schemas/export",

Works fine:

    #import_schema_path="schemas/import",
    export_schema_path="schemas/export"

Possibly related to: https://github.com/dlt-hub/dlt/issues/575

Operating system

Linux

Runtime environment

Docker, Docker Compose

Python version

3.9

dlt data source

dlt init sql_database mssql

dlt destination

No response

Other deployment details

No response

Additional information

No response

rudolfix commented 4 days ago

@MarkrJames we'll investigate this. It looks like a SQLAlchemy identifier is leaking into the schema (possibly an explicit cast to str is missing).

I'll move this to verified sources
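
A minimal sketch of the kind of fix described above: downcast str subclasses to builtin str before the schema dict is dumped to YAML. The function name and recursion strategy here are hypothetical, not dlt's actual code:

```python
def to_plain_str(value):
    """Recursively replace str subclasses (e.g. SQLAlchemy's quoted_name)
    with builtin str so yaml.dump emits plain scalars."""
    if isinstance(value, str):
        return str(value)  # no-op for exact str; downcasts subclasses
    if isinstance(value, dict):
        return {to_plain_str(k): to_plain_str(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_plain_str(v) for v in value]
    return value

class Fancy(str):  # stand-in for quoted_name
    pass

schema = {"resource": Fancy("Hazards"), "write_disposition": "append"}
cleaned = to_plain_str(schema)
print(type(cleaned["resource"]))  # <class 'str'>
```

Applied to the schema dict before serialization, this would keep python/* tags out of the exported YAML entirely.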