dlt-hub / dlt

data load tool (dlt) is an open source Python library that makes data loading easy 🛠️
https://dlthub.com/docs
Apache License 2.0

Cannot create partitioned table in BigQuery destination #852

Closed. willi-mueller closed this issue 9 months ago.

willi-mueller commented 10 months ago

dlt version

0.3.25 and 0.4.1a2

Describe the problem

When I specify a column as a partition key for the BigQuery destination, loading fails with runtime errors. It seems that dlt does not generate a valid DDL statement with a correct PARTITION BY clause.

Expected behavior

When specifying

@dlt.resource(
    write_disposition="merge",
    primary_key="my_date_column",
    columns={"my_date_column": {"data_type": "date", "partition": True, "nullable": False}},
)

The resulting table should be partitioned by date, and loading should not crash.

Steps to reproduce

Partition by Date

Run this pipeline, which tries to set a date column as the partition key:

Code

from datetime import date
import dlt

@dlt.resource(
    write_disposition="merge",
    primary_key="my_date_column",
    columns={"my_date_column": {"data_type": "date", "partition": True, "nullable": False}},
)
def demo_resource():
    for i in range(10):
        yield {"my_date_column": date.fromtimestamp(1700784000 + i * 50_000), "metric": i}

if __name__ == "__main__":
    pipeline = dlt.pipeline(pipeline_name="demo", destination="bigquery")
    print(list(demo_resource()))

    load_info = pipeline.run(
        demo_resource,
        dataset_name="dev_raw_dlt",
        table_name="demo_partition",
    )
    print(load_info)

Exception

dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage load with exception:

<class 'dlt.destinations.exceptions.DatabaseTransientException'>
400 Query error: PARTITION BY expression must be _PARTITIONDATE, DATE(_PARTITIONTIME), DATE(<timestamp_column>), DATE(<datetime_column>), DATETIME_TRUNC(<datetime_column>, DAY/HOUR/MONTH/YEAR), a DATE column, TIMESTAMP_TRUNC(<timestamp_column>, DAY/HOUR/MONTH/YEAR), DATE_TRUNC(<date_column>, MONTH/YEAR), or RANGE_BUCKET(<int64_column>, GENERATE_ARRAY(<int64_value>, <int64_value>[, <int64_value>])) at [15:1]
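Judging by this message (and by the INT64 error below), the generated DDL seems to hardcode DATE(&lt;column&gt;) as the PARTITION BY expression, which BigQuery rejects when the column is already of type DATE; the bare column name would be the allowed form. A rough illustration of the difference (not the actual DDL dlt emits):

# illustration only; the actual statement dlt generates may differ
column = "my_date_column"

rejected_clause = f"PARTITION BY DATE({column})"  # DATE(<date_column>) is not an allowed PARTITION BY form
accepted_clause = f"PARTITION BY {column}"        # "a DATE column" is allowed directly, per the error message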

Partition by Integer

Code

from datetime import date
import dlt

@dlt.resource(
    columns={"some_int": {"data_type": "bigint", "partition": True, "nullable": False}},
)
def demo_resource():
    for i in range(10):
        yield {"my_date_column": date.fromtimestamp(1700784000 + i * 50_000), "some_int": i}

if __name__ == "__main__":
    pipeline = dlt.pipeline(pipeline_name="demo", destination="bigquery")
    print(list(demo_resource()))

    load_info = pipeline.run(
        demo_resource,
        dataset_name="some_demo",
        table_name="int_partition_demo",
    )
    print(load_info)

Exception

dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage load with exception:

<class 'dlt.destinations.exceptions.DatabaseTransientException'>
400 No matching signature for function DATE for argument types: INT64. Supported signatures: DATE(TIMESTAMP, [STRING]); DATE(DATETIME); DATE(INT64, INT64, INT64); DATE(DATE); DATE(STRING) at [6:14]
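The INT64 case points to the same hardcoded DATE(&lt;column&gt;) expression; per the error message, an integer column has to be partitioned with RANGE_BUCKET over a GENERATE_ARRAY instead. A rough illustration with made-up bucket bounds (not the actual DDL dlt emits):

# illustration only; the bucket bounds below are made-up placeholders
column = "some_int"

rejected_clause = f"PARTITION BY DATE({column})"  # DATE() has no signature taking a single INT64
accepted_clause = f"PARTITION BY RANGE_BUCKET({column}, GENERATE_ARRAY(0, 100, 10))"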

Operating system

macOS

Runtime environment

Local

Python version

3.11

dlt data source

not applicable

dlt destination

Google BigQuery

Other deployment details

No response

Additional information

No response

rudolfix commented 10 months ago

@willi-mueller there's a hardcoded expression to create partitions, very old code :) with your repros it will be an easy fix, thanks :) we'll double down on a BigQuery resource adapter so various settings for partitions and other column hints can be set in a simple way (like we do for vector databases with embeddings)

rudolfix commented 10 months ago

to fix this we need to take the data type of the partition column into account in bigquery.py. this should also be covered in the query builder tests and in a pipeline test (using the test cases above)
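A minimal sketch of the kind of type-aware branching this implies; the helper name, structure, and integer bucket bounds below are hypothetical and are not the actual bigquery.py code:

# hypothetical sketch only, not dlt's actual implementation
def partition_expression(column_name: str, data_type: str) -> str:
    """Pick a BigQuery PARTITION BY expression based on the column's dlt data type."""
    if data_type == "date":
        # a DATE column can be used directly as the partition expression
        return column_name
    if data_type == "timestamp":
        # TIMESTAMP/DATETIME columns must be reduced to a date, e.g. with DATE() or *_TRUNC()
        return f"DATE({column_name})"
    if data_type == "bigint":
        # integer columns need range bucketing; bounds and step here are placeholders
        return f"RANGE_BUCKET({column_name}, GENERATE_ARRAY(0, 1000000, 1000))"
    raise ValueError(f"partitioning on data type {data_type!r} is not handled in this sketch")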

Pipboyguy commented 10 months ago

Hope you don't mind, but I've added a date test as well, since technically the above only tests datetime:

# imports as used in the dlt test suite; exact module paths may differ between dlt versions
from typing import Dict, Iterator, Union

import dlt
import pendulum
import pytest

from dlt.common.utils import uniq_id
from dlt.extract import DltResource
from tests.load.utils import DestinationTestConfiguration, destinations_configs


@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(all_staging_configs=True, subset=["bigquery"]),
    ids=lambda x: x.name,
)
def test_bigquery_partition_by_date(destination_config: DestinationTestConfiguration) -> None:
    pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True)

    @dlt.resource(
        write_disposition="merge",
        primary_key="my_date_column",
        columns={"my_date_column": {"data_type": "date", "partition": True, "nullable": False}},
    )
    def demo_resource() -> Iterator[Dict[str, Union[int, pendulum.DateTime]]]:
        for i in range(10):
            yield {
                "my_date_column": pendulum.from_timestamp(1700784000 + i * 50_000).date(),
                "metric": i,
            }

    @dlt.source(max_table_nesting=0)
    def demo_source() -> DltResource:
        return demo_resource

    pipeline.run(demo_source())

I added this because BigQuery handles these cases differently and they need different logic.
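For what it's worth, here is a sketch of a companion test for the integer case, reusing the imports and helpers from the test above; it is untested, and the bucket bounds dlt should choose are left to the implementation:

@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(all_staging_configs=True, subset=["bigquery"]),
    ids=lambda x: x.name,
)
def test_bigquery_partition_by_integer(destination_config: DestinationTestConfiguration) -> None:
    pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", full_refresh=True)

    @dlt.resource(
        columns={"some_int": {"data_type": "bigint", "partition": True, "nullable": False}},
    )
    def demo_resource() -> Iterator[Dict[str, int]]:
        for i in range(10):
            yield {"some_int": i, "metric": i}

    @dlt.source(max_table_nesting=0)
    def demo_source() -> DltResource:
        return demo_resource

    pipeline.run(demo_source())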