
Replace without cursor then merge with cursor fails on Clickhouse #1972

Open filipesilva opened 2 weeks ago

filipesilva commented 2 weeks ago

dlt version

1.2.0

Describe the problem

On Clickhouse, if I switch to write_disposition="merge" and add a cursor after the pipeline was run with write_disposition="replace" and no cursor, the run will fail with the following error:

dlt.destinations.sql_jobs.SqlJobCreationException: Could not create SQLFollowupJob with exception Merge sql job for dataset name `my_database`.`repro_dataset`, staging dataset name `my_database`.`repro_dataset_staging` COULD NOT BE GENERATED. Merge will not be performed. Data for the following tables (['jobs']) is loaded to staging dataset. You may need to write your own materialization. The reason is:
No `row_key` or `unique` column (e.g. `_dlt_id`) in table `jobs`.. Table chain: - columns:
    number:
      name: number
      nullable: false
      data_type: bigint
      primary_key: true
    name:
      name: name
      nullable: true
      data_type: text
  write_disposition: merge
  name: jobs
  resource: jobs
  x-normalizer:
    seen-data: true

After this failure, subsequent runs will log "The pipeline run method will now load the pending load packages. The data you passed to the run function will not be loaded. In order to do that you must run the pipeline again", attempt to re-run the failed pipeline, and fail indefinitely until I run rm -rf ~/.dlt/pipelines/.

This does not happen with postgres, and it does not happen if the initial write_disposition="replace" run used a cursor.

Expected behavior

I expect to be able to add a cursor and change the write disposition on clickhouse in the same way as is possible with postgres.
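
For reference, the failing sequence boils down to the sketch below. This is a minimal illustration rather than the repro script itself: the sqlite connection string, the pyarrow backend, and number as the cursor/primary key column are assumptions based on the table chain in the error above and the discussion below.

import dlt
from dlt.sources.sql_database import sql_database

pipeline = dlt.pipeline(
    pipeline_name="repro_pipeline",
    destination="clickhouse",
    dataset_name="repro_dataset",
)

# First run: replace, no cursor. This succeeds and creates the jobs table.
source = sql_database(
    credentials="sqlite:///repro.db", table_names=["jobs"], backend="pyarrow"
)
pipeline.run(source, write_disposition="replace")

# Second run: add an incremental cursor and switch to merge.
# On postgres this works; on clickhouse it fails with SqlJobCreationException.
source = sql_database(
    credentials="sqlite:///repro.db", table_names=["jobs"], backend="pyarrow"
)
source.jobs.apply_hints(
    primary_key="number",
    incremental=dlt.sources.incremental("number"),
)
pipeline.run(source, write_disposition="merge")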

Steps to reproduce

Clone and install dependencies for https://github.com/filipesilva/dlt-replace-merge-bug

# terminal 1
git clone https://github.com/filipesilva/dlt-replace-merge-bug
cd dlt-replace-merge-bug
python3 -m venv .env
source .env/bin/activate

In separate terminal windows, run the following scripts to launch clickhouse and postgres docker containers:

# terminal 2
./scripts/start-postgres.sh
# terminal 3
./scripts/start-clickhouse.sh

Go back to the first terminal and run the postgres pipeline:

# terminal 1
python sql_database_pipeline.py postgres

You should see

[I] (.env) filipesilva@Filipes-MacBook-Pro ~/s/dlt-replace-merge-bug (master)> python sql_database_pipeline.py postgres

=== Replace with cursor then merge with cursor ===

Pipeline repro_pipeline load step completed in 1.08 seconds
1 load package(s) were loaded to destination postgres and into dataset repro_dataset
The postgres destination used postgresql://postgres:***@localhost:5432/postgres location to store data
Load package 1729522139.2506921 is LOADED and contains no failed jobs
Pipeline repro_pipeline load step completed in ---
0 load package(s) were loaded to destination postgres and into dataset None
The postgres destination used postgresql://postgres:***@localhost:5432/postgres location to store data

=== Replace without cursor then merge with cursor ===

Pipeline repro_pipeline load step completed in 1.08 seconds
1 load package(s) were loaded to destination postgres and into dataset repro_dataset
The postgres destination used postgresql://postgres:***@localhost:5432/postgres location to store data
Load package 1729522140.885526 is LOADED and contains no failed jobs
Pipeline repro_pipeline load step completed in 2.09 seconds
1 load package(s) were loaded to destination postgres and into dataset repro_dataset
The postgres destination used postgresql://postgres:***@localhost:5432/postgres location to store data
Load package 1729522142.442627 is LOADED and contains no failed jobs

But if you run the clickhouse pipeline, it will error out:

# terminal 1
python sql_database_pipeline.py clickhouse
[I] (.env) filipesilva@Filipes-MacBook-Pro ~/s/dlt-replace-merge-bug (master)> python sql_database_pipeline.py clickhouse
2024-10-21 15:49:11,040|[WARNING]|48796|8290169408|dlt|pipeline.py|_state_to_props:1603|The destination dlt.destinations.postgres:postgres in state differs from destination dlt.destinations.clickhouse:clickhouse in pipeline and will be ignored

=== Replace with cursor then merge with cursor ===

Pipeline repro_pipeline load step completed in 1.06 seconds
1 load package(s) were loaded to destination clickhouse and into dataset repro_dataset
The clickhouse destination used clickhouse://username:***@localhost:9000/my_database location to store data
Load package 1729522151.1919942 is LOADED and contains no failed jobs
Pipeline repro_pipeline load step completed in ---
0 load package(s) were loaded to destination clickhouse and into dataset None
The clickhouse destination used clickhouse://username:***@localhost:9000/my_database location to store data

=== Replace without cursor then merge with cursor ===

Pipeline repro_pipeline load step completed in 1.06 seconds
1 load package(s) were loaded to destination clickhouse and into dataset repro_dataset
The clickhouse destination used clickhouse://username:***@localhost:9000/my_database location to store data
Load package 1729522152.812721 is LOADED and contains no failed jobs
Traceback (most recent call last):
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/destinations/sql_jobs.py", line 76, in from_table_chain
    for stmt in cls.generate_sql(table_chain, sql_client, params)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/destinations/sql_jobs.py", line 172, in generate_sql
    return cls.gen_merge_sql(table_chain, sql_client)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/destinations/sql_jobs.py", line 538, in gen_merge_sql
    cls._get_row_key_col(table_chain, sql_client, root_table)
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/destinations/sql_jobs.py", line 429, in _get_row_key_col
    col = cls._get_prop_col_or_raise(
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/destinations/sql_jobs.py", line 473, in _get_prop_col_or_raise
    raise exception
dlt.destinations.exceptions.MergeDispositionException: Merge sql job for dataset name `my_database`.`repro_dataset`, staging dataset name `my_database`.`repro_dataset_staging` COULD NOT BE GENERATED. Merge will not be performed. Data for the following tables (['jobs']) is loaded to staging dataset. You may need to write your own materialization. The reason is:
No `row_key` or `unique` column (e.g. `_dlt_id`) in table `jobs`.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/load/load.py", line 349, in create_followup_jobs
    if follow_up_jobs := client.create_table_chain_completed_followup_jobs(
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/destinations/job_client_impl.py", line 255, in create_table_chain_completed_followup_jobs
    jobs.extend(self._create_merge_followup_jobs(table_chain))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/destinations/impl/clickhouse/clickhouse.py", line 227, in _create_merge_followup_jobs
    return [ClickHouseMergeJob.from_table_chain(table_chain, self.sql_client)]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/destinations/sql_jobs.py", line 82, in from_table_chain
    raise SqlJobCreationException(e, table_chain) from e
dlt.destinations.sql_jobs.SqlJobCreationException: Could not create SQLFollowupJob with exception Merge sql job for dataset name `my_database`.`repro_dataset`, staging dataset name `my_database`.`repro_dataset_staging` COULD NOT BE GENERATED. Merge will not be performed. Data for the following tables (['jobs']) is loaded to staging dataset. You may need to write your own materialization. The reason is:
No `row_key` or `unique` column (e.g. `_dlt_id`) in table `jobs`.. Table chain: - columns:
    number:
      name: number
      nullable: false
      data_type: bigint
      primary_key: true
    name:
      name: name
      nullable: true
      data_type: text
  write_disposition: merge
  name: jobs
  resource: jobs
  x-normalizer:
    seen-data: true

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 576, in load
    runner.run_pool(load_step.config, load_step)
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/common/runners/pool_runner.py", line 89, in run_pool
    while _run_func():
          ^^^^^^^^^^^
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/common/runners/pool_runner.py", line 82, in _run_func
    run_metrics = run_f.run(cast(TExecutor, pool))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/load/load.py", line 638, in run
    self.load_single_package(load_id, schema)
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/load/load.py", line 571, in load_single_package
    running_jobs, finalized_jobs, new_pending_exception = self.complete_jobs(
                                                          ^^^^^^^^^^^^^^^^^^^
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/load/load.py", line 446, in complete_jobs
    self.create_followup_jobs(load_id, state, job, schema)
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/load/load.py", line 354, in create_followup_jobs
    raise TableChainFollowupJobCreationFailedException(
dlt.load.exceptions.TableChainFollowupJobCreationFailedException: Failed creating table chain followup jobs for table chain with root table jobs.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/sql_database_pipeline.py", line 50, in <module>
    info = pipeline.run(source, write_disposition="merge")
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 223, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 272, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 715, in run
    return self.load(destination, dataset_name, credentials=credentials)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 223, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 163, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 272, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/filipesilva/sandbox/dlt-replace-merge-bug/.env/lib/python3.12/site-packages/dlt/pipeline/pipeline.py", line 583, in load
    raise PipelineStepFailed(
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage load when processing package 1729522154.360807 with exception:

<class 'dlt.load.exceptions.TableChainFollowupJobCreationFailedException'>
Failed creating table chain followup jobs for table chain with root table jobs.

After this failure mode, subsequent runs will log "The pipeline run method will now load the pending load packages. The data you passed to the run function will not be loaded. In order to do that you must run the pipeline again", attempt to re-run the failed pipeline, and fail indefinitely until I run rm -rf ~/.dlt/pipelines/. You can run this via ./scripts/clear-pending-pipelines.sh.
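
As an aside, deleting all of ~/.dlt/pipelines/ is a blunt instrument; the dlt CLI can drop just the pending load packages for a single pipeline. A possible lighter-weight alternative, assuming a recent dlt version:

# drop only the stuck load packages for this pipeline
dlt pipeline repro_pipeline drop-pending-packages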

Operating system

macOS

Runtime environment

Local

Python version

3.11

dlt data source

SQLite

dlt destination

No response

Other deployment details

I'm using Clickhouse as a destination and Python 3.12.5, but they do not appear in the bug issue dropdowns.

Additional information

No response

rudolfix commented 2 weeks ago

@filipesilva thanks for the repro! this makes fixing the bug easier. at first look, something really weird happens - on clickhouse, the code that handles nested tables is used to generate the merge sql. there should be no nested tables, because you use the arrow backend which should produce a single table.

I'll take a look.

in the meantime you may try to force-add _dlt_id to your arrow tables: https://dlthub.com/docs/dlt-ecosystem/verified-sources/arrow-pandas#add-_dlt_load_id-and-_dlt_id-to-your-tables

and see what tables you get in clickhouse at the end.

filipesilva commented 6 days ago

Hi @rudolfix, thanks for the quick reply! Sorry for only getting back to you now, I was on vacation last week.

I added this to .dlt/config.toml:

[normalize.parquet_normalizer]
add_dlt_load_id = true
add_dlt_id = true
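
(For context: per the arrow-pandas docs linked above, these normalize settings make dlt inject the _dlt_load_id and _dlt_id columns into arrow tables during normalization, which supplies the row_key/unique column the merge job complained about.)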

then ran the clickhouse repro again:

[I] (.env) filipesilva@Filipes-MBP ~/s/dlt-replace-merge-bug (master)> python sql_database_pipeline.py clickhouse

=== Replace with cursor then merge with cursor ===

2024-10-29 12:31:18,130|[WARNING]|9571|8176547904|dlt|job_client_impl.py|_check_table_update_hints:583|Column(s) ['`_dlt_id`'] with hint row_key are being added to existing table jobs. Several hint types may not be added to existing tables.
2024-10-29 12:31:18,131|[WARNING]|9571|8176547904|dlt|job_client_impl.py|_check_table_update_hints:583|Column(s) [] with hint nullable are being added to existing table jobs. Several hint types may not be added to existing tables.
2024-10-29 12:31:18,131|[WARNING]|9571|8176547904|dlt|job_client_impl.py|_check_table_update_hints:583|Column(s) ['`_dlt_id`'] with hint unique are being added to existing table jobs. Several hint types may not be added to existing tables.
Pipeline repro_pipeline load step completed in 1.08 seconds
1 load package(s) were loaded to destination clickhouse and into dataset repro_dataset
The clickhouse destination used clickhouse://username:***@localhost:9000/my_database location to store data
Load package 1730205077.668959 is LOADED and contains no failed jobs
Pipeline repro_pipeline load step completed in ---
0 load package(s) were loaded to destination clickhouse and into dataset None
The clickhouse destination used clickhouse://username:***@localhost:9000/my_database location to store data

=== Replace without cursor then merge with cursor ===

Pipeline repro_pipeline load step completed in 1.07 seconds
1 load package(s) were loaded to destination clickhouse and into dataset repro_dataset
The clickhouse destination used clickhouse://username:***@localhost:9000/my_database location to store data
Load package 1730205079.340509 is LOADED and contains no failed jobs
2024-10-29 12:31:21,372|[WARNING]|9571|8176547904|dlt|job_client_impl.py|_check_table_update_hints:583|Column(s) ['`_dlt_id`'] with hint row_key are being added to existing table jobs. Several hint types may not be added to existing tables.
2024-10-29 12:31:21,372|[WARNING]|9571|8176547904|dlt|job_client_impl.py|_check_table_update_hints:583|Column(s) [] with hint nullable are being added to existing table jobs. Several hint types may not be added to existing tables.
2024-10-29 12:31:21,372|[WARNING]|9571|8176547904|dlt|job_client_impl.py|_check_table_update_hints:583|Column(s) ['`_dlt_id`'] with hint unique are being added to existing table jobs. Several hint types may not be added to existing tables.
Pipeline repro_pipeline load step completed in 2.13 seconds
1 load package(s) were loaded to destination clickhouse and into dataset repro_dataset
The clickhouse destination used clickhouse://username:***@localhost:9000/my_database location to store data
Load package 1730205080.8826492 is LOADED and contains no failed jobs

So indeed it does finish successfully now.

On clickhouse, the result table has these fields and data:

[I] (.env) filipesilva@Filipes-MBP ~/s/dlt-replace-merge-bug (master)> echo "DESCRIBE TABLE my_database.repro_dataset___jobs" | curl 'username:password@localhost:8123/?query=' -s --data-binary @-
number  Int64
name    Nullable(String)
_dlt_load_id    String
_dlt_id String
[I] (.env) filipesilva@Filipes-MBP ~/s/dlt-replace-merge-bug (master)> echo "SELECT * FROM my_database.repro_dataset___jobs" | curl 'username:password@localhost:8123/?query=' -s --data-binary @-
1   foo 1730205089.189604   R3k0zo69UOG+Ew
2   bar 1730205089.189604   CmfC6CodtnoaPA

Is this the resolution or is it more of a workaround? Ideally the destination table would not end up with the dlt fields.