googleapis / python-bigquery-sqlalchemy

SQLAlchemy dialect for BigQuery
MIT License
426 stars 127 forks source link

fix: Set cte_follows_insert to True #1095

Closed aholyoke closed 1 month ago

aholyoke commented 2 months ago

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

Fixes # 🦕

cte_follows_insert is a dialect flag which dictates where the CTE appears relative to an insert-from-select statement.

For example, when cte_follows_insert=True, then SA will generate

INSERT INTO table
WITH cte_name AS (
  ...
)
SELECT * FROM cte_name

and when cte_follows_insert=False, then SA will generate

WITH cte_name AS (
  ...
)
INSERT INTO table
SELECT * FROM cte_name

If we unskip the compliance test test_insert_from_select_round_trip, the test will fail with

FAILED tests/sqlalchemy_dialect_compliance/test_dialect_compliance.py::CTETest_bigquery+bigquery::test_insert_from_select_round_trip - sqlalchemy.exc.DatabaseError: (google.cloud.bigquery.dbapi.exceptions.DatabaseError) 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/flywheel-dev-328120/queries?prettyPrint=false: Syntax error: Unexpected keyword INSERT at [5:2]
[SQL: WITH `some_cte` AS
(SELECT `some_table`.`id` AS `id`, `some_table`.`data` AS `data`, `some_table`.`parent_id` AS `parent_id`
FROM `some_table`
WHERE `some_table`.`data` IN UNNEST(%(data_1:STRING)s))
 INSERT INTO `some_other_table` (`id`, `data`, `parent_id`) SELECT `some_cte`.`id`, `some_cte`.`data`, `some_cte`.`parent_id`
FROM `some_cte`]
[parameters: {'data_1': ['d2', 'd3', 'd4']}]

This is because by default SA uses cte_follows_insert=False, but BQ syntax expects CTEs to follow inserts.

Here's a snippet that can reproduce the issue:

import sqlalchemy as sa

engine = sa.create_engine("bigquery://")
conn = engine.connect()

meta = sa.MetaData()
source_table = sa.Table(
    "source_table",
    meta,
    sa.Column("id", sa.Integer()),
    sa.Column("value", sa.Integer()),
    schema=DATASET_NAME,
)
dest_table = sa.Table(
    "dest_table",
    meta,
    sa.Column("id", sa.Integer()),
    sa.Column("value", sa.Integer()),
    schema=DATASET_NAME,
)
meta.create_all(bind=conn)

conn.execute(source_table.insert().values([
    {"id": 1, "value": 5},
    {"id": 2, "value": 2},
    {"id": 3, "value": 4},
    {"id": 4, "value": 9},
    {"id": 5, "value": 7},
    {"id": 6, "value": 1},
]))

cte_expr = sa.select(source_table.columns).where(source_table.c.value >= 5).cte("cte")
insert_stmt = dest_table.insert().from_select(cte_expr.columns, cte_expr)

conn.execute(insert_stmt)

Previously, the insert statement would fail with:

[SQL: WITH `cte` AS
(SELECT `source_table`.`id` AS `id`, `source_table`.`value` AS `value`
FROM `holyoke_test`.`source_table`
WHERE `source_table`.`value` >= %(value_1:INT64)s)
 INSERT INTO `holyoke_test`.`dest_table` (`id`, `value`) SELECT `cte`.`id`, `cte`.`value`
FROM `cte`]
[parameters: {'value_1': 5}]
(Background on this error at: https://sqlalche.me/e/20/4xp6)

But now it succeeds. We can see the query that it generates now correctly writes the cte after the insert statement.

In [5]: from sqlalchemy_bigquery import BigQueryDialect; print(str(insert_stmt.compile(dialect=BigQueryDialect())))
INSERT INTO `holyoke_test`.`dest_table` (`id`, `value`) WITH `cte` AS
(SELECT `source_table`.`id` AS `id`, `source_table`.`value` AS `value`
FROM `holyoke_test`.`source_table`
WHERE `source_table`.`value` >= :value_1)
 SELECT `cte`.`id`, `cte`.`value`
FROM `cte`
tswast commented 1 month ago

@Linchin please take a look

Linchin commented 1 month ago

@tswast Thanks for the heads up! Sorry for the delay, I got pulled into other projects.

@aholyoke Thanks a lot for helping fix this bug! Also the description is very thorough, which really helps with reviewing.