ClickHouse / clickhouse-connect

Python driver/sqlalchemy/superset connectors
Apache License 2.0

multi-threaded insert_df causes the whole table to be replaced and filled with millions of duplicates of existing rows #392

Closed · KirigiriSuzumiya closed this issue 2 months ago

KirigiriSuzumiya commented 2 months ago

Describe the bug

Multi-threaded insert_df causes the whole table to be replaced and filled with millions of duplicates of rows that already exist.

Steps to reproduce

  1. start multiple threads to insert data
  2. as few as 2 concurrent threads are enough to reproduce the issue
  3. it works fine when only one of the threads is inserting while the other threads are doing other operations (a minimal sketch follows this list)
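
A minimal, self-contained sketch of that scenario. The table name repro_t and the local connection details are hypothetical; it assumes the table already exists with matching columns, and uses insert_df, the client method named in the issue title:

import threading

import pandas as pd
import clickhouse_connect

def worker():
    # each thread creates its own client, mirroring the report below
    client = clickhouse_connect.get_client(host="localhost", username="default", password="")
    df = pd.DataFrame({"id": list(range(100)), "val": ["x"] * 100})
    client.insert_df("repro_t", df)

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()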

Expected behaviour

Rows are inserted correctly, without flooding the table with duplicates.

Code example

import threading

import pandas as pd
import clickhouse_connect

# CHCONFIG and the helpers (get_md5_hash, check_table_exist, create_table,
# insert_data, serialize_frameurl) are defined elsewhere in the application.

def get_client():
    # clickhouse_connect.common.set_setting('autogenerate_session_id', False)
    # I have tried the setting above (a process-wide common setting that
    # disables autogenerated session ids), but it did not work
    client = clickhouse_connect.get_client(
        host=CHCONFIG["host"],
        username=CHCONFIG["username"],
        password=CHCONFIG["password"] if CHCONFIG["password"] else "",
        port=CHCONFIG["port"],
        database=CHCONFIG["database"],
    )
    return client

def write_recorder_clickhouse(df, host):
    tablename = get_md5_hash(host)
    # only keep the rows with the latest date
    df["date"] = df["date"].apply(lambda x: x.replace(tzinfo=None))
    df["date"] = pd.to_datetime(df["date"])
    df_sorted = df.sort_values(by="date", ascending=False)
    max_date_index = df_sorted["date"].idxmax()
    latest_records = df_sorted[
        df_sorted["date"] == df_sorted.loc[max_date_index, "date"]
    ]
    latest_records["frames_url"].apply(serialize_frameurl)
    latest_records["suggestions"].apply(serialize_frameurl)
    client = get_client()
    if not check_table_exist(client, tablename):
        create_table(client, tablename)
    insert_data(
        client,
        tablename,
        df,
        column_type_names=[
            some_dtypes  # placeholder for the actual list of column types
        ],
    )

# something like this; as few as 2 concurrent threads may cause the issue
for i in range(xxxx):
    threading.Thread(
        target=write_recorder_clickhouse, args=[some_df, some_host]
    ).start()

clickhouse-connect and/or ClickHouse server logs

No exception is raised until the huge number of inserted rows makes ClickHouse crash.

Configuration

Environment

ClickHouse server

KirigiriSuzumiya commented 2 months ago

Solved: some of my params were not thread-safe.
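
That closing note points at shared mutable state rather than the driver. One candidate in the code above: write_recorder_clickhouse mutates its df argument in place (the df["date"] = ... assignments), so handing the same DataFrame object to several threads is a data race. A minimal sketch of a safer call pattern, reusing the names from the example above and assuming jobs is an iterable of (host, DataFrame) pairs:

import threading

# df.copy() gives each thread its own frame, so no two threads mutate the
# same DataFrame; each call also creates its own client via get_client()
workers = [
    threading.Thread(target=write_recorder_clickhouse, args=[df.copy(), host])
    for host, df in jobs
]
for t in workers:
    t.start()
for t in workers:
    t.join()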