dlt-hub / dlt

Unable to do an initial load into "bigquery" with any other write_disposition than "append" #1876

Open neuromantik33 opened 1 day ago

neuromantik33 commented 1 day ago

dlt version

1.0.0

Describe the problem

For some reason that escapes me, at some point I became unable to load any data into BigQuery using a write_disposition other than 'append'. Both replace and merge start by truncating some tables, and for reasons unknown to me dlt is no longer able to detect that those tables don't exist yet, so the pipeline fails.
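
To illustrate the mechanism (a sketch only; `pipeline` stands for a dlt pipeline bound to BigQuery, and `application` is just one of my table names): a TRUNCATE issued against a table that does not exist yet is rejected by BigQuery, and dlt surfaces it as DatabaseUndefinedRelation, which matches the traceback below.

with pipeline.sql_client() as client:
    qualified = client.make_qualified_table_name("application")
    # BigQuery answers with "Not found: Table ..." when the table is missing;
    # dlt wraps this as dlt.destinations.exceptions.DatabaseUndefinedRelation.
    client.execute_sql(f"TRUNCATE TABLE {qualified}")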

Expected behavior

The expected behavior is to be able to do an initial load into a non-existing dataset using a write_disposition other than append.

Steps to reproduce

Here is my test script:

import dlt
from dlt.common.libs.sql_alchemy import sqltypes
from dlt.destinations.adapters import bigquery_adapter
from dlt.sources import DltSource, incremental
from dlt.sources.sql_database import sql_table

def sql_type_adapter(sql_t):
    if isinstance(sql_t, sqltypes.ARRAY):
        return sqltypes.JSON
    return sql_t

tables_configs: dict[str, dict] = {
    "applicant": {"cursor_path": "updated_at"},
    "application": {},
}

@dlt.source
def apply_db(conn_uri: str) -> DltSource:  # type: ignore[misc]
    for table_name, table_config in tables_configs.items():
        cursor_path = table_config.get("cursor_path")
        t_resource = sql_table(
            credentials=conn_uri,
            table=table_name,
            incremental=incremental(cursor_path=cursor_path) if cursor_path else None,
            detect_precision_hints=True,
            type_adapter_callback=sql_type_adapter,
        )
        bigquery_adapter(t_resource, partition=cursor_path, autodetect_schema=True)  # type: ignore[arg-type]
        assert isinstance(t_resource.columns, dict)
        file_format = (
            "jsonl" if any(col.get("data_type") == "json" for col in t_resource.columns.values()) else "parquet"
        )
        t_resource.apply_hints(file_format=file_format)  # type: ignore[arg-type]
        yield t_resource

if __name__ == "__main__":
    from dlt import Pipeline

    from nxt.config import __config__ as cfg

    pipeline: Pipeline = dlt.pipeline(
        pipeline_name="apply_db",
        destination="bigquery",
        staging="filesystem",
        dataset_name=cfg.apply_db.dataset,
        dev_mode=True,
    )

    source = apply_db(f"{cfg.apply_db.dsn}")
    load_info = pipeline.run(source, write_disposition="merge")

    load_info.raise_on_failed_jobs()
    print(load_info)

    last_trace = pipeline.last_trace
    print(last_trace)

and here is the output (I've also added some extra logging to see the queries that are failing):

+ set -a
+ source .env
++ APPLY_DB__DATASET=drnick_apply
++ APPLY_DB__DSN=postgresql+psycopg2://<truncated>@<truncated>/apply
++ DESTINATION__BIGQUERY__LOCATION=europe-west1
++ DESTINATION__BIGQUERY__TRUNCATE_TABLES_ON_STAGING_DESTINATION_BEFORE_LOAD=false
++ DESTINATION__FILESYSTEM__BUCKET_URL=gs://<truncated>/dlt
+ set +a
++ echo nxt_dlt/tmp_apply_db.py
++ sed 's/\//./g'
++ sed 's/\.py$//'
+ module_name=nxt_dlt.tmp_apply_db
+ poetry run python -m nxt_dlt.tmp_apply_db
2024-09-25 17:43:54,415|[WARNING]|121360|126181426977664|dlt|schema_types.py|sqla_col_to_column_schema:132|A column with name id contains unknown data type UUID which cannot be mapped to `dlt` data type. When using sqlalchemy backend such data will be passed to the normalizer. In case of `pyarrow` and `pandas` backend, data types are detected from numpy ndarrays. In case of other backends, the behavior is backend-specific.
2024-09-25 17:43:55,235|[WARNING]|121360|126181426977664|dlt|schema_types.py|sqla_col_to_column_schema:132|A column with name id contains unknown data type UUID which cannot be mapped to `dlt` data type. When using sqlalchemy backend such data will be passed to the normalizer. In case of `pyarrow` and `pandas` backend, data types are detected from numpy ndarrays. In case of other backends, the behavior is backend-specific.
2024-09-25 17:43:55,235|[WARNING]|121360|126181426977664|dlt|schema_types.py|sqla_col_to_column_schema:132|A column with name applicant_id contains unknown data type UUID which cannot be mapped to `dlt` data type. When using sqlalchemy backend such data will be passed to the normalizer. In case of `pyarrow` and `pandas` backend, data types are detected from numpy ndarrays. In case of other backends, the behavior is backend-specific.
2024-09-25 17:43:55,274|[INFO]|121360|126181426977664|dlt|__init__.py|bind:439|Bind incremental on applicant with initial_value: None, start_value: None, end_value: None
2024-09-25 17:43:55,281|[WARNING]|121360|126181426977664|dlt|schema_types.py|sqla_col_to_column_schema:132|A column with name id contains unknown data type UUID which cannot be mapped to `dlt` data type. When using sqlalchemy backend such data will be passed to the normalizer. In case of `pyarrow` and `pandas` backend, data types are detected from numpy ndarrays. In case of other backends, the behavior is backend-specific.
2024-09-25 17:43:55,396|[WARNING]|121360|126181426977664|dlt|schema_types.py|sqla_col_to_column_schema:132|A column with name id contains unknown data type UUID which cannot be mapped to `dlt` data type. When using sqlalchemy backend such data will be passed to the normalizer. In case of `pyarrow` and `pandas` backend, data types are detected from numpy ndarrays. In case of other backends, the behavior is backend-specific.
2024-09-25 17:43:55,396|[WARNING]|121360|126181426977664|dlt|schema_types.py|sqla_col_to_column_schema:132|A column with name applicant_id contains unknown data type UUID which cannot be mapped to `dlt` data type. When using sqlalchemy backend such data will be passed to the normalizer. In case of `pyarrow` and `pandas` backend, data types are detected from numpy ndarrays. In case of other backends, the behavior is backend-specific.
2024-09-25 17:43:55,514|[DEBUG]|121360|126181426977664|dlt|data_item_storage.py|close_writers:81|Closing writer for 1727279035.267658.apply_db.applicant with file None and actual name /home/drnick/.dlt/pipelines/apply_db/normalize/0666a57b5c0993e7/1727279035.267658/new_jobs/applicant.599a4c07ba.0.typed-jsonl
2024-09-25 17:43:55,518|[DEBUG]|121360|126181426977664|dlt|data_item_storage.py|close_writers:81|Closing writer for 1727279035.267658.apply_db.application with file None and actual name /home/drnick/.dlt/pipelines/apply_db/normalize/0666a57b5c0993e7/1727279035.267658/new_jobs/application.9e7a53daa0.0.typed-jsonl
2024-09-25 17:43:55,572|[DEBUG]|121360|126181426977664|dlt|data_item_storage.py|close_writers:81|Closing writer for 1727279035.267658.apply_db._dlt_pipeline_state with file None and actual name /home/drnick/.dlt/pipelines/apply_db/normalize/0666a57b5c0993e7/1727279035.267658/new_jobs/_dlt_pipeline_state.6e8a7dd556.0.typed-jsonl
2024-09-25 17:43:55,629|[INFO]|121360|126181426977664|dlt|pool_runner.py|run_pool:75|Created none pool with 1 workers
2024-09-25 17:43:55,629|[DEBUG]|121360|126181426977664|dlt|pool_runner.py|run_pool:88|Running pool
2024-09-25 17:43:55,629|[INFO]|121360|126181426977664|dlt|normalize.py|run:263|Running file normalizing
2024-09-25 17:43:55,629|[INFO]|121360|126181426977664|dlt|normalize.py|run:266|Found 1 load packages
2024-09-25 17:43:55,643|[INFO]|121360|126181426977664|dlt|normalize.py|run:289|Found 3 files in schema apply_db load_id 1727279035.267658
2024-09-25 17:43:55,643|[INFO]|121360|126181426977664|dlt|normalize.py|spool_schema_files:240|Created new load package 1727279035.267658 on loading volume
2024-09-25 17:43:55,649|[INFO]|121360|126181426977664|dlt|worker.py|_get_items_normalizer:123|A file format for table _dlt_pipeline_state was specified to preferred in the resource so parquet format being used.
2024-09-25 17:43:55,649|[INFO]|121360|126181426977664|dlt|worker.py|_get_items_normalizer:169|Created items normalizer JsonLItemsNormalizer with writer ParquetDataWriter for item format object and file format parquet
2024-09-25 17:43:55,649|[DEBUG]|121360|126181426977664|dlt|worker.py|w_normalize_files:230|Processing extracted items in 1727279035.267658/new_jobs/_dlt_pipeline_state.6e8a7dd556.0.typed-jsonl in load_id 1727279035.267658 with table name _dlt_pipeline_state and schema apply_db
2024-09-25 17:43:55,651|[DEBUG]|121360|126181426977664|dlt|items_normalizers.py|__call__:208|Processed 1 lines from file 1727279035.267658/new_jobs/_dlt_pipeline_state.6e8a7dd556.0.typed-jsonl
2024-09-25 17:43:55,651|[DEBUG]|121360|126181426977664|dlt|worker.py|w_normalize_files:236|Processed file 1727279035.267658/new_jobs/_dlt_pipeline_state.6e8a7dd556.0.typed-jsonl
2024-09-25 17:43:55,651|[INFO]|121360|126181426977664|dlt|worker.py|_get_items_normalizer:123|A file format for table applicant was specified to parquet in the resource so parquet format being used.
2024-09-25 17:43:55,651|[INFO]|121360|126181426977664|dlt|worker.py|_get_items_normalizer:169|Created items normalizer JsonLItemsNormalizer with writer ParquetDataWriter for item format object and file format parquet
2024-09-25 17:43:55,651|[DEBUG]|121360|126181426977664|dlt|worker.py|w_normalize_files:230|Processing extracted items in 1727279035.267658/new_jobs/applicant.599a4c07ba.0.typed-jsonl in load_id 1727279035.267658 with table name applicant and schema apply_db
2024-09-25 17:43:55,655|[DEBUG]|121360|126181426977664|dlt|items_normalizers.py|__call__:208|Processed 1 lines from file 1727279035.267658/new_jobs/applicant.599a4c07ba.0.typed-jsonl
2024-09-25 17:43:55,655|[DEBUG]|121360|126181426977664|dlt|worker.py|w_normalize_files:236|Processed file 1727279035.267658/new_jobs/applicant.599a4c07ba.0.typed-jsonl
2024-09-25 17:43:55,656|[INFO]|121360|126181426977664|dlt|worker.py|_get_items_normalizer:123|A file format for table application was specified to parquet in the resource so parquet format being used.
2024-09-25 17:43:55,656|[INFO]|121360|126181426977664|dlt|worker.py|_get_items_normalizer:169|Created items normalizer JsonLItemsNormalizer with writer ParquetDataWriter for item format object and file format parquet
2024-09-25 17:43:55,656|[DEBUG]|121360|126181426977664|dlt|worker.py|w_normalize_files:230|Processing extracted items in 1727279035.267658/new_jobs/application.9e7a53daa0.0.typed-jsonl in load_id 1727279035.267658 with table name application and schema apply_db
2024-09-25 17:43:55,658|[DEBUG]|121360|126181426977664|dlt|items_normalizers.py|__call__:208|Processed 1 lines from file 1727279035.267658/new_jobs/application.9e7a53daa0.0.typed-jsonl
2024-09-25 17:43:55,659|[DEBUG]|121360|126181426977664|dlt|worker.py|w_normalize_files:236|Processed file 1727279035.267658/new_jobs/application.9e7a53daa0.0.typed-jsonl
2024-09-25 17:43:55,659|[DEBUG]|121360|126181426977664|dlt|data_item_storage.py|close_writers:81|Closing writer for 1727279035.267658.apply_db._dlt_pipeline_state with file None and actual name /home/drnick/.dlt/pipelines/apply_db/load/new/1727279035.267658/new_jobs/_dlt_pipeline_state.51a1f0703d.0.parquet
2024-09-25 17:43:55,661|[DEBUG]|121360|126181426977664|dlt|data_item_storage.py|close_writers:81|Closing writer for 1727279035.267658.apply_db.applicant with file None and actual name /home/drnick/.dlt/pipelines/apply_db/load/new/1727279035.267658/new_jobs/applicant.6d7c3aafc7.0.parquet
2024-09-25 17:43:55,664|[DEBUG]|121360|126181426977664|dlt|data_item_storage.py|close_writers:81|Closing writer for 1727279035.267658.apply_db.application with file None and actual name /home/drnick/.dlt/pipelines/apply_db/load/new/1727279035.267658/new_jobs/application.de61105182.0.parquet
2024-09-25 17:43:55,667|[INFO]|121360|126181426977664|dlt|worker.py|w_normalize_files:244|Processed all items in 3 files
2024-09-25 17:43:55,967|[INFO]|121360|126181426977664|dlt|normalize.py|update_schema:86|Updating schema for table _dlt_pipeline_state with 1 deltas
2024-09-25 17:43:55,968|[INFO]|121360|126181426977664|dlt|normalize.py|update_schema:86|Updating schema for table applicant with 1 deltas
2024-09-25 17:43:55,968|[INFO]|121360|126181426977664|dlt|normalize.py|update_schema:86|Updating schema for table application with 1 deltas
2024-09-25 17:43:55,968|[DEBUG]|121360|126181426977664|dlt|normalize.py|map_parallel:152|0 tasks still remaining for 1727279035.267658...
2024-09-25 17:43:55,968|[INFO]|121360|126181426977664|dlt|normalize.py|spool_files:194|Table _dlt_pipeline_state has seen data for a first time with load id 1727279035.267658
2024-09-25 17:43:55,968|[INFO]|121360|126181426977664|dlt|normalize.py|spool_files:194|Table applicant has seen data for a first time with load id 1727279035.267658
2024-09-25 17:43:55,968|[INFO]|121360|126181426977664|dlt|normalize.py|spool_files:194|Table application has seen data for a first time with load id 1727279035.267658
2024-09-25 17:43:55,969|[INFO]|121360|126181426977664|dlt|normalize.py|spool_files:200|Saving schema apply_db with version 1:2
2024-09-25 17:43:55,970|[INFO]|121360|126181426977664|dlt|normalize.py|spool_files:216|Committing storage, do not kill this process
2024-09-25 17:43:55,971|[INFO]|121360|126181426977664|dlt|normalize.py|spool_files:222|Extracted package 1727279035.267658 processed
2024-09-25 17:43:55,971|[INFO]|121360|126181426977664|dlt|pool_runner.py|run_pool:101|Closing processing pool
2024-09-25 17:43:55,971|[INFO]|121360|126181426977664|dlt|pool_runner.py|run_pool:104|Processing pool closed
2024-09-25 17:43:57,355|[INFO]|121360|126181426977664|dlt|pool_runner.py|run_pool:75|Created thread pool with 20 workers
2024-09-25 17:43:57,355|[DEBUG]|121360|126181426977664|dlt|pool_runner.py|run_pool:88|Running pool
2024-09-25 17:43:57,355|[INFO]|121360|126181426977664|dlt|load.py|run:612|Running file loading
2024-09-25 17:43:57,355|[INFO]|121360|126181426977664|dlt|load.py|run:615|Found 1 load packages
2024-09-25 17:43:57,355|[INFO]|121360|126181426977664|dlt|load.py|run:621|Loading schema from load package in 1727279035.267658
2024-09-25 17:43:57,363|[INFO]|121360|126181426977664|dlt|load.py|run:623|Loaded schema name apply_db and version 2
2024-09-25 17:43:57,365|[WARNING]|121360|126181426977664|dlt|type_mapping.py|to_db_datetime_type:56|Column flags for timezone or precision are not yet supported in this destination. One or both of these flags were used in column 'birth_date'. in table 'applicant'.
2024-09-25 17:43:57,365|[WARNING]|121360|126181426977664|dlt|type_mapping.py|to_db_datetime_type:56|Column flags for timezone or precision are not yet supported in this destination. One or both of these flags were used in column 'created_at'. in table 'applicant'.
2024-09-25 17:43:57,365|[WARNING]|121360|126181426977664|dlt|type_mapping.py|to_db_datetime_type:56|Column flags for timezone or precision are not yet supported in this destination. One or both of these flags were used in column 'updated_at'. in table 'applicant'.
2024-09-25 17:43:57,365|[INFO]|121360|126181426977664|dlt|utils.py|_init_dataset_and_update_schema:157|Client for bigquery will start initialize storage 
2024-09-25 17:43:59,480|[INFO]|121360|126181426977664|dlt|utils.py|_init_dataset_and_update_schema:175|Client for bigquery will update schema to package schema 
2024-09-25 17:43:59,896|[INFO]|121360|126181426977664|dlt|job_client_impl.py|update_stored_schema:177|Schema with hash fUkdRFw+jB9hkEkr1v6vY65J184GXGOBgiG4I6dMrG0= not found in the storage. upgrading
2024-09-25 17:43:59,896|[WARNING]|121360|126181426977664|dlt|sql_client.py|execute_sql:218|TEMP: Executing query: 
SELECT table_name,column_name,data_type,is_nullable
    FROM `<truncated>`.`drnick_apply_20240925034353`.INFORMATION_SCHEMA.COLUMNS
WHERE table_name IN (%s,%s,%s,%s,%s) ORDER BY table_name, ordinal_position;
2024-09-25 17:44:00,505|[INFO]|121360|126181426977664|dlt|job_client_impl.py|_create_table_update:606|Found 6 updates for _dlt_version in apply_db
2024-09-25 17:44:00,506|[INFO]|121360|126181426977664|dlt|job_client_impl.py|_create_table_update:606|Found 7 updates for application in apply_db
2024-09-25 17:44:00,506|[INFO]|121360|126181426977664|dlt|job_client_impl.py|_create_table_update:606|Found 8 updates for _dlt_pipeline_state in apply_db
2024-09-25 17:44:00,507|[INFO]|121360|126181426977664|dlt|job_client_impl.py|_create_table_update:606|Found 5 updates for _dlt_loads in apply_db
2024-09-25 17:44:00,508|[INFO]|121360|126181426977664|dlt|job_client_impl.py|_create_table_update:606|Found 15 updates for applicant in apply_db
2024-09-25 17:44:00,509|[WARNING]|121360|126181426977664|dlt|sql_client.py|execute_sql:218|TEMP: Executing query: CREATE TABLE IF NOT EXISTS `<truncated>`.`drnick_apply_20240925034353`.`_dlt_version` (
`version` INT64 NOT NULL,
`engine_version` INT64 NOT NULL,
`inserted_at` TIMESTAMP NOT NULL,
`schema_name` STRING NOT NULL,
`version_hash` STRING NOT NULL,
`schema` STRING NOT NULL);
CREATE TABLE IF NOT EXISTS `<truncated>`.`drnick_apply_20240925034353`.`_dlt_pipeline_state` (
`version` INT64 NOT NULL,
`engine_version` INT64 NOT NULL,
`pipeline_name` STRING NOT NULL,
`state` STRING NOT NULL,
`created_at` TIMESTAMP NOT NULL,
`version_hash` STRING ,
`_dlt_load_id` STRING NOT NULL,
`_dlt_id` STRING NOT NULL);
CREATE TABLE IF NOT EXISTS `<truncated>`.`drnick_apply_20240925034353`.`_dlt_loads` (
`load_id` STRING NOT NULL,
`schema_name` STRING ,
`status` INT64 NOT NULL,
`inserted_at` TIMESTAMP NOT NULL,
`schema_version_hash` STRING );
2024-09-25 17:44:01,582|[WARNING]|121360|126181426977664|dlt|sql_client.py|execute_sql:218|TEMP: Executing query: INSERT INTO `<truncated>`.`drnick_apply_20240925034353`.`_dlt_version`(`version`, `engine_version`, `inserted_at`, `schema_name`, `version_hash`, `schema`) VALUES (%s, %s, %s, %s, %s, %s);
2024-09-25 17:44:03,883|[INFO]|121360|126181426977664|dlt|utils.py|_init_dataset_and_update_schema:157|Client for bigquery will start initialize storage for staging dataset
2024-09-25 17:44:05,216|[INFO]|121360|126181426977664|dlt|utils.py|_init_dataset_and_update_schema:175|Client for bigquery will update schema to package schema for staging dataset
2024-09-25 17:44:05,524|[INFO]|121360|126181426977664|dlt|job_client_impl.py|update_stored_schema:177|Schema with hash fUkdRFw+jB9hkEkr1v6vY65J184GXGOBgiG4I6dMrG0= not found in the storage. upgrading
2024-09-25 17:44:05,524|[WARNING]|121360|126181426977664|dlt|sql_client.py|execute_sql:218|TEMP: Executing query: 
SELECT table_name,column_name,data_type,is_nullable
    FROM `<truncated>`.`drnick_apply_20240925034353_staging`.INFORMATION_SCHEMA.COLUMNS
WHERE table_name IN (%s,%s,%s) ORDER BY table_name, ordinal_position;
2024-09-25 17:44:06,132|[INFO]|121360|126181426977664|dlt|job_client_impl.py|_create_table_update:606|Found 6 updates for _dlt_version in apply_db
2024-09-25 17:44:06,133|[INFO]|121360|126181426977664|dlt|job_client_impl.py|_create_table_update:606|Found 7 updates for application in apply_db
2024-09-25 17:44:06,133|[INFO]|121360|126181426977664|dlt|job_client_impl.py|_create_table_update:606|Found 15 updates for applicant in apply_db
2024-09-25 17:44:06,134|[WARNING]|121360|126181426977664|dlt|sql_client.py|execute_sql:218|TEMP: Executing query: CREATE TABLE IF NOT EXISTS `<truncated>`.`drnick_apply_20240925034353_staging`.`_dlt_version` (
`version` INT64 NOT NULL,
`engine_version` INT64 NOT NULL,
`inserted_at` TIMESTAMP NOT NULL,
`schema_name` STRING NOT NULL,
`version_hash` STRING NOT NULL,
`schema` STRING NOT NULL);
2024-09-25 17:44:06,651|[WARNING]|121360|126181426977664|dlt|sql_client.py|execute_sql:218|TEMP: Executing query: INSERT INTO `<truncated>`.`drnick_apply_20240925034353_staging`.`_dlt_version`(`version`, `engine_version`, `inserted_at`, `schema_name`, `version_hash`, `schema`) VALUES (%s, %s, %s, %s, %s, %s);
2024-09-25 17:44:08,401|[INFO]|121360|126181426977664|dlt|utils.py|_init_dataset_and_update_schema:183|Client for bigquery will truncate tables for staging dataset
2024-09-25 17:44:08,690|[WARNING]|121360|126181426977664|dlt|sql_client.py|execute_sql:218|TEMP: Executing query: TRUNCATE TABLE `<truncated>`.`drnick_apply_20240925034353_staging`.`application`;
TRUNCATE TABLE `<truncated>`.`drnick_apply_20240925034353_staging`.`applicant`;
2024-09-25 17:44:09,001|[INFO]|121360|126181426977664|dlt|pool_runner.py|run_pool:101|Closing processing pool
2024-09-25 17:44:09,002|[INFO]|121360|126181426977664|dlt|pool_runner.py|run_pool:104|Processing pool closed
Traceback (most recent call last):
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/google/cloud/bigquery/dbapi/cursor.py", line 220, in _execute
    rows = client.query_and_wait(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/google/cloud/bigquery/client.py", line 3601, in query_and_wait
    return _job_helpers.query_and_wait(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/google/cloud/bigquery/_job_helpers.py", line 509, in query_and_wait
    return job_retry(do_query)()
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
    _retry_error_helper(
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
    raise final_exc from source_exc
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
    result = target()
             ^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/google/cloud/bigquery/_job_helpers.py", line 450, in do_query
    response = retry(client._call_api)(
               ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
    _retry_error_helper(
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
    raise final_exc from source_exc
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
    result = target()
             ^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/google/cloud/bigquery/client.py", line 833, in _call_api
    return call()
           ^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
    raise exceptions.from_http_response(response)
google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/<truncated>/queries?prettyPrint=false: Not found: Table <truncated>:drnick_apply_20240925034353_staging.application was not found in location europe-west1 at [1:1]

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/destinations/sql_client.py", line 338, in _wrap_gen
    return (yield from f(self, *args, **kwargs))
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/destinations/impl/bigquery/sql_client.py", line 238, in execute_query
    curr.execute(query, db_args, job_config=self._session_query or self._default_query)
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/google/cloud/bigquery/dbapi/_helpers.py", line 496, in with_closed_check
    return method(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/google/cloud/bigquery/dbapi/cursor.py", line 189, in execute
    self._execute(
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/google/cloud/bigquery/dbapi/cursor.py", line 226, in _execute
    raise exceptions.DatabaseError(exc)
google.cloud.bigquery.dbapi.exceptions.DatabaseError: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/<truncated>/queries?prettyPrint=false: Not found: Table <truncated>:drnick_apply_20240925034353_staging.application was not found in location europe-west1 at [1:1]

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 572, in load
    runner.run_pool(load_step.config, load_step)
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/common/runners/pool_runner.py", line 89, in run_pool
    while _run_func():
          ^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/common/runners/pool_runner.py", line 82, in _run_func
    run_metrics = run_f.run(cast(TExecutor, pool))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/load/load.py", line 637, in run
    self.load_single_package(load_id, schema)
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/load/load.py", line 526, in load_single_package
    applied_update = init_client(
                     ^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/load/utils.py", line 136, in init_client
    _init_dataset_and_update_schema(
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/load/utils.py", line 187, in _init_dataset_and_update_schema
    job_client.initialize_storage(truncate_tables=truncate_tables)
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/destinations/job_client_impl.py", line 163, in initialize_storage
    self.sql_client.truncate_tables(*truncate_tables)
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/destinations/sql_client.py", line 125, in truncate_tables
    self.execute_many(statements)
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/destinations/sql_client.py", line 182, in execute_many
    ret.append(self.execute_sql(sql_fragment, *args, **kwargs))
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/destinations/impl/bigquery/sql_client.py", line 219, in execute_sql
    with self.execute_query(sql, *args, **kwargs) as curr:
  File "/home/drnick/repos/github/pyenv/pyenv/versions/3.11.9/lib/python3.11/contextlib.py", line 137, in __enter__
    return next(self.gen)
           ^^^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/destinations/sql_client.py", line 340, in _wrap_gen
    raise self._make_database_exception(ex)
dlt.destinations.exceptions.DatabaseUndefinedRelation: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/<truncated>/queries?prettyPrint=false: Not found: Table <truncated>:drnick_apply_20240925034353_staging.application was not found in location europe-west1 at [1:1]

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/home/drnick/repos/analytics/42next-modeling/nxt_dlt/tmp_apply_db.py", line 54, in <module>
    load_info = pipeline.run(source, write_disposition="merge")
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 223, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 268, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 711, in run
    return self.load(destination, dataset_name, credentials=credentials)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 223, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 163, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 268, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/drnick/repos/analytics/42next-modeling/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 579, in load
    raise PipelineStepFailed(
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage load when processing package 1727279035.267658 with exception:

<class 'dlt.destinations.exceptions.DatabaseUndefinedRelation'>
400 POST https://bigquery.googleapis.com/bigquery/v2/projects/<truncated>/queries?prettyPrint=false: Not found: Table <truncated>:drnick_apply_20240925034353_staging.application was not found in location europe-west1 at [1:1]

Operating system

Linux

Runtime environment

Local

Python version

3.11

dlt data source

sql_database

dlt destination

Google BigQuery

Other deployment details

If I change the write_disposition to append, everything works fine.

import dlt
from dlt.common.libs.sql_alchemy import sqltypes
from dlt.destinations.adapters import bigquery_adapter
from dlt.sources import DltSource, incremental
from dlt.sources.sql_database import sql_table

def sql_type_adapter(sql_t):
    if isinstance(sql_t, sqltypes.ARRAY):
        return sqltypes.JSON
    return sql_t

tables_configs: dict[str, dict] = {
    "applicant": {"cursor_path": "updated_at"},
    "application": {},
}

@dlt.source
def apply_db(conn_uri: str) -> DltSource:  # type: ignore[misc]
    for table_name, table_config in tables_configs.items():
        cursor_path = table_config.get("cursor_path")
        t_resource = sql_table(
            credentials=conn_uri,
            table=table_name,
            incremental=incremental(cursor_path=cursor_path) if cursor_path else None,
            detect_precision_hints=True,
            type_adapter_callback=sql_type_adapter,
        )
        bigquery_adapter(t_resource, partition=cursor_path, autodetect_schema=True)  # type: ignore[arg-type]
        assert isinstance(t_resource.columns, dict)
        file_format = (
            "jsonl" if any(col.get("data_type") == "json" for col in t_resource.columns.values()) else "parquet"
        )
        t_resource.apply_hints(file_format=file_format)  # type: ignore[arg-type]
        yield t_resource

if __name__ == "__main__":
    from dlt import Pipeline

    from nxt.config import __config__ as cfg

    pipeline: Pipeline = dlt.pipeline(
        pipeline_name="apply_db",
        destination="bigquery",
        staging="filesystem",
        dataset_name=cfg.apply_db.dataset,
        dev_mode=True,
    )

    source = apply_db(f"{cfg.apply_db.dsn}")
    load_info = pipeline.run(source, write_disposition="append")

    load_info.raise_on_failed_jobs()
    print(load_info)

    last_trace = pipeline.last_trace
    print(last_trace)

output

+ set -a
+ source .env
++ APPLY_DB__DATASET=drnick_apply
++ APPLY_DB__DSN=postgresql+psycopg2://<truncated>@<truncated>/apply
++ DESTINATION__BIGQUERY__LOCATION=europe-west1
++ DESTINATION__BIGQUERY__TRUNCATE_TABLES_ON_STAGING_DESTINATION_BEFORE_LOAD=false
++ DESTINATION__FILESYSTEM__BUCKET_URL=gs://<truncated>/dlt
+ set +a
++ echo nxt_dlt/tmp_apply_db.py
++ sed 's/\//./g'
++ sed 's/\.py$//'
+ module_name=nxt_dlt.tmp_apply_db
+ poetry run python -m nxt_dlt.tmp_apply_db
2024-09-25 18:10:14,530|[WARNING]|123420|124623998454656|dlt|schema_types.py|sqla_col_to_column_schema:132|A column with name id contains unknown data type UUID which cannot be mapped to `dlt` data type. When using sqlalchemy backend such data will be passed to the normalizer. In case of `pyarrow` and `pandas` backend, data types are detected from numpy ndarrays. In case of other backends, the behavior is backend-specific.
2024-09-25 18:10:15,372|[WARNING]|123420|124623998454656|dlt|schema_types.py|sqla_col_to_column_schema:132|A column with name id contains unknown data type UUID which cannot be mapped to `dlt` data type. When using sqlalchemy backend such data will be passed to the normalizer. In case of `pyarrow` and `pandas` backend, data types are detected from numpy ndarrays. In case of other backends, the behavior is backend-specific.
2024-09-25 18:10:15,372|[WARNING]|123420|124623998454656|dlt|schema_types.py|sqla_col_to_column_schema:132|A column with name applicant_id contains unknown data type UUID which cannot be mapped to `dlt` data type. When using sqlalchemy backend such data will be passed to the normalizer. In case of `pyarrow` and `pandas` backend, data types are detected from numpy ndarrays. In case of other backends, the behavior is backend-specific.
2024-09-25 18:10:15,437|[INFO]|123420|124623998454656|dlt|__init__.py|bind:439|Bind incremental on applicant with initial_value: None, start_value: None, end_value: None
2024-09-25 18:10:15,444|[WARNING]|123420|124623998454656|dlt|schema_types.py|sqla_col_to_column_schema:132|A column with name id contains unknown data type UUID which cannot be mapped to `dlt` data type. When using sqlalchemy backend such data will be passed to the normalizer. In case of `pyarrow` and `pandas` backend, data types are detected from numpy ndarrays. In case of other backends, the behavior is backend-specific.
2024-09-25 18:10:15,495|[WARNING]|123420|124623998454656|dlt|schema_types.py|sqla_col_to_column_schema:132|A column with name id contains unknown data type UUID which cannot be mapped to `dlt` data type. When using sqlalchemy backend such data will be passed to the normalizer. In case of `pyarrow` and `pandas` backend, data types are detected from numpy ndarrays. In case of other backends, the behavior is backend-specific.
2024-09-25 18:10:15,495|[WARNING]|123420|124623998454656|dlt|schema_types.py|sqla_col_to_column_schema:132|A column with name applicant_id contains unknown data type UUID which cannot be mapped to `dlt` data type. When using sqlalchemy backend such data will be passed to the normalizer. In case of `pyarrow` and `pandas` backend, data types are detected from numpy ndarrays. In case of other backends, the behavior is backend-specific.
2024-09-25 18:10:15,611|[DEBUG]|123420|124623998454656|dlt|data_item_storage.py|close_writers:81|Closing writer for 1727280615.430451.apply_db.applicant with file None and actual name /home/drnick/.dlt/pipelines/apply_db/normalize/4c8372df47dd3c7c/1727280615.430451/new_jobs/applicant.4188c90bfb.0.typed-jsonl
2024-09-25 18:10:15,615|[DEBUG]|123420|124623998454656|dlt|data_item_storage.py|close_writers:81|Closing writer for 1727280615.430451.apply_db.application with file None and actual name /home/drnick/.dlt/pipelines/apply_db/normalize/4c8372df47dd3c7c/1727280615.430451/new_jobs/application.032e52f4dc.0.typed-jsonl
2024-09-25 18:10:15,732|[DEBUG]|123420|124623998454656|dlt|data_item_storage.py|close_writers:81|Closing writer for 1727280615.430451.apply_db._dlt_pipeline_state with file None and actual name /home/drnick/.dlt/pipelines/apply_db/normalize/4c8372df47dd3c7c/1727280615.430451/new_jobs/_dlt_pipeline_state.98f9cbbe51.0.typed-jsonl
2024-09-25 18:10:15,811|[INFO]|123420|124623998454656|dlt|pool_runner.py|run_pool:75|Created none pool with 1 workers
2024-09-25 18:10:15,811|[DEBUG]|123420|124623998454656|dlt|pool_runner.py|run_pool:88|Running pool
2024-09-25 18:10:15,811|[INFO]|123420|124623998454656|dlt|normalize.py|run:263|Running file normalizing
2024-09-25 18:10:15,811|[INFO]|123420|124623998454656|dlt|normalize.py|run:266|Found 1 load packages
2024-09-25 18:10:15,825|[INFO]|123420|124623998454656|dlt|normalize.py|run:289|Found 3 files in schema apply_db load_id 1727280615.430451
2024-09-25 18:10:15,826|[INFO]|123420|124623998454656|dlt|normalize.py|spool_schema_files:240|Created new load package 1727280615.430451 on loading volume
2024-09-25 18:10:15,831|[INFO]|123420|124623998454656|dlt|worker.py|_get_items_normalizer:123|A file format for table _dlt_pipeline_state was specified to preferred in the resource so parquet format being used.
2024-09-25 18:10:15,831|[INFO]|123420|124623998454656|dlt|worker.py|_get_items_normalizer:169|Created items normalizer JsonLItemsNormalizer with writer ParquetDataWriter for item format object and file format parquet
2024-09-25 18:10:15,831|[DEBUG]|123420|124623998454656|dlt|worker.py|w_normalize_files:230|Processing extracted items in 1727280615.430451/new_jobs/_dlt_pipeline_state.98f9cbbe51.0.typed-jsonl in load_id 1727280615.430451 with table name _dlt_pipeline_state and schema apply_db
2024-09-25 18:10:15,832|[DEBUG]|123420|124623998454656|dlt|items_normalizers.py|__call__:208|Processed 1 lines from file 1727280615.430451/new_jobs/_dlt_pipeline_state.98f9cbbe51.0.typed-jsonl
2024-09-25 18:10:15,832|[DEBUG]|123420|124623998454656|dlt|worker.py|w_normalize_files:236|Processed file 1727280615.430451/new_jobs/_dlt_pipeline_state.98f9cbbe51.0.typed-jsonl
2024-09-25 18:10:15,832|[INFO]|123420|124623998454656|dlt|worker.py|_get_items_normalizer:123|A file format for table applicant was specified to parquet in the resource so parquet format being used.
2024-09-25 18:10:15,833|[INFO]|123420|124623998454656|dlt|worker.py|_get_items_normalizer:169|Created items normalizer JsonLItemsNormalizer with writer ParquetDataWriter for item format object and file format parquet
2024-09-25 18:10:15,833|[DEBUG]|123420|124623998454656|dlt|worker.py|w_normalize_files:230|Processing extracted items in 1727280615.430451/new_jobs/applicant.4188c90bfb.0.typed-jsonl in load_id 1727280615.430451 with table name applicant and schema apply_db
2024-09-25 18:10:15,836|[DEBUG]|123420|124623998454656|dlt|items_normalizers.py|__call__:208|Processed 1 lines from file 1727280615.430451/new_jobs/applicant.4188c90bfb.0.typed-jsonl
2024-09-25 18:10:15,836|[DEBUG]|123420|124623998454656|dlt|worker.py|w_normalize_files:236|Processed file 1727280615.430451/new_jobs/applicant.4188c90bfb.0.typed-jsonl
2024-09-25 18:10:15,836|[INFO]|123420|124623998454656|dlt|worker.py|_get_items_normalizer:123|A file format for table application was specified to parquet in the resource so parquet format being used.
2024-09-25 18:10:15,836|[INFO]|123420|124623998454656|dlt|worker.py|_get_items_normalizer:169|Created items normalizer JsonLItemsNormalizer with writer ParquetDataWriter for item format object and file format parquet
2024-09-25 18:10:15,836|[DEBUG]|123420|124623998454656|dlt|worker.py|w_normalize_files:230|Processing extracted items in 1727280615.430451/new_jobs/application.032e52f4dc.0.typed-jsonl in load_id 1727280615.430451 with table name application and schema apply_db
2024-09-25 18:10:15,839|[DEBUG]|123420|124623998454656|dlt|items_normalizers.py|__call__:208|Processed 1 lines from file 1727280615.430451/new_jobs/application.032e52f4dc.0.typed-jsonl
2024-09-25 18:10:15,839|[DEBUG]|123420|124623998454656|dlt|worker.py|w_normalize_files:236|Processed file 1727280615.430451/new_jobs/application.032e52f4dc.0.typed-jsonl
2024-09-25 18:10:15,839|[DEBUG]|123420|124623998454656|dlt|data_item_storage.py|close_writers:81|Closing writer for 1727280615.430451.apply_db._dlt_pipeline_state with file None and actual name /home/drnick/.dlt/pipelines/apply_db/load/new/1727280615.430451/new_jobs/_dlt_pipeline_state.61eb001e99.0.parquet
2024-09-25 18:10:15,841|[DEBUG]|123420|124623998454656|dlt|data_item_storage.py|close_writers:81|Closing writer for 1727280615.430451.apply_db.applicant with file None and actual name /home/drnick/.dlt/pipelines/apply_db/load/new/1727280615.430451/new_jobs/applicant.0776018b8d.0.parquet
2024-09-25 18:10:15,844|[DEBUG]|123420|124623998454656|dlt|data_item_storage.py|close_writers:81|Closing writer for 1727280615.430451.apply_db.application with file None and actual name /home/drnick/.dlt/pipelines/apply_db/load/new/1727280615.430451/new_jobs/application.1c2261d59c.0.parquet
2024-09-25 18:10:15,847|[INFO]|123420|124623998454656|dlt|worker.py|w_normalize_files:244|Processed all items in 3 files
2024-09-25 18:10:16,147|[INFO]|123420|124623998454656|dlt|normalize.py|update_schema:86|Updating schema for table _dlt_pipeline_state with 1 deltas
2024-09-25 18:10:16,147|[INFO]|123420|124623998454656|dlt|normalize.py|update_schema:86|Updating schema for table applicant with 1 deltas
2024-09-25 18:10:16,148|[INFO]|123420|124623998454656|dlt|normalize.py|update_schema:86|Updating schema for table application with 1 deltas
2024-09-25 18:10:16,148|[DEBUG]|123420|124623998454656|dlt|normalize.py|map_parallel:152|0 tasks still remaining for 1727280615.430451...
2024-09-25 18:10:16,148|[INFO]|123420|124623998454656|dlt|normalize.py|spool_files:194|Table _dlt_pipeline_state has seen data for a first time with load id 1727280615.430451
2024-09-25 18:10:16,148|[INFO]|123420|124623998454656|dlt|normalize.py|spool_files:194|Table applicant has seen data for a first time with load id 1727280615.430451
2024-09-25 18:10:16,149|[INFO]|123420|124623998454656|dlt|normalize.py|spool_files:194|Table application has seen data for a first time with load id 1727280615.430451
2024-09-25 18:10:16,150|[INFO]|123420|124623998454656|dlt|normalize.py|spool_files:200|Saving schema apply_db with version 1:2
2024-09-25 18:10:16,154|[INFO]|123420|124623998454656|dlt|normalize.py|spool_files:216|Committing storage, do not kill this process
2024-09-25 18:10:16,156|[INFO]|123420|124623998454656|dlt|normalize.py|spool_files:222|Extracted package 1727280615.430451 processed
2024-09-25 18:10:16,156|[INFO]|123420|124623998454656|dlt|pool_runner.py|run_pool:101|Closing processing pool
2024-09-25 18:10:16,156|[INFO]|123420|124623998454656|dlt|pool_runner.py|run_pool:104|Processing pool closed
2024-09-25 18:10:17,550|[INFO]|123420|124623998454656|dlt|pool_runner.py|run_pool:75|Created thread pool with 20 workers
2024-09-25 18:10:17,550|[DEBUG]|123420|124623998454656|dlt|pool_runner.py|run_pool:88|Running pool
2024-09-25 18:10:17,550|[INFO]|123420|124623998454656|dlt|load.py|run:612|Running file loading
2024-09-25 18:10:17,551|[INFO]|123420|124623998454656|dlt|load.py|run:615|Found 1 load packages
2024-09-25 18:10:17,551|[INFO]|123420|124623998454656|dlt|load.py|run:621|Loading schema from load package in 1727280615.430451
2024-09-25 18:10:17,559|[INFO]|123420|124623998454656|dlt|load.py|run:623|Loaded schema name apply_db and version 2
2024-09-25 18:10:17,560|[WARNING]|123420|124623998454656|dlt|type_mapping.py|to_db_datetime_type:56|Column flags for timezone or precision are not yet supported in this destination. One or both of these flags were used in column 'birth_date'. in table 'applicant'.
2024-09-25 18:10:17,560|[WARNING]|123420|124623998454656|dlt|type_mapping.py|to_db_datetime_type:56|Column flags for timezone or precision are not yet supported in this destination. One or both of these flags were used in column 'created_at'. in table 'applicant'.
2024-09-25 18:10:17,560|[WARNING]|123420|124623998454656|dlt|type_mapping.py|to_db_datetime_type:56|Column flags for timezone or precision are not yet supported in this destination. One or both of these flags were used in column 'updated_at'. in table 'applicant'.
2024-09-25 18:10:17,561|[INFO]|123420|124623998454656|dlt|utils.py|_init_dataset_and_update_schema:157|Client for bigquery will start initialize storage 
2024-09-25 18:10:20,236|[INFO]|123420|124623998454656|dlt|utils.py|_init_dataset_and_update_schema:175|Client for bigquery will update schema to package schema 
2024-09-25 18:10:20,543|[INFO]|123420|124623998454656|dlt|job_client_impl.py|update_stored_schema:177|Schema with hash gwDhtUuS9ludM6P7mtVF5qPkEb4CL0hmHT92Cmx5lDE= not found in the storage. upgrading
2024-09-25 18:10:20,544|[WARNING]|123420|124623998454656|dlt|sql_client.py|execute_sql:218|TEMP: Executing query: 
SELECT table_name,column_name,data_type,is_nullable
    FROM `<truncated>`.`drnick_apply_20240925041014`.INFORMATION_SCHEMA.COLUMNS
WHERE table_name IN (%s,%s,%s,%s,%s) ORDER BY table_name, ordinal_position;
2024-09-25 18:10:21,158|[INFO]|123420|124623998454656|dlt|job_client_impl.py|_create_table_update:606|Found 15 updates for applicant in apply_db
2024-09-25 18:10:21,159|[INFO]|123420|124623998454656|dlt|job_client_impl.py|_create_table_update:606|Found 6 updates for _dlt_version in apply_db
2024-09-25 18:10:21,160|[INFO]|123420|124623998454656|dlt|job_client_impl.py|_create_table_update:606|Found 5 updates for _dlt_loads in apply_db
2024-09-25 18:10:21,161|[INFO]|123420|124623998454656|dlt|job_client_impl.py|_create_table_update:606|Found 8 updates for _dlt_pipeline_state in apply_db
2024-09-25 18:10:21,162|[INFO]|123420|124623998454656|dlt|job_client_impl.py|_create_table_update:606|Found 7 updates for application in apply_db
2024-09-25 18:10:21,162|[WARNING]|123420|124623998454656|dlt|sql_client.py|execute_sql:218|TEMP: Executing query: CREATE TABLE IF NOT EXISTS `<truncated>`.`drnick_apply_20240925041014`.`_dlt_version` (
`version` INT64 NOT NULL,
`engine_version` INT64 NOT NULL,
`inserted_at` TIMESTAMP NOT NULL,
`schema_name` STRING NOT NULL,
`version_hash` STRING NOT NULL,
`schema` STRING NOT NULL);
CREATE TABLE IF NOT EXISTS `<truncated>`.`drnick_apply_20240925041014`.`_dlt_loads` (
`load_id` STRING NOT NULL,
`schema_name` STRING ,
`status` INT64 NOT NULL,
`inserted_at` TIMESTAMP NOT NULL,
`schema_version_hash` STRING );
CREATE TABLE IF NOT EXISTS `<truncated>`.`drnick_apply_20240925041014`.`_dlt_pipeline_state` (
`version` INT64 NOT NULL,
`engine_version` INT64 NOT NULL,
`pipeline_name` STRING NOT NULL,
`state` STRING NOT NULL,
`created_at` TIMESTAMP NOT NULL,
`version_hash` STRING ,
`_dlt_load_id` STRING NOT NULL,
`_dlt_id` STRING NOT NULL);
2024-09-25 18:10:22,607|[WARNING]|123420|124623998454656|dlt|sql_client.py|execute_sql:218|TEMP: Executing query: INSERT INTO `<truncated>`.`drnick_apply_20240925041014`.`_dlt_version`(`version`, `engine_version`, `inserted_at`, `schema_name`, `version_hash`, `schema`) VALUES (%s, %s, %s, %s, %s, %s);
2024-09-25 18:10:25,151|[INFO]|123420|124623998454656|dlt|utils.py|_init_dataset_and_update_schema:157|Client for filesystem will start initialize storage 
2024-09-25 18:10:25,448|[INFO]|123420|124623998454656|dlt|utils.py|_init_dataset_and_update_schema:175|Client for filesystem will update schema to package schema 
2024-09-25 18:10:26,151|[INFO]|123420|124623998454656|dlt|load.py|resume_started_jobs:290|Found 0 that are already started and should be continued
2024-09-25 18:10:26,151|[INFO]|123420|124623998454656|dlt|load.py|complete_jobs:389|Will complete 0 for 1727280615.430451
2024-09-25 18:10:26,151|[INFO]|123420|124623998454656|dlt|load.py|start_new_jobs:273|Will load additional 3, creating jobs
2024-09-25 18:10:26,155|[INFO]|123420|124623998454656|dlt|load.py|submit_job:168|Will load file 1727280615.430451/new_jobs/_dlt_pipeline_state.61eb001e99.0.parquet with table name _dlt_pipeline_state
2024-09-25 18:10:26,161|[INFO]|123420|124623998454656|dlt|load.py|submit_job:168|Will load file 1727280615.430451/new_jobs/applicant.0776018b8d.0.parquet with table name applicant
2024-09-25 18:10:26,165|[INFO]|123420|124623998454656|dlt|load.py|submit_job:168|Will load file 1727280615.430451/new_jobs/application.1c2261d59c.0.parquet with table name application
2024-09-25 18:10:26,548|[INFO]|123420|124622313883328|dlt|path_utils.py|prepare_datetime_params:112|current_datetime is not set, using timestamp from load package
2024-09-25 18:10:26,560|[INFO]|123420|124622758479552|dlt|path_utils.py|prepare_datetime_params:112|current_datetime is not set, using timestamp from load package
2024-09-25 18:10:26,561|[INFO]|123420|124622536181440|dlt|path_utils.py|prepare_datetime_params:112|current_datetime is not set, using timestamp from load package
2024-09-25 18:10:27,167|[INFO]|123420|124623998454656|dlt|load.py|complete_jobs:389|Will complete 3 for 1727280615.430451
2024-09-25 18:10:27,167|[DEBUG]|123420|124623998454656|dlt|load.py|complete_jobs:392|Checking state for job _dlt_pipeline_state.61eb001e99.parquet
2024-09-25 18:10:27,171|[INFO]|123420|124623998454656|dlt|path_utils.py|prepare_datetime_params:112|current_datetime is not set, using timestamp from load package
2024-09-25 18:10:27,171|[INFO]|123420|124623998454656|dlt|load.py|create_followup_jobs:368|Job _dlt_pipeline_state.61eb001e99.parquet CREATED a new FOLLOWUP JOB /tmp/_dlt_pipeline_state.61eb001e99.0.reference placed in new_jobs
2024-09-25 18:10:27,172|[INFO]|123420|124623998454656|dlt|path_utils.py|prepare_datetime_params:112|current_datetime is not set, using timestamp from load package
2024-09-25 18:10:27,172|[INFO]|123420|124623998454656|dlt|load.py|complete_jobs:458|Job for _dlt_pipeline_state.61eb001e99.parquet completed in load 1727280615.430451
2024-09-25 18:10:27,172|[DEBUG]|123420|124623998454656|dlt|load.py|complete_jobs:392|Checking state for job applicant.0776018b8d.parquet
2024-09-25 18:10:27,176|[INFO]|123420|124623998454656|dlt|path_utils.py|prepare_datetime_params:112|current_datetime is not set, using timestamp from load package
2024-09-25 18:10:27,177|[INFO]|123420|124623998454656|dlt|load.py|create_followup_jobs:368|Job applicant.0776018b8d.parquet CREATED a new FOLLOWUP JOB /tmp/applicant.0776018b8d.0.reference placed in new_jobs
2024-09-25 18:10:27,177|[INFO]|123420|124623998454656|dlt|path_utils.py|prepare_datetime_params:112|current_datetime is not set, using timestamp from load package
2024-09-25 18:10:27,177|[INFO]|123420|124623998454656|dlt|load.py|complete_jobs:458|Job for applicant.0776018b8d.parquet completed in load 1727280615.430451
2024-09-25 18:10:27,177|[DEBUG]|123420|124623998454656|dlt|load.py|complete_jobs:392|Checking state for job application.1c2261d59c.parquet
2024-09-25 18:10:27,181|[INFO]|123420|124623998454656|dlt|path_utils.py|prepare_datetime_params:112|current_datetime is not set, using timestamp from load package
2024-09-25 18:10:27,182|[INFO]|123420|124623998454656|dlt|load.py|create_followup_jobs:368|Job application.1c2261d59c.parquet CREATED a new FOLLOWUP JOB /tmp/application.1c2261d59c.0.reference placed in new_jobs
2024-09-25 18:10:27,182|[INFO]|123420|124623998454656|dlt|path_utils.py|prepare_datetime_params:112|current_datetime is not set, using timestamp from load package
2024-09-25 18:10:27,182|[INFO]|123420|124623998454656|dlt|load.py|complete_jobs:458|Job for application.1c2261d59c.parquet completed in load 1727280615.430451
2024-09-25 18:10:27,183|[INFO]|123420|124623998454656|dlt|load.py|start_new_jobs:273|Will load additional 3, creating jobs
2024-09-25 18:10:27,183|[INFO]|123420|124623998454656|dlt|load.py|submit_job:168|Will load file 1727280615.430451/new_jobs/applicant.0776018b8d.0.reference with table name applicant
2024-09-25 18:10:27,184|[INFO]|123420|124623998454656|dlt|load.py|submit_job:168|Will load file 1727280615.430451/new_jobs/_dlt_pipeline_state.61eb001e99.0.reference with table name _dlt_pipeline_state
2024-09-25 18:10:27,184|[INFO]|123420|124623998454656|dlt|load.py|submit_job:168|Will load file 1727280615.430451/new_jobs/application.1c2261d59c.0.reference with table name application
2024-09-25 18:10:28,185|[INFO]|123420|124623998454656|dlt|load.py|complete_jobs:389|Will complete 3 for 1727280615.430451
2024-09-25 18:10:28,185|[DEBUG]|123420|124623998454656|dlt|load.py|complete_jobs:392|Checking state for job applicant.0776018b8d.reference
2024-09-25 18:10:28,185|[DEBUG]|123420|124623998454656|dlt|load.py|complete_jobs:396|job applicant.0776018b8d.reference still running
2024-09-25 18:10:28,185|[DEBUG]|123420|124623998454656|dlt|load.py|complete_jobs:392|Checking state for job _dlt_pipeline_state.61eb001e99.reference
2024-09-25 18:10:28,185|[DEBUG]|123420|124623998454656|dlt|load.py|complete_jobs:396|job _dlt_pipeline_state.61eb001e99.reference still running
2024-09-25 18:10:28,185|[DEBUG]|123420|124623998454656|dlt|load.py|complete_jobs:392|Checking state for job application.1c2261d59c.reference
2024-09-25 18:10:28,185|[DEBUG]|123420|124623998454656|dlt|load.py|complete_jobs:396|job application.1c2261d59c.reference still running
2024-09-25 18:10:28,185|[INFO]|123420|124623998454656|dlt|load.py|start_new_jobs:273|Will load additional 0, creating jobs
2024-09-25 18:10:29,186|[INFO]|123420|124623998454656|dlt|load.py|complete_jobs:389|Will complete 3 for 1727280615.430451
2024-09-25 18:10:29,186|[DEBUG]|123420|124623998454656|dlt|load.py|complete_jobs:392|Checking state for job applicant.0776018b8d.reference
2024-09-25 18:10:29,186|[DEBUG]|123420|124623998454656|dlt|load.py|complete_jobs:396|job applicant.0776018b8d.reference still running
2024-09-25 18:10:29,186|[DEBUG]|123420|124623998454656|dlt|load.py|complete_jobs:392|Checking state for job _dlt_pipeline_state.61eb001e99.reference
2024-09-25 18:10:29,186|[DEBUG]|123420|124623998454656|dlt|load.py|complete_jobs:396|job _dlt_pipeline_state.61eb001e99.reference still running
2024-09-25 18:10:29,186|[DEBUG]|123420|124623998454656|dlt|load.py|complete_jobs:392|Checking state for job application.1c2261d59c.reference
2024-09-25 18:10:29,186|[DEBUG]|123420|124623998454656|dlt|load.py|complete_jobs:396|job application.1c2261d59c.reference still running
2024-09-25 18:10:29,186|[INFO]|123420|124623998454656|dlt|load.py|start_new_jobs:273|Will load additional 0, creating jobs
2024-09-25 18:10:30,187|[INFO]|123420|124623998454656|dlt|load.py|complete_jobs:389|Will complete 3 for 1727280615.430451
2024-09-25 18:10:30,187|[DEBUG]|123420|124623998454656|dlt|load.py|complete_jobs:392|Checking state for job applicant.0776018b8d.reference
2024-09-25 18:10:30,189|[INFO]|123420|124623998454656|dlt|load.py|complete_jobs:458|Job for applicant.0776018b8d.reference completed in load 1727280615.430451
2024-09-25 18:10:30,190|[DEBUG]|123420|124623998454656|dlt|load.py|complete_jobs:392|Checking state for job _dlt_pipeline_state.61eb001e99.reference
2024-09-25 18:10:30,192|[INFO]|123420|124623998454656|dlt|load.py|complete_jobs:458|Job for _dlt_pipeline_state.61eb001e99.reference completed in load 1727280615.430451
2024-09-25 18:10:30,192|[DEBUG]|123420|124623998454656|dlt|load.py|complete_jobs:392|Checking state for job application.1c2261d59c.reference
2024-09-25 18:10:30,192|[INFO]|123420|124623998454656|dlt|load.py|complete_jobs:458|Job for application.1c2261d59c.reference completed in load 1727280615.430451
2024-09-25 18:10:30,193|[INFO]|123420|124623998454656|dlt|load.py|start_new_jobs:273|Will load additional 0, creating jobs
2024-09-25 18:10:30,193|[WARNING]|123420|124623998454656|dlt|sql_client.py|execute_sql:218|TEMP: Executing query: INSERT INTO `<truncated>`.`drnick_apply_20240925041014`.`_dlt_loads`(`load_id`, `schema_name`, `status`, `inserted_at`, `schema_version_hash`) VALUES(%s, %s, %s, %s, %s);
2024-09-25 18:10:32,324|[INFO]|123420|124623998454656|dlt|load.py|complete_package:495|All jobs completed, archiving package 1727280615.430451 with aborted set to False
2024-09-25 18:10:32,324|[INFO]|123420|124623998454656|dlt|pool_runner.py|run_pool:101|Closing processing pool
2024-09-25 18:10:32,325|[INFO]|123420|124623998454656|dlt|pool_runner.py|run_pool:104|Processing pool closed
Pipeline apply_db load step completed in 14.77 seconds
1 load package(s) were loaded to destination bigquery and into dataset drnick_apply_20240925041014
The filesystem staging destination used gs://<truncated>/dlt location to stage data
The bigquery destination used None@<truncated> location to store data
Load package 1727280615.430451 is LOADED and contains no failed jobs
Run started at 2024-09-25 16:10:15.378843+00:00 and COMPLETED in 16.95 seconds with 4 steps.
Step extract COMPLETED in 0.35 seconds.

Load package 1727280615.430451 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.42 seconds.
Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- applicant: 41 row(s)
- application: 41 row(s)

Load package 1727280615.430451 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 16.14 seconds.
Pipeline apply_db load step completed in 14.77 seconds
1 load package(s) were loaded to destination bigquery and into dataset drnick_apply_20240925041014
The filesystem staging destination used gs://<truncated>/dlt location to stage data
The bigquery destination used None@<truncated> location to store data
Load package 1727280615.430451 is LOADED and contains no failed jobs

Step run COMPLETED in 16.95 seconds.
Pipeline apply_db load step completed in 14.77 seconds
1 load package(s) were loaded to destination bigquery and into dataset drnick_apply_20240925041014
The filesystem staging destination used gs://<truncated>/dlt location to stage data
The bigquery destination used None@<truncated> location to store data
Load package 1727280615.430451 is LOADED and contains no failed jobs

Additional information

I can start removing all of the type hinting and BigQuery tuning to try to isolate the change that introduced this, but I really need the options that I'm setting :sweat_smile:

neuromantik33 commented 1 day ago

@Pipboyguy Update: when I remove the BigQuery schema auto-detection, everything works fine.

...
bigquery_adapter(t_resource, partition=cursor_path) # autodetect_schema=True)  # type: ignore[arg-type]
...
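
(This is consistent with the logs above: with autodetect_schema=True, the CREATE TABLE IF NOT EXISTS batch for the staging dataset only contains _dlt_version, while the subsequent TRUNCATE still targets application and applicant, which were never created.)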
neuromantik33 commented 1 day ago

And FYI, my current ugly workaround is doing my own table detection, like so:

from collections.abc import Iterable

from dlt import Pipeline
# in dlt 1.x this exception lives in dlt.common.destination.exceptions
from dlt.common.destination.exceptions import DestinationUndefinedEntity

def missing_destination_tables(pipeline: Pipeline, table_names: Iterable[str]) -> set[str]:
    with pipeline.sql_client() as c:

        def table_exists(table: str) -> bool:
            q_name = c.make_qualified_table_name(table)
            try:
                c.execute_sql(f"SELECT 1 from {q_name}")
                return True
            except DestinationUndefinedEntity:
                return False

        return {table for table in table_names if not table_exists(table)}
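
Probing each table with a throwaway SELECT 1 keeps the check destination-agnostic, since it only relies on dlt's generic sql_client and its DestinationUndefinedEntity error rather than on BigQuery's INFORMATION_SCHEMA.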
...
if __name__ == "__main__":
    from dlt import Pipeline

    from nxt.config import __config__ as cfg

    pipeline: Pipeline = dlt.pipeline(
        pipeline_name="apply_db",
        destination="bigquery",
        staging="filesystem",
        dataset_name=cfg.apply_db.dataset,
    )

    tables_to_append = missing_destination_tables(pipeline, tables_configs.keys())
    print(f"Missing tables: {tables_to_append}")

    source = apply_db(f"{cfg.apply_db.dsn}")
    for t_name, rsrc in source.resources.items():
        if t_name not in tables_to_append and tables_configs[t_name].get("cursor_path") is None:
            rsrc.apply_hints(write_disposition="merge")

    load_info = pipeline.run(source)