TobikoData / sqlmesh

Efficient data transformation and modeling framework that is backwards compatible with dbt.
https://sqlmesh.com
Apache License 2.0

`INCREMENTAL_BY_PARTITION` BigQuery models use execution project for referencing snapshots #3055

Closed jonathan-ostrander closed 2 months ago

jonathan-ostrander commented 2 months ago

Issue for https://tobiko-data.slack.com/archives/C044BRE5W4S/p1724446893811889

SQLMesh version: 0.109.2

Simplified config:

import os

from sqlmesh.core.config import BigQueryConnectionConfig
from sqlmesh.core.config import Config
from sqlmesh.core.config import GatewayConfig
from sqlmesh.core.config import ModelDefaultsConfig
from sqlmesh.core.config.categorizer import CategorizerConfig
from sqlmesh.core.context import Context

dev_connection = BigQueryConnectionConfig(
    project="h4r-common-bi-dev",
    execution_project="gcp-hopper-etl-development",
)

base_config = Config(
    model_defaults=ModelDefaultsConfig(
        dialect="bigquery",
        cron="@hourly",
    ),
    gateways={
        "development": GatewayConfig(
            connection=dev_connection,
            test_connection=dev_connection,
            state_schema="engops_bi_sqlmesh_state",
        ),
    },
    default_gateway="development",
    default_target_environment=os.getenv("USER", "prod"),
)

context = Context(config=base_config)

config = base_config.update_with(
    Config(
        physical_schema_override={
            model.schema_name: f"{model.schema_name}_physical"
            for model in context.models.values()
            if not model.kind.is_embedded
        },
    )
)

gcp-hopper-etl-development is our execution project: it's where we want all BigQuery jobs to run and where our engops_bi_sqlmesh_state state dataset lives. h4r-common-bi-dev is where all of our "data plane" data for SQLMesh should go.
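
For context, BigQuery resolves an unqualified dataset reference against the project the job runs in, which in our setup is the execution project; a fully qualified reference pins it to the data project. A minimal illustration (the table name is hypothetical):

-- Both statements assume the job runs in gcp-hopper-etl-development (the execution project);
-- `some_table` is a hypothetical table name used only for illustration.

-- Unqualified: the dataset is looked up in gcp-hopper-etl-development.
SELECT * FROM datadog__slv_physical.some_table;

-- Fully qualified: the dataset is looked up in h4r-common-bi-dev (the data project).
SELECT * FROM `h4r-common-bi-dev.datadog__slv_physical.some_table`;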

When trying to apply the following INCREMENTAL_BY_PARTITION model for the first time

MODEL (
  name datadog__slv.measure_hourly_request_count,
  kind INCREMENTAL_BY_PARTITION,
  cron '@hourly',
  grain (
    hour,
    realm,
    service,
    resource_name,
  ),
  partitioned_by TIMESTAMP_TRUNC(hour, DAY)
);

SELECT
  hour::TIMESTAMP, /* The hour the requests occurred during. */
  realm::STRING, /* The realm the resource belongs to. */
  service::STRING, /* The service the resource belongs to. */
  resource_name::STRING, /* The name of the resource. */
  request_count::INTEGER, /* The number of requests to this resource in the hour. */
FROM datadog__brz.norm_hourly_request_metrics

we get the following error:

FAILED processing snapshot "h4r-common-bi-dev"."datadog__slv"."measure_hourly_request_count"
Traceback (most recent call last):
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/sqlmesh/utils/concurrency.py", line 227, in sequential_apply_to_dag
    fn(node)
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/sqlmesh/core/scheduler.py", line 290, in evaluate_node
    self.evaluate(snapshot, start, end, execution_time, deployability_index, batch_idx)
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/sqlmesh/core/scheduler.py", line 166, in evaluate
    wap_id = self.snapshot_evaluator.evaluate(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/sqlmesh/core/snapshot/evaluator.py", line 126, in evaluate
    result = self._evaluate_snapshot(
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/sqlmesh/core/snapshot/evaluator.py", line 617, in _evaluate_snapshot
    apply(query_or_df, index)
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/sqlmesh/core/snapshot/evaluator.py", line 518, in apply
    evaluation_strategy.insert(
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/sqlmesh/core/snapshot/evaluator.py", line 1241, in insert
    self.adapter.insert_overwrite_by_partition(
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/sqlmesh/core/engine_adapter/shared.py", line 267, in internal_wrapper
    return func(*list_args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/sqlmesh/core/engine_adapter/bigquery.py", line 399, in insert_overwrite_by_partition
    self.execute(
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/sqlmesh/core/engine_adapter/base.py", line 1913, in execute
    self._execute(sql, **kwargs)
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/sqlmesh/core/engine_adapter/bigquery.py", line 664, in _execute
    results = self._db_call(
              ^^^^^^^^^^^^^^
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/sqlmesh/core/engine_adapter/bigquery.py", line 622, in _db_call
    return func(
           ^^^^^
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/google/cloud/bigquery/job/query.py", line 1676, in result
    while not is_job_done():
              ^^^^^^^^^^^^^
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
    _retry_error_helper(
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
    raise final_exc from source_exc
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
    result = target()
             ^^^^^^^^
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/google/cloud/bigquery/job/query.py", line 1645, in is_job_done
    self._reload_query_results(retry=retry, **reload_query_results_kwargs)
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/google/cloud/bigquery/job/query.py", line 1443, in _reload_query_results
    self._query_results = self._client._get_query_results(
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/google/cloud/bigquery/client.py", line 2024, in _get_query_results
    resource = self._call_api(
               ^^^^^^^^^^^^^^^
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/google/cloud/bigquery/client.py", line 833, in _call_api
    return call()
           ^^^^^^
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
    _retry_error_helper(
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
    raise final_exc from source_exc
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
    result = target()
             ^^^^^^^^
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
    raise exceptions.from_http_response(response)
google.api_core.exceptions.BadRequest: 400 GET https://bigquery.googleapis.com/bigquery/v2/projects/gcp-hopper-etl-development/queries/e3515662-e435-4d5a-bfd0-2584c7c97b66?maxResults=0&location=US&prettyPrint=false:
Not found: Dataset gcp-hopper-etl-development:datadog__slv_physical was not found in location US at [1:62]

Location: US
Job ID: e3515662-e435-4d5a-bfd0-2584c7c97b66

2024-08-27 09:36:52,630 - MainThread - sqlmesh.core.context - ERROR - Apply Failure: Traceback (most recent call last):
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/sqlmesh/core/context.py", line 1207, in apply
    self._apply(plan, circuit_breaker)
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/sqlmesh/core/context.py", line 1760, in _apply
    self._scheduler.create_plan_evaluator(self).evaluate(plan, circuit_breaker=circuit_breaker)
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/sqlmesh/core/plan/evaluator.py", line 112, in evaluate
    self._backfill(
  File "/Users/jostrander/Hopper/engops-bi-sqlmesh/.venv/lib/python3.11/site-packages/sqlmesh/core/plan/evaluator.py", line 175, in _backfill
    raise SQLMeshError("Plan application failed.")
sqlmesh.utils.errors.SQLMeshError: Plan application failed.
 (context.py:1215)
Error: Plan application failed.

Taking a look at the SQL for that BigQuery job:

DECLARE _sqlmesh_target_partitions_ ARRAY<TIMESTAMP> DEFAULT (SELECT ARRAY_AGG(PARSE_TIMESTAMP('%Y%m%d', partition_id)) FROM `datadog__slv_physical`.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '__temp_datadog__slv__measure_hourly_request_count__768010755_hnf589w3' AND NOT partition_id IS NULL AND partition_id <> '__NULL__');

It looks like the project ID is not included in this query, so BigQuery defaults to looking for the dataset in the project where the job is running.
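
Presumably the lookup would succeed if the data project were included in the INFORMATION_SCHEMA reference; the same query, reformatted with the project qualifier added, would look roughly like:

DECLARE _sqlmesh_target_partitions_ ARRAY<TIMESTAMP> DEFAULT (
  SELECT ARRAY_AGG(PARSE_TIMESTAMP('%Y%m%d', partition_id))
  FROM `h4r-common-bi-dev.datadog__slv_physical`.INFORMATION_SCHEMA.PARTITIONS
  WHERE table_name = '__temp_datadog__slv__measure_hourly_request_count__768010755_hnf589w3'
    AND NOT partition_id IS NULL
    AND partition_id <> '__NULL__'
);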

jonathan-ostrander commented 2 months ago

I'm fairly certain the issue is here, due to the database not being passed in to `select_partitions_expr`.

treysp commented 2 months ago

Closed by #3056