ClickHouse / dbt-clickhouse

The Clickhouse plugin for dbt (data build tool)
Apache License 2.0

Update snapshots on cluster with replica #183

Open ikeniborn opened 1 year ago

ikeniborn commented 1 year ago

Describe the bug

Steps to reproduce

  1. Run the snapshot model for the first time. It completes successfully. The snapshot is defined as follows:

    {% snapshot snap_dim_fund %}

    {{ config(
        target_schema='snapshots',
        tags = ["dimension"],
        unique_key="fund_id",
        strategy='check',
        check_cols=[ 'row_hash', ],
        engine = "ReplicatedMergeTree ('/clickhouse/tables/{shard}/{database}/{table}','{replica}')",
        invalidate_hard_deletes=true,
        order_by ='dbt_scd_id',
        pre_hook = [
            "set max_bytes_before_external_group_by=1000000000, max_bytes_before_external_sort=500000000, join_algorithm = 'auto', join_on_disk_max_files_to_merge = 2, partial_merge_join_optimizations = 1, partial_merge_join_rows_in_right_blocks = 1000, max_bytes_in_join=1000000000",
            "drop table if exists {{this.name}}__dbt_tmp on cluster '{cluster}'",
        ],
    ) }}

    with stage_1 as (
        SELECT
            key :: String as fund_id,
            _id as mongo_id,
            upper(cryptorank_fund_name) as fund_name,
            cryptorank_fund_tier as fund_tier,
            '' as fund_category,
            cryptorank_fund_id as fund_cryptorank_id,
            cryptorank_fund_logo as fund_logo,
            cryptorank_fund_link_web as fund_link_web,
            cryptorank_fund_link_linkedin as fund_link_linkedin
        from {{ ref('mongodb_dim_funds') }}
        where notEmpty(fund_id)
    )
    select
        fund_id
        ,mongo_id
        ,fund_name
        ,fund_tier
        ,fund_category
        ,fund_cryptorank_id
        ,fund_logo
        ,fund_link_web
        ,fund_link_linkedin
        ,cityHash64(
            assumeNotNull(fund_name)
            ,assumeNotNull(fund_tier)
            ,assumeNotNull(fund_category)
            ,assumeNotNull(fund_cryptorank_id)
            ,assumeNotNull(fund_logo)
            ,assumeNotNull(fund_link_web)
            ,assumeNotNull(fund_link_linkedin)
        ) as row_hash
        ,now() updated_dttm
    from stage_1

    {% endsnapshot %}

  2. Run the snapshot a second time. It fails with the following error:

    10:27:58 Completed with 1 error and 0 warnings:
    10:27:58
    10:27:58 Database Error in snapshot snap_dim_fund (snapshots/snap_dim_fund.sql)
    10:27:58 Code: 62.
    10:27:58 DB::Exception: Syntax error: failed at position 226 (')') (line 8, col 45): )
    10:27:58 with snapshot_query as (
    10:27:58
    10:27:58
    10:27:58
    10:27:58
    10:27:58
    10:27:58 with stage_1 as (
    10:27:58 SELECT
    10:27:58 key :: String as fund_id,
    10:27:58 _id as mongo_id,
    10:27:58 upper(cryptorank_f. Expected one of: list of elements, insert element, COLUMNS matcher, COLUMNS, qualified asterisk, compound identifier, identifier, asterisk. Stack trace:
    10:27:58
    10:27:58 0. DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x000000000e91fc37 in /usr/bin/clickhouse
    10:27:58 1. ? @ 0x000000000932628d in /usr/bin/clickhouse
    10:27:58 2. DB::parseQueryAndMovePosition(DB::IParser&, char const&, char const, String const&, bool, unsigned long, unsigned long) @ 0x0000000015a3939c in /usr/bin/clickhouse
    10:27:58 3. ? @ 0x00000000144272ad in /usr/bin/clickhouse
    10:27:58 4. DB::executeQuery(String const&, std::shared_ptr, bool, DB::QueryProcessingStage::Enum) @ 0x00000000144267ce in /usr/bin/clickhouse
    10:27:58 5. DB::TCPHandler::runImpl() @ 0x0000000015278ac4 in /usr/bin/clickhouse
    10:27:58 6. DB::TCPHandler::run() @ 0x000000001528f879 in /usr/bin/clickhouse
    10:27:58 7. Poco::Net::TCPServerConnection::start() @ 0x0000000018294134 in /usr/bin/clickhouse
    10:27:58 8. Poco::Net::TCPServerDispatcher::run() @ 0x0000000018295351 in /usr/bin/clickhouse
    10:27:58 9. Poco::PooledThread::run() @ 0x000000001841e567 in /usr/bin/clickhouse
    10:27:58 10. Poco::ThreadImpl::runnableEntry(void*) @ 0x000000001841bf9c in /usr/bin/clickhouse
    10:27:58 11. ? @ 0x00007fa1ad69f802 in ?
    10:27:58 12. ? @ 0x00007fa1ad63f450 in ?
    10:27:58
    10:27:58 Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1

The error occurs when dbt tries to get the columns of the temporary table with this query:

    select name, type from system.columns where table = 'snap_dim_fund__dbt_tmp'
    order by position

When I run this select directly in the database, it returns the column information. dbt, however, gets no column information for the insert and tries to insert with an empty column list:

    insert into snap_dim_fund__dbt_tmp ()
    with snapshot_query as (
        with stage_1 as (
            SELECT
                key :: String as fund_id,
                _id as mongo_id,
                upper(cryptorank_fund_name) as fund_name,
                cryptorank_fund_tier as fund_tier,
                '' as fund_category,
                cryptorank_fund_id as fund_cryptorank_id,
                cryptorank_fund_logo as fund_logo,
                cryptorank_fund_link_web as fund_link_web,
                cryptorank_fund_link_linkedin as fund_link_linkedin
            from mongodb.mongodb_dim_funds
            where notEmpty(fund_id)
        ).......
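The failure mode described above can be illustrated outside dbt. The sketch below is not dbt-clickhouse code: `build_insert_sql` and `build_insert_sql_checked` are hypothetical helpers, written only to show how an empty column lookup turns into the `insert into ... ()` statement that ClickHouse rejects, and how a guard could report the empty lookup before the statement is sent.

```python
def build_insert_sql(table, columns, select_sql):
    """Render an INSERT ... SELECT statement with an explicit column list."""
    column_list = ", ".join(columns)
    return f"insert into {table} ({column_list})\n{select_sql}"


def build_insert_sql_checked(table, columns, select_sql):
    """Same as build_insert_sql, but refuse to render an empty column list."""
    if not columns:
        # Hypothetical guard: fail early instead of sending invalid SQL.
        raise ValueError(f"column lookup for {table} returned no rows")
    return build_insert_sql(table, columns, select_sql)


# An empty lookup result reproduces the first line of the failing statement.
broken = build_insert_sql("snap_dim_fund__dbt_tmp", [], "select 1")
print(broken.splitlines()[0])  # prints: insert into snap_dim_fund__dbt_tmp ()
```

With the guard in place, the same input raises a `ValueError` that names the table, which points at the column lookup rather than at a syntax error deep inside the generated query.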

Expected behaviour

Code examples, such as models or profile settings

dbt and/or ClickHouse server logs

Configuration

Environment

Core:

Plugins:

    clickhouse:
      target: default
      outputs:
        default:
          driver: native
          type: clickhouse
          schema: default
          user: "{{ env_var('DBT_ENV_SECRET_USER') }}"
          password: "{{ env_var('DBT_ENV_SECRET_PASSWORD') }}"
          # optional fields
          port: 9000
          host: "{{ env_var('DBT_ENV_SECRET_HOST') }}"
          verify: False
          secure: False
          connect_timeout: 60
          # compression: 'gzip'
          threads: 8
          send_receive_timeout: 100000
          check_exchange: True
          cluster: cluster
          cluster_mode: True
          # Native (clickhouse-driver) connection settings
          sync_request_timeout: 5
          compress_block_size: 1048576
          use_lw_deletes: True

ClickHouse server

ClickHouse 23.7.4.5

dstsimokha commented 2 months ago

@genzgd Hi! I hardcoded ON CLUSTER 'cluster_name' in three places in the clickhouse__snapshot_merge_sql macro, and it works just fine:

https://github.com/ClickHouse/dbt-clickhouse/blob/506bd18fb063eedf681529eae5a62dcd5c8e3148/dbt/include/clickhouse/macros/materializations/snapshot.sql#L30
https://github.com/ClickHouse/dbt-clickhouse/blob/506bd18fb063eedf681529eae5a62dcd5c8e3148/dbt/include/clickhouse/macros/materializations/snapshot.sql#L76
https://github.com/ClickHouse/dbt-clickhouse/blob/506bd18fb063eedf681529eae5a62dcd5c8e3148/dbt/include/clickhouse/macros/materializations/snapshot.sql#L80

I'm not sure how to add it there without hardcoding it, though.
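One possible way to avoid the hardcoded name is to derive the clause from a setting, such as the `cluster` value in the profile from the original report. The following is only a sketch of that idea, written in Python rather than in the adapter's Jinja macros. `render_on_cluster` and `drop_tmp_table_sql` are hypothetical helpers, not part of dbt-clickhouse, and a real change would have to read the cluster name through whatever the adapter exposes.

```python
def render_on_cluster(cluster_name):
    """Return an ON CLUSTER clause, or an empty string when no cluster is set."""
    if not cluster_name:
        return ""
    return f" ON CLUSTER '{cluster_name}'"


def drop_tmp_table_sql(table, cluster_name=None):
    """Render the DROP statement for a temporary snapshot table."""
    return f"drop table if exists {table}{render_on_cluster(cluster_name)}"


# Cluster name taken from configuration instead of a literal in the macro.
print(drop_tmp_table_sql("snap_dim_fund__dbt_tmp", "cluster"))
# Without a configured cluster, the statement stays local to one node.
print(drop_tmp_table_sql("snap_dim_fund__dbt_tmp"))
```

Returning an empty string when no cluster is configured keeps the same template usable for single-node setups, so the clause can be appended unconditionally wherever the statement is built.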