dlt-hub / dlt

data load tool (dlt) is an open source Python library that makes data loading easy 🛠️
https://dlthub.com/docs
Apache License 2.0

When using BigQuery as a destination, adapter settings `partition` and `autodetect_schema` are exclusive #1746

Closed: neuromantik33 closed this issue 2 weeks ago

neuromantik33 commented 1 month ago

### dlt version

0.5.3

### Describe the problem

I would like dlt to be able to create partitioned BQ tables with partition columns, while also auto-detecting any schema changes, as described here. When I run this code, everything works great:

```python
@dlt.source
def intra_db(conn_uri: str) -> DltSource:
    return [  # type: ignore[return-value]
        bigquery_adapter(
            sql_table(
                credentials=conn_uri,
                table=table,
                incremental=dlt.sources.incremental("updated_at"),
                detect_precision_hints=True,
                table_adapter_callback=table_adapters.get(table),  # type: ignore[arg-type]
            ),
            partition="updated_at",
        )
        for table in tables
    ]
```

and I do indeed get time-partitioned tables.

However, when working with complex types in Parquet, I've only had success after adding the `autodetect_schema=True` hint to the `bigquery_adapter` call. Once that is done, though, any partition info is lost upon table creation.

### Expected behavior

The expected behavior is to be able to infer the schema using BQ's autodetect mechanism while still honoring any manual column hints such as clustering or partitioning.

### Steps to reproduce

```python
from typing import Callable

import dlt
from dlt.destinations.adapters import bigquery_adapter
from dlt.extract import DltSource
from sqlalchemy import Table

from intra_dlt.helpers.sql_database import sql_table

table_adapters: dict[str, Callable[[Table], None]] = {}
tables = [
    "table_a",
]


@dlt.source
def test_db(conn_uri: str) -> DltSource:
    return [  # type: ignore[return-value]
        bigquery_adapter(
            sql_table(
                credentials=conn_uri,
                table=table,
                incremental=dlt.sources.incremental("updated_at"),
                detect_precision_hints=True,
                table_adapter_callback=table_adapters.get(table),  # type: ignore[arg-type]
            ),
            partition="updated_at",
            autodetect_schema=True,  # <-- FIXME: bug is here
        )
        for table in tables
    ]


if __name__ == "__main__":
    from dlt import Pipeline

    pipeline: Pipeline = dlt.pipeline(
        pipeline_name="intra_db",
        destination="bigquery",
        staging="filesystem",
        dataset_name="test",
    )

    pg_dsn = "postgresql+psycopg2://test:test@localhost:5432/test"
    load_info = pipeline.run(test_db(pg_dsn), loader_file_format="parquet")

    load_info.raise_on_failed_jobs()
    print(load_info)
```

### Operating system

Linux

### Runtime environment

Local

### Python version

3.10

### dlt data source

sql_database

### dlt destination

Google BigQuery

### Other deployment details

The issue is with this piece of code: https://github.com/dlt-hub/dlt/blob/devel/dlt/destinations/impl/bigquery/bigquery.py#L281-L287
```python
def _get_table_update_sql(
    self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
) -> List[str]:
    # return empty columns which will skip table CREATE or ALTER
    # to let BigQuery autodetect table from data
    if self._should_autodetect_schema(table_name):
        return []
    ...
```

which effectively discards all other hints whenever autodetection is enabled.
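One possible direction, sketched below purely as a hypothesis (the hint lookup and the `_apply_hints_to_load_job` helper are made up for illustration, not dlt's actual API): keep returning no DDL so BigQuery still autodetects the schema, but carry the partition/cluster hints over to the load job that creates the table, the way the CLI does.

```python
# Hypothetical sketch only -- not dlt's actual code. The idea: keep skipping
# CREATE/ALTER DDL so BigQuery autodetects the schema from the data, but
# forward any partition/cluster hints to the load job that creates the
# table, mirroring `bq load --autodetect --time_partitioning_field ...`.
def _get_table_update_sql(
    self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
) -> List[str]:
    if self._should_autodetect_schema(table_name):
        hinted = [c for c in new_columns if c.get("partition") or c.get("cluster")]
        if hinted:
            self._apply_hints_to_load_job(table_name, hinted)  # hypothetical helper
        return []  # schema itself is still autodetected from the data
    ...
```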

### Additional information

No response

rudolfix commented 1 month ago

@neuromantik33 is there a way to implement it? When we use schema autodetection, I think there's no way to ask for partitions etc. to be created. If you know of any workarounds, let us know - this will speed up implementation significantly.

neuromantik33 commented 1 month ago

@rudolfix Well, I don't know what the latest Python API provides, but here at least is an example using a simple shell script:

```bash
#!/bin/bash

set -ex

DATASET=${DATASET:-drnick}
TABLE=${TABLE:-test_autodetect}
TABLE_QNAME="${DATASET}.${TABLE}"

BUCKET=${BUCKET:-dev-42network}
GS_PATH="gs://$BUCKET/test.json"

bq rm -f "$TABLE_QNAME"
bq ls -q "$TABLE_QNAME" || true

gsutil cat "$GS_PATH" | jq

gsutil cp test.json "$GS_PATH"
gsutil ls -lh "$GS_PATH"

bq load --source_format NEWLINE_DELIMITED_JSON \
  --autodetect \
  --clustering_fields string \
  --time_partitioning_field timestamp \
  --time_partitioning_type DAY \
  "$TABLE_QNAME" \
  "$GS_PATH"

bq show "$TABLE_QNAME"
bq query --nouse_legacy_sql "select * from $TABLE_QNAME"
```

and the corresponding output:

```
~/tmp $ ./load.sh
+ DATASET=drnick
+ TABLE=test_autodetect
+ TABLE_QNAME=drnick.test_autodetect
+ BUCKET=dev-42network
+ GS_PATH=gs://dev-42network/test.json
+ bq rm -f drnick.test_autodetect
+ bq ls -q drnick.test_autodetect
Invalid identifier "drnick.test_autodetect" for ls, cannot call list on object of type TableReference
+ true
+ gsutil cat gs://dev-42network/test.json
+ jq
{
  "string": "test",
  "timestamp": "2024-10-03T09:59:03.423570Z"
}
{
  "string": "test2",
  "timestamp": "2024-10-04T09:59:03.423570Z"
}
+ gsutil cp test.json gs://dev-42network/test.json
Copying file://test.json [Content-Type=application/json]...
/ [1 files][  125.0 B/  125.0 B]
Operation completed over 1 objects/125.0 B.
+ gsutil ls -lh gs://dev-42network/test.json
     125 B  2024-08-27T13:47:53Z  gs://dev-42network/test.json
TOTAL: 1 objects, 125 bytes (125 B)
+ bq load --source_format NEWLINE_DELIMITED_JSON --autodetect --clustering_fields string --time_partitioning_field timestamp --time_partitioning_type DAY drnick.test_autodetect gs://dev-42network/test.json
Waiting on bqjob_r6e9c2292cde7c606_000001919417d6a4_1 ... (1s) Current status: DONE
+ bq show drnick.test_autodetect
Table dev-42network:drnick.test_autodetect

   Last modified            Schema            Total Rows   Total Bytes   Expiration     Time Partitioning      Clustered Fields   Total Logical Bytes   Total Physical Bytes   Labels
 ----------------- ------------------------- ------------ ------------- ------------ ------------------------ ------------------ --------------------- ---------------------- --------
  27 Aug 15:47:58   |- timestamp: timestamp   2            29                         DAY (field: timestamp)   string             29
                    |- string: string

+ bq query --nouse_legacy_sql 'select * from drnick.test_autodetect'
+---------------------+--------+
|      timestamp      | string |
+---------------------+--------+
| 2024-10-04 09:59:03 | test2  |
| 2024-10-03 09:59:03 | test   |
+---------------------+--------+
```
rudolfix commented 1 month ago

@neuromantik33 OK, so this is doable. The Python API typically has parity with the CLI...
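For reference, a minimal sketch of that parity using the google-cloud-bigquery client, mirroring the shell script above (the dataset, table, and bucket names are reused from that script); this illustrates the workaround at the API level, not how dlt would wire it in:

```python
from google.cloud import bigquery

client = bigquery.Client()

# Mirror `bq load --autodetect --clustering_fields string
# --time_partitioning_field timestamp --time_partitioning_type DAY`:
# autodetection and partitioning/clustering hints on the same load job.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    autodetect=True,
    time_partitioning=bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="timestamp",
    ),
    clustering_fields=["string"],
)

# Names reused from the shell script above.
load_job = client.load_table_from_uri(
    "gs://dev-42network/test.json",
    "drnick.test_autodetect",
    job_config=job_config,
)
load_job.result()  # wait for the load to complete

table = client.get_table("drnick.test_autodetect")
print(table.time_partitioning, table.clustering_fields)
```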