ClickHouse / dbt-clickhouse

The Clickhouse plugin for dbt (data build tool)
Apache License 2.0

`distributed_table` materialization doesn't guarantee idempotency (with respect to table structure) #333

Open rightx2 opened 3 months ago

rightx2 commented 3 months ago

Describe the bug and steps to reproduce:

In dbt, the default behavior of the table materialization is to drop and recreate the table: if I run the same model twice, the second run drops the table and recreates it. Unfortunately, this is not the case for the `distributed_table` materialization. Here is how I tested it:

1. Model SQL file

{{
    config(
        materialized='distributed_table',
        order_by=['code'],
        sharding_key='cityHash64(code)',
        engine="ReplicatedReplacingMergeTree(created_at)",
        settings = { 'index_granularity': 8192},
    )
}}

-- just sample data
select 
    CAST('A000070' AS String) as code,
    now() AS created_at

2. First dbt run

Even though the model query above is simple, many queries run in the background. Here are the important steps that happened in the background:

.
.
.

-- local table
create table `dp_rankingdb_dbt_dev`.`jw_local` ON CLUSTER "dp" (
    code String, created_at DateTime
)
engine = ReplicatedReplacingMergeTree(created_at)
order by (code)
SETTINGS index_granularity=8192

-- distributed table
create or replace table `dp_rankingdb_dbt_dev`.`jw` ON CLUSTER "dp" as `dp_rankingdb_dbt_dev`.`jw_local`
ENGINE = Distributed('dp', 'dp_rankingdb_dbt_dev', 'jw_local', cityHash64(code))

-- insert data
insert into `dp_rankingdb_dbt_dev`.`jw`
("code", "created_at")
select
    CAST('A000070' AS String) as code,
    now() AS created_at

.
.
.

3. Result of the first run

The result of `SHOW CREATE TABLE dp_rankingdb_dbt_dev.jw_local` is what I expected:

CREATE TABLE dp_rankingdb_dbt_dev.jw_local
(
    `code` String,
    `created_at` DateTime
)
ENGINE = ReplicatedReplacingMergeTree(
    '/clickhouse/tables/{shard}/dp_rankingdb_dbt_dev/jw_local', 
    '{replica}', created_at
)
ORDER BY code
SETTINGS index_granularity = 8192    

The result of `SHOW CREATE TABLE dp_rankingdb_dbt_dev.jw` is also what I expected.

4. Second dbt run

This time, the queries executed in the background are slightly different:

.
.
.
create table `dp_rankingdb_dbt_dev`.`jw_local__dbt_backup` ON CLUSTER "dp" (
    code String, created_at DateTime
)
engine = ReplicatedReplacingMergeTree(created_at)
order by (code)
SETTINGS index_granularity=8192

SYSTEM SYNC REPLICA ON CLUSTER "dp" dp_rankingdb_dbt_dev.jw_local

EXCHANGE TABLES `dp_rankingdb_dbt_dev`.`jw_local__dbt_backup` AND `dp_rankingdb_dbt_dev`.`jw_local` ON CLUSTER "dp"

create or replace table `dp_rankingdb_dbt_dev`.`jw` ON CLUSTER "dp" as `dp_rankingdb_dbt_dev`.`jw_local`
ENGINE = Distributed('dp', 'dp_rankingdb_dbt_dev', 'jw_local', cityHash64(code))

insert into `dp_rankingdb_dbt_dev`.`jw`
    ("code", "created_at")
select
    CAST('A000070' AS String) as code,
    now() AS created_at

drop table if exists `dp_rankingdb_dbt_dev`.`jw_local__dbt_backup` ON CLUSTER "dp" SYNC    
.
.
.

It ran successfully, but the result of `SHOW CREATE TABLE dp_rankingdb_dbt_dev.jw_local` is not what I expected:

CREATE TABLE dp_rankingdb_dbt_dev.jw_local
(
    `code` String,
    `created_at` DateTime
)
ENGINE = ReplicatedReplacingMergeTree(
    '/clickhouse/tables/{shard}/dp_rankingdb_dbt_dev/jw_local__dbt_backup', 
    '{replica}', 
    created_at
)
ORDER BY code
SETTINGS index_granularity = 8192

As you can see, the replica path in the engine definition of `jw_local` now ends in `dp_rankingdb_dbt_dev/jw_local__dbt_backup`. I found that this happened because of the `EXCHANGE TABLES` query above.
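The effect of `EXCHANGE TABLES` on the replica path can probably be reproduced without dbt. The following is a minimal sketch, not taken from the adapter: the database `demo` and the tables `t_live` and `t_backup` are hypothetical names, and it assumes an Atomic database, a configured Keeper/ZooKeeper, and `{shard}` / `{replica}` macros defined on the server. Run the statements one at a time.

```sql
-- Hypothetical names; requires an Atomic database and configured {shard}/{replica} macros.
CREATE DATABASE IF NOT EXISTS demo ENGINE = Atomic;

CREATE TABLE demo.t_live (code String, created_at DateTime)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/demo/t_live', '{replica}', created_at)
ORDER BY code;

CREATE TABLE demo.t_backup (code String, created_at DateTime)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/demo/t_backup', '{replica}', created_at)
ORDER BY code;

-- Swaps the two names atomically; each table keeps the replica path it was created with.
EXCHANGE TABLES demo.t_backup AND demo.t_live;

-- The table now named t_live should report the path ending in demo/t_backup.
SHOW CREATE TABLE demo.t_live;

-- Dropping the name t_backup now removes the table whose path ends in demo/t_live.
DROP TABLE IF EXISTS demo.t_backup SYNC;
```

After this sequence, re-running the second `CREATE TABLE` would be expected to fail, because the path ending in `demo/t_backup` is still held by the table now named `t_live`.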

Note: If you manually run (not via `dbt run`) `SHOW CREATE TABLE dp_rankingdb_dbt_dev.jw_local__dbt_backup` before `drop table if exists dp_rankingdb_dbt_dev.jw_local__dbt_backup ON CLUSTER "dp" SYNC` executes, you will see that its ReplicatedReplacingMergeTree path ends in `jw_local` instead of `jw_local__dbt_backup`. The two paths are literally exchanged!

You might ask, so what? This leads to a problem in the next run (the third `dbt run`), because dbt issues the following query again:
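As an alternative to running `SHOW CREATE TABLE` on each node, the binding between table names and replica paths can be read from `system.replicas`. This is a sketch that assumes the cluster name `dp` from the queries above and that the current user is allowed to query system tables on every replica.

```sql
-- For every replica in cluster "dp", show which ZooKeeper path each table name is bound to.
SELECT
    hostName() AS host,
    database,
    table,
    zookeeper_path,
    replica_name
FROM clusterAllReplicas('dp', system.replicas)
WHERE database = 'dp_rankingdb_dbt_dev'
  AND table IN ('jw_local', 'jw_local__dbt_backup')
ORDER BY host, table
```

Here `zookeeper_path` is reported with `{shard}` already substituted, so a row where `table` is `jw_local` but the path ends in `jw_local__dbt_backup` indicates the exchanged state described above.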


create table `dp_rankingdb_dbt_dev`.`jw_local__dbt_backup` ON CLUSTER "dp" (
    code String, created_at DateTime
)
engine = ReplicatedReplacingMergeTree(created_at)
order by (code)
SETTINGS index_granularity=8192

.
.
.

and it raises an error:

08:05:06  Finished running 1 distributed_table model in 0 hours 0 minutes and 4.74 seconds (4.74s).
08:05:06
08:05:06  Completed with 1 error and 0 warnings:
08:05:06
08:05:06    Database Error in model jw (models\dp_rankingdb\jw.sql)
  HTTPDriver for http://analysis-clickhouse.live.abcd.bz:8123 returned response code 500)
   Code: 253. DB::Exception: There was an error on [dc1-pr-clickhouse03.abcd.bz:9000]: Code: 253. DB::Exception: Replica /clickhouse/tables/clickhouse02/dp_rankingdb_dbt_dev/jw_local__dbt_backup/replicas/clickhouse03 already exists. (REPLICA_ALREADY_EXISTS) (version 24.3.2.23 (official build)). (REPLICA_ALREADY_EXISTS) (version 24.3.2.23 (official build))

This happened because `drop table if exists dp_rankingdb_dbt_dev.jw_local__dbt_backup ON CLUSTER "dp" SYNC` removed the table whose replica path ends in `jw_local` rather than the one whose path ends in `jw_local__dbt_backup` (because the names were exchanged). The `jw_local__dbt_backup` path is therefore still registered when dbt tries to create `dp_rankingdb_dbt_dev.jw_local__dbt_backup` again.

I checked the code in `dbt/include/clickhouse/macros/materializations/distributed_table.sql` and found that this is related to the backup table creation, but I think this behavior is not well aligned with the default behavior of materializations in dbt.
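To confirm that the path is still registered, the `replicas` node named in the error message can be listed through `system.zookeeper`. This is a sketch: the path is copied from the error above (with `clickhouse02` as the expanded shard), and other shards would need their own value substituted.

```sql
-- List the replicas still registered under the backup path from the error message.
-- system.zookeeper requires a filter on path.
SELECT name, ctime
FROM system.zookeeper
WHERE path = '/clickhouse/tables/clickhouse02/dp_rankingdb_dbt_dev/jw_local__dbt_backup/replicas'
```

If `clickhouse03` appears in the result, the replica path from the error message is still in use, which is consistent with the live `jw_local` table holding it after the exchange.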

antonaut commented 2 months ago

I have a similar issue, but with ReplicatedMergeTree instead of Distributed.