dbt-labs / dbt-spark

dbt-spark contains all of the code enabling dbt to work with Apache Spark and Databricks
https://getdbt.com
Apache License 2.0

[ADAP-1048] [Bug] Replacing existing table using incremental model #950

Closed · vladimir-vvalov closed 7 months ago

vladimir-vvalov commented 10 months ago

Is this a new bug in dbt-spark?

Current Behavior

Sometimes an existing table maintained by an incremental model is replaced via 'create or replace table' because load_relation(this) returns None. This happens when an error occurs during

show table extended in [schema] like '*'

After the error, dbt continues to run, but all subsequent incremental models are built using create_table_as. Because of this, the entire history in the incremental table is replaced with only the small amount of data from the current update. More details are below.

Expected Behavior

This does not seem like a good default behavior, where an error in the command

show table extended in [schema] like '*'

is interpreted as the tables being missing, when those tables actually exist.

Steps To Reproduce

profiles:

    prod:
      host: hive-client.******
      method: thrift
      port: 10009
      schema: logistics_dbt
      threads: 10
      type: spark
      auth: 'LDAP'
      user: '****'
      password: '***'
      server_side_parameters: {
        'spark.app.name': '*****_dbt',
        'kyuubi.engine.share.level.subdomain': '*****_dbt',
        'kyuubi.session.engine.idle.timeout': 'PT5M',
        'spark.sql.session.timeZone': 'Europe/Kiev', 
      }

The project has several sets of models that can be executed in parallel in different environments. set1 contains several set1_stg__ models, several set1_int__ models, and an incremental set1_mart__table. set2 contains several set2_stg__ models, several view models like set2_int__, and set2_mart. set1 is isolated from set2: models of set1 have no dependencies on models of set2 and do not depend on any tables/views of set2, and vice versa. Models of both sets use the same schema, 'logistics_dbt'.

Configs used for set1_mart__table (and other similar incremental models) in 1.7 versions:

{{ config(
    materialized='incremental',
    file_format='delta',
    incremental_strategy='insert_overwrite',
    partition_by='partition_month',
    post_hook=["OPTIMIZE {{ this }};"],
) }}

In 1.7 and older, example 1:

{{ config(
    materialized='incremental',
    incremental_strategy='append',
    file_format='delta',
    partition_by='date',
    pre_hook=[
        "{{ oborot_rc_macro__pre_hook_delete(model.name, model.schema) }}",
    ],
    post_hook=["OPTIMIZE {{ this }};"],
    full_refresh=false,
) }}

In 1.7 and older, example 2:

{{ config(
    partition_by = "yyyymm",
    incremental_strategy = 'merge',
    file_format='delta',
    materialized = 'incremental',
    unique_key=["operation_date", "fil_id", "lager_id", "stock_type_id", "stock_purpose_id"],
) }}

When these sets are run in parallel in different isolated environments, the incremental model 'set1_mart__table' is sometimes considered non-existent, so dbt generates 'create or replace table set1_mart__table'. A usual run doesn't show any errors. When I run set1 with '--debug', I see an error after show table extended in logistics_dbt like '*': "Table or view 'gold_staging_stg__stkprod_tb_ecde' not found in database 'logistics_dbt'" (this is a view model from the parallel set2). Part of the log:

[2023-11-13T09:07:12.113+0200] {{process_utils.py:190}} INFO - 07:04:07  On list_None_logistics_dbt: /* {"app": "dbt", "dbt_version": "1.7.1", "profile_name": "dbt_logistics", "target_name": "prod", "connection_name": "list_None_logistics_dbt"} */
[2023-11-13T09:07:12.113+0200] {{process_utils.py:190}} INFO - show table extended in logistics_dbt like '*'
[2023-11-13T09:07:12.113+0200] {{process_utils.py:190}} INFO - 
[2023-11-13T09:07:12.113+0200] {{process_utils.py:190}} INFO - 07:04:07  Opening a new connection, currently in state closed
[2023-11-13T09:07:12.114+0200] {{process_utils.py:190}} INFO - 07:04:13  Spark adapter: Poll status: 1, sleeping
[2023-11-13T09:07:12.114+0200] {{process_utils.py:190}} INFO - 07:04:18  Spark adapter: Poll status: 1, sleeping
[2023-11-13T09:07:12.119+0200] {{process_utils.py:190}} INFO - 07:04:21  Spark adapter: Poll response: TGetOperationStatusResp(status=TStatus(statusCode=0, infoMessages=None, sqlState=None, errorCode=None, errorMessage=None), operationState=5, sqlState=None, errorCode=0, errorMessage="org.apache.kyuubi.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: Error operating ExecuteStatement: org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'gold_staging_stg__stkprod_tb_ecde' not found in database 'logistics_dbt'\n\tat org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireTableExists(SessionCatalog.scala:226)\n\tat org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableRawMetadata(SessionCatalog.scala:514)\n\tat org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableMetadata(SessionCatalog.scala:500)\n\tat 

In parallel, set2 executes 'create or replace view gold_staging_stg__stkprod_tb_ecde'.
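
For illustration only, the race between the two sessions roughly looks like this (the view body here is a placeholder, not the real model SQL):

-- session running set2: the view model is rebuilt
create or replace view logistics_dbt.gold_staging_stg__stkprod_tb_ecde as
select 1 as placeholder;

-- session running set1, at the same time: dbt lists every relation in the schema to fill its cache;
-- if the listing hits the view while it is being replaced, the whole command fails with NoSuchTableException
show table extended in logistics_dbt like '*';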

After this error dbt continues to work, but 'load_relation(this)' returns None for every incremental model. When the set1 incremental model runs, the materialization macro (dbt/include/spark/macros/materializations/incremental/incremental.sql in dbt-labs/dbt-spark) gets None from the expression {%- set existing_relation = load_relation(this) -%} and, based on that value, takes the 'create_table_as' branch:

  {#-- Incremental run logic --#}
  {%- if existing_relation is none -%}
    {#-- Relation must be created --#}
    {%- call statement('main', language=language) -%}
      {{ create_table_as(False, target_relation, compiled_code, language) }}
    {%- endcall -%}
    {% do persist_constraints(target_relation, model) %}

Workaround: I created a macro that is called in the pre_hook of each incremental model. It checks whether the table exists before the model runs and compares that with the result of load_relation(this). If load_relation(this) is None but the table exists, the macro raises an error to block further execution.
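
A rough sketch of such a macro (the name and exact check are illustrative, not my production code):

{% macro check_relation_visible(model_name, schema_name) %}
  {# Illustrative pre_hook guard: fail instead of letting dbt fall back to create_table_as #}
  {% if execute %}
    {# what dbt's relation cache thinks about the current model #}
    {% set cached_relation = load_relation(this) %}
    {# what the metastore actually contains, bypassing the cache #}
    {% set result = run_query("show tables in " ~ schema_name ~ " like '" ~ model_name ~ "'") %}
    {% if cached_relation is none and result.rows | length > 0 %}
      {{ exceptions.raise_compiler_error(
          "load_relation(this) is None but " ~ schema_name ~ "." ~ model_name ~
          " exists; aborting to avoid 'create or replace table'"
      ) }}
    {% endif %}
  {% endif %}
{% endmacro %}

It is then added to each incremental model, e.g. pre_hook=["{{ check_relation_visible(model.name, model.schema) }}"].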

Below is the full part of the log from the start until the error.

Relevant log output

[2023-11-13T09:07:12.109+0200] {{process_utils.py:190}} INFO - Executing: dbt --debug build --select tag:oborot_rc_model --vars '{calculation_offset_from: -1}' --profiles-dir /dev/shm/dbt/profiles --target prod
[2023-11-13T09:07:12.110+0200] {{process_utils.py:190}} INFO - 07:03:08  Running with dbt=1.7.1
[2023-11-13T09:07:12.110+0200] {{process_utils.py:190}} INFO - 07:03:08  running dbt with arguments {'printer_width': '80', 'indirect_selection': 'eager', 'write_json': 'True', 'log_cache_events': 'False', 'partial_parse': 'True', 'cache_selected_only': 'False', 'warn_error': 'None', 'version_check': 'True', 'fail_fast': 'False', 'log_path': '/dev/shm/dbt/logs', 'profiles_dir': '/dev/shm/dbt/profiles', 'debug': 'True', 'use_colors': 'True', 'use_experimental_parser': 'False', 'no_print': 'None', 'quiet': 'False', 'warn_error_options': 'WarnErrorOptions(include=[], exclude=[])', 'introspect': 'True', 'invocation_command': 'dbt --debug build --select tag:oborot_rc_model --vars {calculation_offset_from: -1} --profiles-dir /dev/shm/dbt/profiles --target prod', 'log_format': 'default', 'target_path': 'None', 'static_parser': 'True', 'send_anonymous_usage_stats': 'False'}
[2023-11-13T09:07:12.110+0200] {{process_utils.py:190}} INFO - 07:03:09  Registered adapter: spark=1.7.1
[2023-11-13T09:07:12.110+0200] {{process_utils.py:190}} INFO - 07:03:09  checksum: c70bacfada98680b730e66f651aa3f70b5af4e123922db58602651d9d2aefe8d, vars: {'calculation_offset_from': -1}, profile: , target: prod, version: 1.7.1
[2023-11-13T09:07:12.111+0200] {{process_utils.py:190}} INFO - 07:03:09  Unable to do partial parsing because saved manifest not found. Starting full parse.
[2023-11-13T09:07:12.111+0200] {{process_utils.py:190}} INFO - 07:03:29  Found 869 models, 3 snapshots, 2 seeds, 1 operation, 468 tests, 484 sources, 0 exposures, 0 metrics, 610 macros, 0 groups, 0 semantic models
[2023-11-13T09:07:12.111+0200] {{process_utils.py:190}} INFO - 07:03:29
[2023-11-13T09:07:12.111+0200] {{process_utils.py:190}} INFO - 07:03:29  Acquiring new spark connection 'master'
[2023-11-13T09:07:12.111+0200] {{process_utils.py:190}} INFO - 07:03:29  Acquiring new spark connection 'list_schemas'
[2023-11-13T09:07:12.111+0200] {{process_utils.py:190}} INFO - 07:03:29  Using spark connection "list_schemas"
[2023-11-13T09:07:12.111+0200] {{process_utils.py:190}} INFO - 07:03:29  On list_schemas: /* {"app": "dbt", "dbt_version": "1.7.1", "profile_name": "dbt_logistics", "target_name": "prod", "connection_name": "list_schemas"} */
[2023-11-13T09:07:12.112+0200] {{process_utils.py:190}} INFO - 
[2023-11-13T09:07:12.112+0200] {{process_utils.py:190}} INFO -     show databases
[2023-11-13T09:07:12.112+0200] {{process_utils.py:190}} INFO - 
[2023-11-13T09:07:12.112+0200] {{process_utils.py:190}} INFO - 07:03:29  Opening a new connection, currently in state init
[2023-11-13T09:07:12.112+0200] {{process_utils.py:190}} INFO - 07:04:06  Spark adapter: Poll status: 2, query complete
[2023-11-13T09:07:12.112+0200] {{process_utils.py:190}} INFO - 07:04:06  SQL status: OK in 37.0 seconds
[2023-11-13T09:07:12.112+0200] {{process_utils.py:190}} INFO - 07:04:07  On list_schemas: Close
[2023-11-13T09:07:12.113+0200] {{process_utils.py:190}} INFO - 07:04:07  Re-using an available connection from the pool (formerly list_schemas, now list_None_logistics_dbt)
[2023-11-13T09:07:12.113+0200] {{process_utils.py:190}} INFO - 07:04:07  Spark adapter: NotImplemented: add_begin_query
[2023-11-13T09:07:12.113+0200] {{process_utils.py:190}} INFO - 07:04:07  Using spark connection "list_None_logistics_dbt"
[2023-11-13T09:07:12.113+0200] {{process_utils.py:190}} INFO - 07:04:07  On list_None_logistics_dbt: /* {"app": "dbt", "dbt_version": "1.7.1", "profile_name": "dbt_logistics", "target_name": "prod", "connection_name": "list_None_logistics_dbt"} */
[2023-11-13T09:07:12.113+0200] {{process_utils.py:190}} INFO - show table extended in logistics_dbt like '*'
[2023-11-13T09:07:12.113+0200] {{process_utils.py:190}} INFO - 
[2023-11-13T09:07:12.113+0200] {{process_utils.py:190}} INFO - 07:04:07  Opening a new connection, currently in state closed
[2023-11-13T09:07:12.114+0200] {{process_utils.py:190}} INFO - 07:04:13  Spark adapter: Poll status: 1, sleeping
[2023-11-13T09:07:12.114+0200] {{process_utils.py:190}} INFO - 07:04:18  Spark adapter: Poll status: 1, sleeping
[2023-11-13T09:07:12.119+0200] {{process_utils.py:190}} INFO - 07:04:21  Spark adapter: Poll response: TGetOperationStatusResp(status=TStatus(statusCode=0, infoMessages=None, sqlState=None, errorCode=None, errorMessage=None), operationState=5, sqlState=None, errorCode=0, errorMessage="org.apache.kyuubi.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: Error operating ExecuteStatement: org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'gold_staging_stg__stkprod_tb_ecde' not found in database 'logistics_dbt'\n\tat org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireTableExists(SessionCatalog.scala:226)\n\tat org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableRawMetadata(SessionCatalog.scala:514)\n\tat org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableMetadata(SessionCatalog.scala:500)\n\tat

Environment

- OS: Debian GNU/Linux 11 (bullseye)
- Python: CPython3.10.13.final.0-64
- dbt-core: 1.7.1, 1.7.0b2, 1.5.0, 1.4.1
- dbt-spark: 1.7.1, 1.7.0b2, 1.5.0, 1.4.1

Additional Context

Don't blame me if it's not a bug; I tried to find a similar issue but didn't find one. I'm confused by the default behavior. It's not very pleasant when DAGs run successfully but a few days later customers write that the history in the mart has disappeared.

Fleid commented 7 months ago

That is a valid point, but to me there is no clear way to move forward.

I don't think changing default behavior at this point is reasonable. The expectation is that dbt will "force its way through" when the state can't be determined. We could potentially surface the behavior as a parameter - fail if you can't get metadata. But at that point I'd rather we focus on ways to make the metadata retrieval more consistent.

I'm closing for now as won't fix, but I'm happy to listen to counter arguments :)