ClickHouse / clickhouse-connect

Python driver/sqlalchemy/superset connectors
Apache License 2.0
281 stars 59 forks source link

Date conversion when inserted #300

Closed OnlyImmutable closed 4 months ago

OnlyImmutable commented 4 months ago

Describe the bug

I have written a script to insert all my data from Postgres to Clickhouse, that being said, all data inserts fine usually, but when I try to insert a date, I get an exception within the clickhouse-connect library in Python.

Expected behaviour

The data inserts fine without the library throwing an exception...

Code example

import clickhouse_connect

async def insert_data_into_clickhouse(db, table_name, column_names, select_batch_size=10000):
    # Get the total number of rows
    total_rows_query = f"SELECT COUNT(*) FROM {table_name.lower()};"
    total_rows = (await db.execute(text(total_rows_query))).scalar()

    for offset in range(0, total_rows, select_batch_size):
        select_query = f"SELECT * FROM {table_name.lower()} OFFSET {offset} LIMIT {select_batch_size};"
        results = (await db.execute(text(select_query))).fetchall()

        print("Offset:", offset, "Batch Size:", len(results))

        # Prepare the data for insertion
        data_values = []

        for row in results:
            # Handle None values for nullable columns and convert date objects to strings
            escaped_row = []
            for value in row:
                if value is None:
                    escaped_row.append(None)
                elif isinstance(value, (datetime.datetime, datetime.date)):
                    escaped_row.append(value.strftime("%Y-%m-%d"))
                elif isinstance(value, str) and value:
                    # Handle date that is passed as string
                    try:
                        # Try to parse the string as a date
                        parsed_date = parser.parse(value).date()
                        escaped_row.append(parsed_date.strftime("%Y-%m-%d"))
                    except ValueError:
                        # If parsing fails, you can handle it as needed.
                        # Here, we are appending the original string.
                        escaped_row.append(str(value))
                else:
                    escaped_row.append(value)

            # print("Original Row:", row)
            # print("Escaped Row:", escaped_row)
            data_values.append(escaped_row)

            # Print information for debugging
            # print("Column Names:", column_names)
            # print("Data Values:", data_values)

        # Use the insert method with column_names parameter
        # The exception happens once the `data_values` is passed and insert is called, not before.
        client.insert(table=table_name, data=data_values, column_names=column_names)
        print(f"Inserted into {table_name}... batch offset: {offset}... batch size: {len(results)}")
        print()

    print("Data insertion completed.")

Here is an example of the data_values before insert...

[['NCT06203795', None, '2023-12-07', None, None, '2024-01-02', '2024-01-02', '2024-01-12', 'Actual', None, None, None, None, None, None, '2024-01-02', '2024-01-12', 'Actual', '2023-10-02', 'Actual', '2023-10-02', '2024-01-11', '2024-01-31', '2024-08-31', 'Anticipated', '2024-08-31', '2023-10-26', 'Actual', '2023-10-26', None, 'Interventional', 'PATENCY', None, 'Dialysis Performance of the FX CorAL Membrane', 'Performance of Toxin Removal and Clotting Kinetics in Dialyzers', 'Active, not recruiting', None, 'Not Applicable', 10, 'Actual', 'University Hospital, Ghent', None, 4, None, None, False, None, None, None, False, False, False, None, None, None, None, None, None, None, None, 'No', None, '2024-02-04', '2024-02-04', 'OTHER', None, None, None, None, None]]

clickhouse-connect and/or ClickHouse server logs

Configuration

Environment

 warnings.warn(
Fetching and creating tables in ClickHouse...
[('nct_id', 'character varying', False, False, None, True), ('nlm_download_date_description', 'character varying', False, False, None, True), ('study_first_submitted_date', 'date', False, False, None, True), ('results_first_submitted_date', 'date', False, False, None, True), ('disposition_first_submitted_date', 'date', False, False, None, True), ('last_update_submitted_date', 'date', False, False, None, True), ('study_first_submitted_qc_date', 'date', False, False, None, True), ('study_first_posted_date', 'date', False, False, None, True), ('study_first_posted_date_type', 'character varying', False, False, None, True), ('results_first_submitted_qc_date', 'date', False, False, None, True), ('results_first_posted_date', 'date', False, False, None, True), ('results_first_posted_date_type', 'character varying', False, False, None, True), ('disposition_first_submitted_qc_date', 'date', False, False, None, True), ('disposition_first_posted_date', 'date', False, False, None, True), ('disposition_first_posted_date_type', 'character varying', False, False, None, True), ('last_update_submitted_qc_date', 'date', False, False, None, True), ('last_update_posted_date', 'date', False, False, None, True), ('last_update_posted_date_type', 'character varying', False, False, None, True), ('start_month_year', 'character varying', False, False, None, True), ('start_date_type', 'character varying', False, False, None, True), ('start_date', 'date', False, False, None, True), ('verification_month_year', 'character varying', False, False, None, True), ('verification_date', 'date', False, False, None, True), ('completion_month_year', 'character varying', False, False, None, True), ('completion_date_type', 'character varying', False, False, None, True), ('completion_date', 'date', False, False, None, True), ('primary_completion_month_year', 'character varying', False, False, None, True), ('primary_completion_date_type', 'character varying', False, False, None, True), ('primary_completion_date', 'date', False, False, None, True), ('target_duration', 'character varying', False, False, None, True), ('study_type', 'character varying', False, False, None, True), ('acronym', 'character varying', False, False, None, True), ('baseline_population', 'text', False, False, None, True), ('brief_title', 'text', False, False, None, True), ('official_title', 'text', False, False, None, True), ('overall_status', 'character varying', False, False, None, True), ('last_known_status', 'character varying', False, False, None, True), ('phase', 'character varying', False, False, None, True), ('enrollment', 'integer', False, False, None, True), ('enrollment_type', 'character varying', False, False, None, True), ('source', 'character varying', False, False, None, True), ('limitations_and_caveats', 'character varying', False, False, None, True), ('number_of_arms', 'integer', False, False, None, True), ('number_of_groups', 'integer', False, False, None, True), ('why_stopped', 'character varying', False, False, None, True), ('has_expanded_access', 'boolean', False, False, None, True), ('expanded_access_type_individual', 'boolean', False, False, None, True), ('expanded_access_type_intermediate', 'boolean', False, False, None, True), ('expanded_access_type_treatment', 'boolean', False, False, None, True), ('has_dmc', 'boolean', False, False, None, True), ('is_fda_regulated_drug', 'boolean', False, False, None, True), ('is_fda_regulated_device', 'boolean', False, False, None, True), ('is_unapproved_device', 'boolean', False, False, None, True), ('is_ppsd', 'boolean', False, False, None, True), ('is_us_export', 'boolean', False, False, None, True), ('biospec_retention', 'character varying', False, False, None, True), ('biospec_description', 'text', False, False, None, True), ('ipd_time_frame', 'character varying', False, False, None, True), ('ipd_access_criteria', 'character varying', False, False, None, True), ('ipd_url', 'character varying', False, False, None, True), ('plan_to_share_ipd', 'character varying', False, False, None, True), ('plan_to_share_ipd_description', 'character varying', False, False, None, True), ('created_at', 'timestamp without time zone', False, False, None, False), ('updated_at', 'timestamp without time zone', False, False, None, False), ('source_class', 'character varying', False, False, None, True), ('delayed_posting', 'character varying', False, False, None, True), ('expanded_access_nctid', 'character varying', False, False, None, True), ('expanded_access_status_for_nctid', 'character varying', False, False, None, True), ('fdaaa801_violation', 'boolean', False, False, None, True), ('baseline_type_units_analyzed', 'character varying', False, False, None, True)]
[('nct_id', ('String', 255), False), ('nlm_download_date_description', ('String', 255), False), ('study_first_submitted_date', 'Date', False), ('results_first_submitted_date', 'Date', False), ('disposition_first_submitted_date', 'Date', False), ('last_update_submitted_date', 'Date', False), ('study_first_submitted_qc_date', 'Date', False), ('study_first_posted_date', 'Date', False), ('study_first_posted_date_type', ('String', 255), False), ('results_first_submitted_qc_date', 'Date', False), ('results_first_posted_date', 'Date', False), ('results_first_posted_date_type', ('String', 255), False), ('disposition_first_submitted_qc_date', 'Date', False), ('disposition_first_posted_date', 'Date', False), ('disposition_first_posted_date_type', ('String', 255), False), ('last_update_submitted_qc_date', 'Date', False), ('last_update_posted_date', 'Date', False), ('last_update_posted_date_type', ('String', 255), False), ('start_month_year', ('String', 255), False), ('start_date_type', ('String', 255), False), ('start_date', 'Date', False), ('verification_month_year', ('String', 255), False), ('verification_date', 'Date', False), ('completion_month_year', ('String', 255), False), ('completion_date_type', ('String', 255), False), ('completion_date', 'Date', False), ('primary_completion_month_year', ('String', 255), False), ('primary_completion_date_type', ('String', 255), False), ('primary_completion_date', 'Date', False), ('target_duration', ('String', 255), False), ('study_type', ('String', 255), False), ('acronym', ('String', 255), False), ('baseline_population', ('String', 65535), False), ('brief_title', ('String', 65535), False), ('official_title', ('String', 65535), False), ('overall_status', ('String', 255), False), ('last_known_status', ('String', 255), False), ('phase', ('String', 255), False), ('enrollment', 'Int32', False), ('enrollment_type', ('String', 255), False), ('source', ('String', 255), False), ('limitations_and_caveats', ('String', 255), False), ('number_of_arms', 'Int32', False), ('number_of_groups', 'Int32', False), ('why_stopped', ('String', 255), False), ('has_expanded_access', 'UInt8', False), ('expanded_access_type_individual', 'UInt8', False), ('expanded_access_type_intermediate', 'UInt8', False), ('expanded_access_type_treatment', 'UInt8', False), ('has_dmc', 'UInt8', False), ('is_fda_regulated_drug', 'UInt8', False), ('is_fda_regulated_device', 'UInt8', False), ('is_unapproved_device', 'UInt8', False), ('is_ppsd', 'UInt8', False), ('is_us_export', 'UInt8', False), ('biospec_retention', ('String', 255), False), ('biospec_description', ('String', 65535), False), ('ipd_time_frame', ('String', 255), False), ('ipd_access_criteria', ('String', 255), False), ('ipd_url', ('String', 255), False), ('plan_to_share_ipd', ('String', 255), False), ('plan_to_share_ipd_description', ('String', 255), False), ('created_at', ('String', 65535), False), ('updated_at', ('String', 65535), False), ('source_class', ('String', 255), False), ('delayed_posting', ('String', 255), False), ('expanded_access_nctid', ('String', 255), False), ('expanded_access_status_for_nctid', ('String', 255), False), ('fdaaa801_violation', 'UInt8', False), ('baseline_type_units_analyzed', ('String', 255), False)]
Table ctgov.studies created in ClickHouse.
Offset: 0 Batch Size: 1
[['NCT06203795', None, '2023-12-07', None, None, '2024-01-02', '2024-01-02', '2024-01-12', 'Actual', None, None, None, None, None, None, '2024-01-02', '2024-01-12', 'Actual', '2023-10-02', 'Actual', '2023-10-02', '2024-01-11', '2024-01-31', '2024-08-31', 'Anticipated', '2024-08-31', '2023-10-26', 'Actual', '2023-10-26', None, 'Interventional', 'PATENCY', None, 'Dialysis Performance of the FX CorAL Membrane', 'Performance of Toxin Removal and Clotting Kinetics in Dialyzers', 'Active, not recruiting', None, 'Not Applicable', 10, 'Actual', 'University Hospital, Ghent', None, 4, None, None, False, None, None, None, False, False, False, None, None, None, None, None, None, None, None, 'No', None, '2024-02-04', '2024-02-04', 'OTHER', None, None, None, None, None]]
Error serializing column `study_first_submitted_date` into data type `Nullable(Date)`
Traceback (most recent call last):
  File "/Users/onlyimmutable/Desktop/Projects/temp/postgres-to-clickhouse/venv/lib/python3.9/site-packages/clickhouse_connect/driver/transform.py", line 102, in chunk_gen
    col_type.write_column(data, output, context)
  File "/Users/onlyimmutable/Desktop/Projects/temp/postgres-to-clickhouse/venv/lib/python3.9/site-packages/clickhouse_connect/datatypes/base.py", line 202, in write_column
    self.write_column_data(column, dest, ctx)
  File "/Users/onlyimmutable/Desktop/Projects/temp/postgres-to-clickhouse/venv/lib/python3.9/site-packages/clickhouse_connect/datatypes/base.py", line 217, in write_column_data
    self._write_column_binary(column, dest, ctx)
  File "/Users/onlyimmutable/Desktop/Projects/temp/postgres-to-clickhouse/venv/lib/python3.9/site-packages/clickhouse_connect/datatypes/temporal.py", line 45, in _write_column_binary
    column = [0 if x is None else (x - esd).days for x in column]
  File "/Users/onlyimmutableDesktop/Projects/temp/postgres-to-clickhouse/venv/lib/python3.9/site-packages/clickhouse_connect/datatypes/temporal.py", line 45, in <listcomp>
    column = [0 if x is None else (x - esd).days for x in column]
TypeError: unsupported operand type(s) for -: 'str' and 'datetime.date'
Error: unsupported operand type(s) for -: 'str' and 'datetime.date'

Process finished with exit code 0

ClickHouse server

genzgd commented 4 months ago

To be clear, clickhouse-connect doesn't do any string parsing of dates during insert. It looks like you are trying to insert a row that contains a string in a date column, and that's what causing the TypeError.

From your code above, that seems like it's happening in this block of code:

try:
    # Try to parse the string as a date
    parsed_date = parser.parse(value).date()
    escaped_row.append(parsed_date.strftime("%Y-%m-%d"))
except ValueError:
    # If parsing fails, you can handle it as needed.
    # Here, we are appending the original string.
    escaped_row.append(str(value))

As the comment says, "you can handle . . . as needed" what happens when parsing the string value into a date fails. What's needed in this case is adding either a valid date value or None, not just adding the bad string anyway.

The simplest fix is to that the code above to escaped_row.append(None), instead of escape_row.append(str(value)), but you may want to log the bad string that's coming in.

OnlyImmutable commented 4 months ago

Then what format should it be in? Because I've tried inserting a string, a date, in multiple formats?

The column is a date, i want to insert a date as expected so what do I do to solve it? I can't just enter nothing?

OnlyImmutable commented 4 months ago

To be clear, clickhouse-connect doesn't do any string parsing of dates during insert. It looks like you are trying to insert a row that contains a string in a date column, and that's what causing the TypeError.

From your code above, that seems like it's happening in this block of code:

try:
    # Try to parse the string as a date
    parsed_date = parser.parse(value).date()
    escaped_row.append(parsed_date.strftime("%Y-%m-%d"))
except ValueError:
    # If parsing fails, you can handle it as needed.
    # Here, we are appending the original string.
    escaped_row.append(str(value))

As the comment says, "you can handle . . . as needed" what happens when parsing the string value into a date fails. What's needed in this case is adding either a valid date value or None, not just adding the bad string anyway.

The simplest fix is to that the code above to escaped_row.append(None), instead of escape_row.append(str(value)), but you may want to log the bad string that's coming in.

As you can see here, I switched it to insert a date object instead and I get this exception, so clearly there is something going on? The point of the try and catch is purely to access if a date comes in as a string for whatever reason, which is fairly likely. If its not a date we want to insert it as a string, I just need help getting dates to insert properly?

async def insert_data_into_clickhouse(db, table_name, column_names, select_batch_size=10000):
    # Get the total number of rows
    total_rows_query = f"SELECT COUNT(*) FROM {table_name.lower()};"
    total_rows = (await db.execute(text(total_rows_query))).scalar()

    for offset in range(0, total_rows, select_batch_size):
        select_query = f"SELECT * FROM {table_name.lower()} OFFSET {offset} LIMIT {select_batch_size};"
        results = (await db.execute(text(select_query))).fetchall()

        print("Offset:", offset, "Batch Size:", len(results))

        # Prepare the data for insertion
        data_values = []

        for row in results:
            # Handle None values for nullable columns and convert date objects to strings
            escaped_row = []
            for value in row:
                if value is None:
                    escaped_row.append(None)
                elif isinstance(value, (datetime.datetime, datetime.date)):
                    escaped_row.append(value.strftime("%Y-%m-%d"))
                elif isinstance(value, str) and value:
                    # Handle date that is passed as string
                    try:
                        # Try to parse the string as a date
                        parsed_date = parser.parse(value).date()
                        escaped_row.append(parsed_date)
                    except ValueError:
                        # If parsing fails, you can handle it as needed.
                        # Here, we are appending the original string.
                        escaped_row.append(str(value))
                else:
                    escaped_row.append(value)

            # print("Original Row:", row)
            # print("Escaped Row:", escaped_row)
            data_values.append(escaped_row)

            # Print information for debugging
            # print("Column Names:", column_names)
            # print("Data Values:", data_values)

        # Use the insert method with column_names parameter
        client.insert(table=table_name, data=data_values, column_names=column_names)
        print(f"Inserted into {table_name}... batch offset: {offset}... batch size: {len(results)}")
        print()

    print("Data insertion completed.")
Error: object of type 'datetime.date' has no len()
Process finished with exit code 0