dbt-labs / dbt-core

dbt enables data analysts and engineers to transform their data using the same practices that software engineers use to build applications.
https://getdbt.com

[CT-530] [Bug] Partial parsing should handle volatile variables like `invocation_id` and `run_started_at` #4364

Open joellabes opened 2 years ago

joellabes commented 2 years ago


Current Behavior

You can trick dbt into giving dynamic freshness check behaviour with something like this:

        freshness:
          error_after:
            count: "{{ 48 if run_started_at.strftime('%w') == '1' else 24 }}" # looser expectations on Monday
            period: hour
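
For reference, the Jinja expression above reduces to the following parse-time logic (a Python sketch; `run_started_at` here stands in for dbt's context variable, and `strftime('%w')` yields the weekday as a string, '0' for Sunday through '6' for Saturday):

from datetime import datetime, timezone

# Python equivalent (sketch) of the Jinja expression above: dbt renders it
# once, at parse time, using the invocation's start timestamp.
run_started_at = datetime.now(timezone.utc)
error_after_count = 48 if run_started_at.strftime("%w") == "1" else 24  # looser on Mondays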

Community Slack thread.

@jtcohen6 pointed out that

Small word of warning — this won't be re-parsed by partial parsing, and so it's liable to get stale. It shouldn't be an issue in scheduled runs (which don't persist partial-parse state between runs), but it might be cause for surprise if you come back to it locally a few days from now.

https://docs.getdbt.com/reference/parsing#known-limitations

Expected Behavior

Again from jerco:

In the future, we should think about ways to always re-parse (invalidate cache) for any configs/properties/nodes/etc that depend on "volatile" variables like run_started_at or invocation_id

Steps To Reproduce

  1. Configure a source freshness check as described above, such that it will fail today and pass every other day of the week.
  2. Run `dbt source freshness`; note that the test fails.
  3. Wait a day (or change the code above to depend on the minute of the day or something).
  4. Run `dbt source freshness` again. Note that the test still fails, even though it's a different day of the week.
  5. Run `dbt source freshness --no-partial-parse`. Note that the test now passes, because dbt has re-parsed the project and re-evaluated `run_started_at`.

Relevant log output

No response

Environment

- OS: macOS
- Python: 3.8.x
- dbt: 1.0-rc1

What database are you using dbt with?

snowflake

Additional Context

Related: https://github.com/dbt-labs/dbt-core/issues/2330

jtcohen6 commented 2 years ago

Thanks for opening @joellabes!

I imagine this will be trickier to implement than for `var()` or `env_var()`, since references to `run_started_at` + `invocation_id` aren't macro "calls" that we already capture during parsing.

I'm sure there are more "volatile" variables that we could think up—for instance, `datetime.now()`. Should we attempt "cache invalidation" for Jinja expressions that depend on any of them?

I do think a reasonable first version of this would be limited to documented, "special" dbt context variables. (You managed to achieve the desired behavior well enough with `run_started_at`.)

gshank commented 2 years ago

In order to estimate this, we need to know which variables would be supported.

jtcohen6 commented 2 years ago

For starters, let's just say the two official context variables that we know will be different in every invocation: `invocation_id` and `run_started_at`.

Out of scope

Context variables that are likely to differ across invocations, but which we also highly discourage people from using in parse-time configurations:

Variables that are likely to change between invocations, but also aren't even really available at parse time:

gshank commented 2 years ago

Both `invocation_id` and `run_started_at` are actually methods in the context, so they can be gathered in a similar way to the env_vars. If they show up in a major yaml config file (profiles.yml, dbt_project.yml, etc.), it would end up being the equivalent of re-parsing, though, since we'd have to do more granular and second-order change collection to re-parse just the affected objects.

If these are used in models, it would be possible to do partial parsing.
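
A minimal sketch of what "gathering them in a similar way to the env_vars" could look like (hypothetical Python, not dbt-core's actual implementation): record, per file, which volatile context members were referenced during parsing, then schedule exactly those files for re-parsing on the next invocation, since their values are guaranteed to differ.

# Hypothetical sketch of per-file tracking of "volatile" context members,
# analogous to how env_var() usage is tracked today. Not dbt-core internals.

VOLATILE = {"invocation_id", "run_started_at"}

def record_volatile_refs(path, referenced_names, registry):
    """Call once per parsed file; registry maps path -> volatile names used."""
    used = VOLATILE & set(referenced_names)
    if used:
        registry[path] = used

def files_to_reparse(registry):
    # These values differ in every invocation, so any file that used one must
    # be re-parsed even though its contents are unchanged on disk.
    return sorted(registry)

# Usage with a hypothetical sources file that references run_started_at:
registry = {}
record_volatile_refs("models/sources.yml", {"run_started_at", "source"}, registry)
print(files_to_reparse(registry))  # ['models/sources.yml']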

github-actions[bot] commented 1 year ago

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please comment on the issue or else it will be closed in 7 days.

joellabes commented 1 year ago

Bump!

kiliantscherny commented 1 year ago

Bumping this as well as I'm quite interested in seeing this resolved too!

jeremyyeo commented 10 months ago

Adding another failure mode, where partial parsing causes model configurations to retain their old partial-parsed values.

Project setup:

# dbt_project.yml
name: my_dbt_project
profile: all
config-version: 2
version: "1.0.0"

models:
  my_dbt_project:
    +materialized: incremental

# ~/.dbt/profiles.yml
...
config:
  partial_parse: true
...

^ Partial parsing is turned on

-- macros/get_ts.sql
{% macro get_ts() %}
    {% set ts = modules.datetime.datetime.utcnow().isoformat(sep=" ", timespec="milliseconds") %}
    {% do log('\n\n>>>>>>>>>> macro set_ts() eval to: ' ~ ts ~ '<<<<<<<<<<\n\n', True) %}
    {{ return(ts) }}
{% endmacro %}

-- models/inc.sql
{%- set ts = get_ts() -%}

{{
    config(
      unique_key = 'id',
      incremental_predicates = ["DBT_INTERNAL_DEST.dwh_ts >= date('" ~ ts ~ "')"]
    )
}}

select 1 as id, sysdate() as dwh_ts, '{{ ts }}' as dbt_ts

The idea is to have the `incremental_predicates` config take its value from the `get_ts` macro. In the exercise below, we will invoke dbt three times:

  1. `dbt run` with a full parse.
  2. `dbt run` with partial parsing.
  3. `dbt run` with partial parsing.

$ dbt clean # remove any existing `/target` so project will do a full parse next.

$ dbt run --debug --full-refresh 

22:27:23  Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'start', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x107bb1640>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10a7decd0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10a81a3a0>]}
22:27:23  Running with dbt=1.6.6
22:27:23  running dbt with arguments {'printer_width': '80', 'indirect_selection': 'eager', 'log_cache_events': 'False', 'write_json': 'True', 'partial_parse': 'True', 'cache_selected_only': 'False', 'profiles_dir': '/Users/jeremy/.dbt', 'debug': 'True', 'fail_fast': 'False', 'log_path': '/Users/jeremy/src/dbt-basic/logs', 'version_check': 'True', 'warn_error': 'None', 'use_colors': 'True', 'use_experimental_parser': 'False', 'no_print': 'None', 'quiet': 'False', 'warn_error_options': 'WarnErrorOptions(include=[], exclude=[])', 'invocation_command': 'dbt run --debug --full-refresh', 'introspect': 'True', 'static_parser': 'True', 'target_path': 'None', 'log_format': 'default', 'send_anonymous_usage_stats': 'True'}
22:27:25  Sending event: {'category': 'dbt', 'action': 'project_id', 'label': '5db4a9ef-88bc-45ac-81a4-ffd426a269ae', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10a7de490>]}
22:27:25  Sending event: {'category': 'dbt', 'action': 'adapter_info', 'label': '5db4a9ef-88bc-45ac-81a4-ffd426a269ae', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x11c96a910>]}
22:27:25  Registered adapter: snowflake=1.6.4
22:27:25  checksum: 546b81fb56652c304d87abd676e84d4737d8a0c6b62160f4a6e79dcddbc842bb, vars: {}, profile: , target: , version: 1.6.6
22:27:25  Unable to do partial parsing because saved manifest not found. Starting full parse.
22:27:25  Sending event: {'category': 'dbt', 'action': 'partial_parser', 'label': '5db4a9ef-88bc-45ac-81a4-ffd426a269ae', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x11c97cc10>]}
22:27:26  

>>>>>>>>>> macro set_ts() eval to: 2023-10-29 22:27:26.570<<<<<<<<<<

22:27:26  Sending event: {'category': 'dbt', 'action': 'load_project', 'label': '5db4a9ef-88bc-45ac-81a4-ffd426a269ae', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x11cb0b940>]}
22:27:26  Sending event: {'category': 'dbt', 'action': 'resource_counts', 'label': '5db4a9ef-88bc-45ac-81a4-ffd426a269ae', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x11cb0bb20>]}
22:27:26  Found 1 model, 0 sources, 0 exposures, 0 metrics, 376 macros, 0 groups, 0 semantic models
22:27:26  Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': '5db4a9ef-88bc-45ac-81a4-ffd426a269ae', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x11c97cc40>]}
22:27:26  
22:27:26  Acquiring new snowflake connection 'master'
22:27:26  Acquiring new snowflake connection 'list_development'
22:27:26  Using snowflake connection "list_development"
22:27:26  On list_development: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "connection_name": "list_development"} */
show terse schemas in database development
    limit 10000
22:27:26  Opening a new connection, currently in state init
22:27:29  SQL status: SUCCESS 356 in 2.0 seconds
22:27:29  On list_development: Close
22:27:29  Re-using an available connection from the pool (formerly list_development, now list_development_dbt_jyeo)
22:27:29  Using snowflake connection "list_development_dbt_jyeo"
22:27:29  On list_development_dbt_jyeo: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "connection_name": "list_development_dbt_jyeo"} */
show terse objects in development.dbt_jyeo limit 10000
22:27:29  Opening a new connection, currently in state closed
22:27:31  SQL status: SUCCESS 47 in 2.0 seconds
22:27:31  On list_development_dbt_jyeo: Close
22:27:31  Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': '5db4a9ef-88bc-45ac-81a4-ffd426a269ae', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x11cab5160>]}
22:27:31  Concurrency: 1 threads (target='sf')
22:27:31  
22:27:31  Began running node model.my_dbt_project.inc
22:27:31  1 of 1 START sql incremental model dbt_jyeo.inc ................................ [RUN]
22:27:31  Re-using an available connection from the pool (formerly list_development_dbt_jyeo, now model.my_dbt_project.inc)
22:27:31  Began compiling node model.my_dbt_project.inc
22:27:31  

>>>>>>>>>> macro set_ts() eval to: 2023-10-29 22:27:31.941<<<<<<<<<<

22:27:31  Writing injected SQL for node "model.my_dbt_project.inc"
22:27:31  Timing info for model.my_dbt_project.inc (compile): 11:27:31.927912 => 11:27:31.944662
22:27:31  Began executing node model.my_dbt_project.inc
22:27:32  Writing runtime sql for node "model.my_dbt_project.inc"
22:27:32  Using snowflake connection "model.my_dbt_project.inc"
22:27:32  On model.my_dbt_project.inc: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "node_id": "model.my_dbt_project.inc"} */
create or replace transient table development.dbt_jyeo.inc
         as
        (-- models/inc.sql

select 1 as id, sysdate() as dwh_ts, '2023-10-29 22:27:31.941' as dbt_ts
        );
22:27:32  Opening a new connection, currently in state closed
22:27:34  SQL status: SUCCESS 1 in 2.0 seconds
22:27:34  Using snowflake connection "model.my_dbt_project.inc"
22:27:34  On model.my_dbt_project.inc: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "node_id": "model.my_dbt_project.inc"} */
drop view if exists development.dbt_jyeo.inc__dbt_tmp cascade
22:27:34  SQL status: SUCCESS 1 in 0.0 seconds
22:27:34  Timing info for model.my_dbt_project.inc (execute): 11:27:31.948227 => 11:27:34.633047
22:27:34  On model.my_dbt_project.inc: Close
22:27:35  Sending event: {'category': 'dbt', 'action': 'run_model', 'label': '5db4a9ef-88bc-45ac-81a4-ffd426a269ae', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x11db35d00>]}
22:27:35  1 of 1 OK created sql incremental model dbt_jyeo.inc ........................... [SUCCESS 1 in 3.29s]
22:27:35  Finished running node model.my_dbt_project.inc
22:27:35  Connection 'master' was properly closed.
22:27:35  Connection 'model.my_dbt_project.inc' was properly closed.
22:27:35  
22:27:35  Finished running 1 incremental model in 0 hours 0 minutes and 8.56 seconds (8.56s).
22:27:35  Command end result
22:27:35  
22:27:35  Completed successfully
22:27:35  
22:27:35  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
22:27:35  Command `dbt run` succeeded at 11:27:35.239342 after 11.66 seconds
22:27:35  Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'end', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x107bb1640>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x11c97c310>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x11cb1bfd0>]}
22:27:35  Flushing usage events

On the first run, the project was fully parsed (the target/ folder now contains partial_parse.msgpack, ready for the next invocation to reuse for partial parsing) and the get_ts() macro (whose log line reads set_ts()) evaluated twice:

>>>>>>>>>> macro set_ts() eval to: 2023-10-29 22:27:26.570<<<<<<<<<<
>>>>>>>>>> macro set_ts() eval to: 2023-10-29 22:27:31.941<<<<<<<<<<

^ Importantly, the first timestamp, 2023-10-29 22:27:26.570, rendered at parse time, is now locked in for model configs that make use of the get_ts() macro's return value.
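
Conceptually, the split looks like this (a sketch, not dbt-core's actual data structures): configs are rendered at parse time and cached in partial_parse.msgpack, while the model body's raw Jinja is stored unrendered and re-rendered at compile time on every run.

# Sketch of the cached node, using the values observed in the logs above.
manifest_node = {
    "config": {
        "unique_key": "id",
        # Rendered once at parse time, then reused from the saved manifest:
        "incremental_predicates": [
            "DBT_INTERNAL_DEST.dwh_ts >= date('2023-10-29 22:27:26.570')"
        ],
    },
    # Raw Jinja, re-rendered at compile time on every invocation:
    "raw_code": "select 1 as id, sysdate() as dwh_ts, '{{ ts }}' as dbt_ts",
}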

Let's do a subsequent run (now partial parsing kicks in):

$ dbt run --debug
22:29:25  Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'start', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x1115176d0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x114144cd0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x114180340>]}
22:29:25  Running with dbt=1.6.6
22:29:25  running dbt with arguments {'printer_width': '80', 'indirect_selection': 'eager', 'write_json': 'True', 'log_cache_events': 'False', 'partial_parse': 'True', 'cache_selected_only': 'False', 'profiles_dir': '/Users/jeremy/.dbt', 'debug': 'True', 'warn_error': 'None', 'log_path': '/Users/jeremy/src/dbt-basic/logs', 'version_check': 'True', 'fail_fast': 'False', 'use_colors': 'True', 'use_experimental_parser': 'False', 'no_print': 'None', 'quiet': 'False', 'warn_error_options': 'WarnErrorOptions(include=[], exclude=[])', 'static_parser': 'True', 'log_format': 'default', 'invocation_command': 'dbt run --debug', 'target_path': 'None', 'introspect': 'True', 'send_anonymous_usage_stats': 'True'}
22:29:26  Sending event: {'category': 'dbt', 'action': 'project_id', 'label': 'a9930988-2a5c-4098-9a28-825701db6ada', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x114144430>]}
22:29:26  Sending event: {'category': 'dbt', 'action': 'adapter_info', 'label': 'a9930988-2a5c-4098-9a28-825701db6ada', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x1262cc8b0>]}
22:29:26  Registered adapter: snowflake=1.6.4
22:29:26  checksum: 546b81fb56652c304d87abd676e84d4737d8a0c6b62160f4a6e79dcddbc842bb, vars: {}, profile: , target: , version: 1.6.6
22:29:26  Partial parsing enabled: 0 files deleted, 0 files added, 0 files changed.
22:29:26  Partial parsing enabled, no changes found, skipping parsing
22:29:26  Sending event: {'category': 'dbt', 'action': 'load_project', 'label': 'a9930988-2a5c-4098-9a28-825701db6ada', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x12643d970>]}
22:29:26  Sending event: {'category': 'dbt', 'action': 'resource_counts', 'label': 'a9930988-2a5c-4098-9a28-825701db6ada', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x126388ee0>]}
22:29:26  Found 1 model, 0 sources, 0 exposures, 0 metrics, 376 macros, 0 groups, 0 semantic models
22:29:26  Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': 'a9930988-2a5c-4098-9a28-825701db6ada', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x126388d30>]}
22:29:26  
22:29:26  Acquiring new snowflake connection 'master'
22:29:26  Acquiring new snowflake connection 'list_development'
22:29:26  Using snowflake connection "list_development"
22:29:26  On list_development: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "connection_name": "list_development"} */
show terse schemas in database development
    limit 10000
22:29:26  Opening a new connection, currently in state init
22:29:28  SQL status: SUCCESS 356 in 2.0 seconds
22:29:28  On list_development: Close
22:29:29  Re-using an available connection from the pool (formerly list_development, now list_development_dbt_jyeo)
22:29:29  Using snowflake connection "list_development_dbt_jyeo"
22:29:29  On list_development_dbt_jyeo: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "connection_name": "list_development_dbt_jyeo"} */
show terse objects in development.dbt_jyeo limit 10000
22:29:29  Opening a new connection, currently in state closed
22:29:31  SQL status: SUCCESS 47 in 2.0 seconds
22:29:31  On list_development_dbt_jyeo: Close
22:29:31  Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': 'a9930988-2a5c-4098-9a28-825701db6ada', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x1262d46a0>]}
22:29:31  Concurrency: 1 threads (target='sf')
22:29:31  
22:29:31  Began running node model.my_dbt_project.inc
22:29:31  1 of 1 START sql incremental model dbt_jyeo.inc ................................ [RUN]
22:29:31  Re-using an available connection from the pool (formerly list_development_dbt_jyeo, now model.my_dbt_project.inc)
22:29:31  Began compiling node model.my_dbt_project.inc
22:29:31  

>>>>>>>>>> macro set_ts() eval to: 2023-10-29 22:29:31.915<<<<<<<<<<

22:29:31  Writing injected SQL for node "model.my_dbt_project.inc"
22:29:31  Timing info for model.my_dbt_project.inc (compile): 11:29:31.900332 => 11:29:31.917901
22:29:31  Began executing node model.my_dbt_project.inc
22:29:31  Using snowflake connection "model.my_dbt_project.inc"
22:29:31  On model.my_dbt_project.inc: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "node_id": "model.my_dbt_project.inc"} */
create or replace  temporary view development.dbt_jyeo.inc__dbt_tmp

   as (
    -- models/inc.sql

select 1 as id, sysdate() as dwh_ts, '2023-10-29 22:29:31.915' as dbt_ts
  );
22:29:31  Opening a new connection, currently in state closed
22:29:33  SQL status: SUCCESS 1 in 2.0 seconds
22:29:33  Using snowflake connection "model.my_dbt_project.inc"
22:29:33  On model.my_dbt_project.inc: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "node_id": "model.my_dbt_project.inc"} */
describe table development.dbt_jyeo.inc__dbt_tmp
22:29:34  SQL status: SUCCESS 3 in 0.0 seconds
22:29:34  Using snowflake connection "model.my_dbt_project.inc"
22:29:34  On model.my_dbt_project.inc: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "node_id": "model.my_dbt_project.inc"} */
describe table development.dbt_jyeo.inc
22:29:34  SQL status: SUCCESS 3 in 0.0 seconds
22:29:34  Using snowflake connection "model.my_dbt_project.inc"
22:29:34  On model.my_dbt_project.inc: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "node_id": "model.my_dbt_project.inc"} */
describe table "DEVELOPMENT"."DBT_JYEO"."INC"
22:29:34  SQL status: SUCCESS 3 in 0.0 seconds
22:29:34  Writing runtime sql for node "model.my_dbt_project.inc"
22:29:34  Using snowflake connection "model.my_dbt_project.inc"
22:29:34  On model.my_dbt_project.inc: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "node_id": "model.my_dbt_project.inc"} */
-- back compat for old kwarg name

  begin;
22:29:35  SQL status: SUCCESS 1 in 0.0 seconds
22:29:35  Using snowflake connection "model.my_dbt_project.inc"
22:29:35  On model.my_dbt_project.inc: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "node_id": "model.my_dbt_project.inc"} */
merge into development.dbt_jyeo.inc as DBT_INTERNAL_DEST
        using development.dbt_jyeo.inc__dbt_tmp as DBT_INTERNAL_SOURCE
        on (DBT_INTERNAL_DEST.dwh_ts >= date('2023-10-29 22:27:26.570')) and (
                DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
            )

    when matched then update set
        "ID" = DBT_INTERNAL_SOURCE."ID","DWH_TS" = DBT_INTERNAL_SOURCE."DWH_TS","DBT_TS" = DBT_INTERNAL_SOURCE."DBT_TS"

    when not matched then insert
        ("ID", "DWH_TS", "DBT_TS")
    values
        ("ID", "DWH_TS", "DBT_TS")

;
22:29:36  SQL status: SUCCESS 1 in 1.0 seconds
22:29:36  Using snowflake connection "model.my_dbt_project.inc"
22:29:36  On model.my_dbt_project.inc: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "node_id": "model.my_dbt_project.inc"} */
COMMIT
22:29:36  SQL status: SUCCESS 1 in 0.0 seconds
22:29:36  Using snowflake connection "model.my_dbt_project.inc"
22:29:36  On model.my_dbt_project.inc: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "node_id": "model.my_dbt_project.inc"} */
drop view if exists development.dbt_jyeo.inc__dbt_tmp cascade
22:29:37  SQL status: SUCCESS 1 in 0.0 seconds
22:29:37  Timing info for model.my_dbt_project.inc (execute): 11:29:31.919534 => 11:29:37.083943
22:29:37  On model.my_dbt_project.inc: Close
22:29:37  Sending event: {'category': 'dbt', 'action': 'run_model', 'label': 'a9930988-2a5c-4098-9a28-825701db6ada', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x127464d90>]}
22:29:37  1 of 1 OK created sql incremental model dbt_jyeo.inc ........................... [SUCCESS 1 in 5.75s]
22:29:37  Finished running node model.my_dbt_project.inc
22:29:37  Connection 'master' was properly closed.
22:29:37  Connection 'model.my_dbt_project.inc' was properly closed.
22:29:37  
22:29:37  Finished running 1 incremental model in 0 hours 0 minutes and 10.90 seconds (10.90s).
22:29:37  Command end result
22:29:37  
22:29:37  Completed successfully
22:29:37  
22:29:37  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
22:29:37  Command `dbt run` succeeded at 11:29:37.677371 after 12.03 seconds
22:29:37  Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'end', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x1115176d0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x1262cc8b0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x126388f40>]}
22:29:37  Flushing usage events

Since this is a subsequent run of the incremental model, the incremental_predicates config now kicks in. However, notice the condition in the merge:

DBT_INTERNAL_DEST.dwh_ts >= date('2023-10-29 22:27:26.570')

That timestamp string comes from the very first parse - even though {{ ts }} in the model body was compiled correctly to the real-time timestamp.

Now let's try a third invocation:

$ dbt run --debug

22:32:33  Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'start', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10f292640>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x111ebfd30>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x111efb3a0>]}
22:32:33  Running with dbt=1.6.6
22:32:33  running dbt with arguments {'printer_width': '80', 'indirect_selection': 'eager', 'write_json': 'True', 'log_cache_events': 'False', 'partial_parse': 'True', 'cache_selected_only': 'False', 'profiles_dir': '/Users/jeremy/.dbt', 'debug': 'True', 'fail_fast': 'False', 'log_path': '/Users/jeremy/src/dbt-basic/logs', 'version_check': 'True', 'warn_error': 'None', 'use_colors': 'True', 'use_experimental_parser': 'False', 'no_print': 'None', 'quiet': 'False', 'log_format': 'default', 'invocation_command': 'dbt run --debug', 'static_parser': 'True', 'introspect': 'True', 'target_path': 'None', 'warn_error_options': 'WarnErrorOptions(include=[], exclude=[])', 'send_anonymous_usage_stats': 'True'}
22:32:35  Sending event: {'category': 'dbt', 'action': 'project_id', 'label': 'cb0fb986-5a80-46a9-9ec2-78b85431a3fd', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x111ebf430>]}
22:32:35  Sending event: {'category': 'dbt', 'action': 'adapter_info', 'label': 'cb0fb986-5a80-46a9-9ec2-78b85431a3fd', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x124068910>]}
22:32:35  Registered adapter: snowflake=1.6.4
22:32:35  checksum: 546b81fb56652c304d87abd676e84d4737d8a0c6b62160f4a6e79dcddbc842bb, vars: {}, profile: , target: , version: 1.6.6
22:32:35  Partial parsing enabled: 0 files deleted, 0 files added, 0 files changed.
22:32:35  Partial parsing enabled, no changes found, skipping parsing
22:32:35  Sending event: {'category': 'dbt', 'action': 'load_project', 'label': 'cb0fb986-5a80-46a9-9ec2-78b85431a3fd', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x1241e3f40>]}
22:32:35  Sending event: {'category': 'dbt', 'action': 'resource_counts', 'label': 'cb0fb986-5a80-46a9-9ec2-78b85431a3fd', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x12412ef40>]}
22:32:35  Found 1 model, 0 sources, 0 exposures, 0 metrics, 376 macros, 0 groups, 0 semantic models
22:32:35  Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': 'cb0fb986-5a80-46a9-9ec2-78b85431a3fd', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x12412ed90>]}
22:32:35  
22:32:35  Acquiring new snowflake connection 'master'
22:32:35  Acquiring new snowflake connection 'list_development'
22:32:35  Using snowflake connection "list_development"
22:32:35  On list_development: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "connection_name": "list_development"} */
show terse schemas in database development
    limit 10000
22:32:35  Opening a new connection, currently in state init
22:32:37  SQL status: SUCCESS 356 in 2.0 seconds
22:32:37  On list_development: Close
22:32:37  Re-using an available connection from the pool (formerly list_development, now list_development_dbt_jyeo)
22:32:37  Using snowflake connection "list_development_dbt_jyeo"
22:32:37  On list_development_dbt_jyeo: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "connection_name": "list_development_dbt_jyeo"} */
show terse objects in development.dbt_jyeo limit 10000
22:32:37  Opening a new connection, currently in state closed
22:32:39  SQL status: SUCCESS 47 in 2.0 seconds
22:32:39  On list_development_dbt_jyeo: Close
22:32:40  Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': 'cb0fb986-5a80-46a9-9ec2-78b85431a3fd', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x124075970>]}
22:32:40  Concurrency: 1 threads (target='sf')
22:32:40  
22:32:40  Began running node model.my_dbt_project.inc
22:32:40  1 of 1 START sql incremental model dbt_jyeo.inc ................................ [RUN]
22:32:40  Re-using an available connection from the pool (formerly list_development_dbt_jyeo, now model.my_dbt_project.inc)
22:32:40  Began compiling node model.my_dbt_project.inc
22:32:40  

>>>>>>>>>> macro set_ts() eval to: 2023-10-29 22:32:40.304<<<<<<<<<<

22:32:40  Writing injected SQL for node "model.my_dbt_project.inc"
22:32:40  Timing info for model.my_dbt_project.inc (compile): 11:32:40.290482 => 11:32:40.307625
22:32:40  Began executing node model.my_dbt_project.inc
22:32:40  Using snowflake connection "model.my_dbt_project.inc"
22:32:40  On model.my_dbt_project.inc: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "node_id": "model.my_dbt_project.inc"} */
create or replace  temporary view development.dbt_jyeo.inc__dbt_tmp

   as (
    -- models/inc.sql

select 1 as id, sysdate() as dwh_ts, '2023-10-29 22:32:40.304' as dbt_ts
  );
22:32:40  Opening a new connection, currently in state closed
22:32:42  SQL status: SUCCESS 1 in 2.0 seconds
22:32:42  Using snowflake connection "model.my_dbt_project.inc"
22:32:42  On model.my_dbt_project.inc: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "node_id": "model.my_dbt_project.inc"} */
describe table development.dbt_jyeo.inc__dbt_tmp
22:32:42  SQL status: SUCCESS 3 in 0.0 seconds
22:32:42  Using snowflake connection "model.my_dbt_project.inc"
22:32:42  On model.my_dbt_project.inc: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "node_id": "model.my_dbt_project.inc"} */
describe table development.dbt_jyeo.inc
22:32:42  SQL status: SUCCESS 3 in 0.0 seconds
22:32:42  Using snowflake connection "model.my_dbt_project.inc"
22:32:42  On model.my_dbt_project.inc: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "node_id": "model.my_dbt_project.inc"} */
describe table "DEVELOPMENT"."DBT_JYEO"."INC"
22:32:43  SQL status: SUCCESS 3 in 0.0 seconds
22:32:43  Writing runtime sql for node "model.my_dbt_project.inc"
22:32:43  Using snowflake connection "model.my_dbt_project.inc"
22:32:43  On model.my_dbt_project.inc: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "node_id": "model.my_dbt_project.inc"} */
-- back compat for old kwarg name

  begin;
22:32:43  SQL status: SUCCESS 1 in 0.0 seconds
22:32:43  Using snowflake connection "model.my_dbt_project.inc"
22:32:43  On model.my_dbt_project.inc: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "node_id": "model.my_dbt_project.inc"} */
merge into development.dbt_jyeo.inc as DBT_INTERNAL_DEST
        using development.dbt_jyeo.inc__dbt_tmp as DBT_INTERNAL_SOURCE
        on (DBT_INTERNAL_DEST.dwh_ts >= date('2023-10-29 22:27:26.570')) and (
                DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
            )

    when matched then update set
        "ID" = DBT_INTERNAL_SOURCE."ID","DWH_TS" = DBT_INTERNAL_SOURCE."DWH_TS","DBT_TS" = DBT_INTERNAL_SOURCE."DBT_TS"

    when not matched then insert
        ("ID", "DWH_TS", "DBT_TS")
    values
        ("ID", "DWH_TS", "DBT_TS")

;
22:32:44  SQL status: SUCCESS 1 in 1.0 seconds
22:32:44  Using snowflake connection "model.my_dbt_project.inc"
22:32:44  On model.my_dbt_project.inc: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "node_id": "model.my_dbt_project.inc"} */
COMMIT
22:32:44  SQL status: SUCCESS 1 in 0.0 seconds
22:32:45  Using snowflake connection "model.my_dbt_project.inc"
22:32:45  On model.my_dbt_project.inc: /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "all", "target_name": "sf", "node_id": "model.my_dbt_project.inc"} */
drop view if exists development.dbt_jyeo.inc__dbt_tmp cascade
22:32:45  SQL status: SUCCESS 1 in 0.0 seconds
22:32:45  Timing info for model.my_dbt_project.inc (execute): 11:32:40.309566 => 11:32:45.383774
22:32:45  On model.my_dbt_project.inc: Close
22:32:45  Sending event: {'category': 'dbt', 'action': 'run_model', 'label': 'cb0fb986-5a80-46a9-9ec2-78b85431a3fd', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x125225df0>]}
22:32:45  1 of 1 OK created sql incremental model dbt_jyeo.inc ........................... [SUCCESS 1 in 5.66s]
22:32:45  Finished running node model.my_dbt_project.inc
22:32:45  Connection 'master' was properly closed.
22:32:45  Connection 'model.my_dbt_project.inc' was properly closed.
22:32:45  
22:32:45  Finished running 1 incremental model in 0 hours 0 minutes and 10.79 seconds (10.79s).
22:32:45  Command end result
22:32:45  
22:32:45  Completed successfully
22:32:45  
22:32:45  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
22:32:45  Command `dbt run` succeeded at 11:32:45.983207 after 12.83 seconds
22:32:45  Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'end', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10f292640>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x124ef1be0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x125206a90>]}
22:32:45  Flushing usage events

Same deal as above... the get_ts() evaluation has moved forward, and so has the model body compilation, but the incremental_predicates config is still using the value from the very first dbt run above:

DBT_INTERNAL_DEST.dwh_ts >= date('2023-10-29 22:27:26.570')

Workaround

Disable partial parsing, or use a timestamp function from the data warehouse like so:

-- models/inc.sql
{%- set ts = get_ts() -%}

{{
    config(
      unique_key = 'id',
      incremental_predicates = ["DBT_INTERNAL_DEST.dwh_ts >= date(sysdate())"]
    )
}}

select 1 as id, sysdate() as dwh_ts, '{{ ts }}' as dbt_ts
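
With this version, date(sysdate()) is evaluated by the warehouse at query execution time, so the predicate no longer depends on any value resolved during parsing.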
jtcohen6 commented 10 months ago

@jeremyyeo Thanks for the thorough & detailed writeup!

use a timestamp function from the datawarehouse like so

This workaround is compelling to me, and it's what I would have recommended to completely avoid the issue.

Another potential workaround: since dbt-core is capable of detecting changes to environment variables, reference a "volatile" environment variable in the parse-time code path for the model. For example:

-- macros/get_ts.sql
{% macro get_ts() %}
    {% set job_id = env_var("DBT_CLOUD_RUN_ID") %}
    {% set ts = modules.datetime.datetime.utcnow().isoformat(sep=" ", timespec="milliseconds") %}
    {% do log('\n\n>>>>>>>>>> macro set_ts() eval to: ' ~ ts ~ '<<<<<<<<<<\n\n', True) %}
    {{ return(ts) }}
{% endmacro %}

$ DBT_CLOUD_RUN_ID=1234 dbt -q ls --output json --output-keys config | jq .config.incremental_predicates[0]
"DBT_INTERNAL_DEST.dwh_ts >= date('2023-10-30 15:32:59.715')"
$ DBT_CLOUD_RUN_ID=1234 dbt -q ls --output json --output-keys config | jq .config.incremental_predicates[0]
"DBT_INTERNAL_DEST.dwh_ts >= date('2023-10-30 15:32:59.715')"
$ DBT_CLOUD_RUN_ID=5678 dbt -q ls --output json --output-keys config | jq .config.incremental_predicates[0]
"DBT_INTERNAL_DEST.dwh_ts >= date('2023-10-30 15:33:08.897')"
d1manson commented 6 months ago

Coming back to the original issue itself, I wonder if there are some cleaner solutions that haven't been fully considered. For example, what if you could define custom period types, such as weekday or workinghour, in your dbt project yaml, and then reference those in freshness checks? The syntax for defining a custom period is up for debate, but you could imagine something like:

period-types:
  workinghour:
    description: massively discount importance of nights and weekends
    base_unit: hour
    windows:
      - start: "30 18 * * 1-4"
        end: "0 7 * * 2-5"
        gain: 0.1   # hours during weekday nights count for 1/10th of a regular hour
      - start: "30 18 * * 5"
        end: "0 7 * * 1"
        gain: 0.05  # hours during the weekend count for 1/20th of a regular hour

I'm not convinced this is the best syntax, but you could imagine validating that starts and ends match, and implementing a generic freshness calculator that understands this syntax.

and then in a source yaml file...

...
    freshness:
      error_after:
        period: workinghour
        count: 6
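
A rough sketch of the "generic freshness calculator" this implies (hypothetical; windows simplified to weekday/hour tuples instead of cron expressions): walk the interval from the last load to now in hourly steps, scaling each step by the gain of whichever window matches.

from datetime import datetime, timedelta

# Hypothetical encoding of the workinghour period type above: each window is
# (weekdays, start_hour, end_hour, gain); Monday is weekday 0. Hours not
# covered by any window count fully.
WORKINGHOUR_WINDOWS = [
    ({0, 1, 2, 3}, 18, 24, 0.1),  # Mon-Thu evenings
    ({1, 2, 3, 4}, 0, 7, 0.1),    # Tue-Fri early mornings
    ({4}, 18, 24, 0.05),          # Fri evening: start of the weekend window
    ({5, 6}, 0, 24, 0.05),        # Sat and Sun, all day
    ({0}, 0, 7, 0.05),            # Mon early morning: end of the weekend window
]

def effective_age_hours(last_loaded_at, now, windows):
    """Age in 'workinghours': each elapsed hour is scaled by its window's gain."""
    total, cursor = 0.0, last_loaded_at
    while cursor < now:
        gain = 1.0
        for weekdays, start_h, end_h, g in windows:
            if cursor.weekday() in weekdays and start_h <= cursor.hour < end_h:
                gain = g
                break
        step = min(timedelta(hours=1), now - cursor)
        total += gain * step.total_seconds() / 3600
        cursor += step
    return total

# A source loaded Friday 17:00 and checked Monday 08:00 is 63 wall-clock hours
# old, but only ~5 "workinghours" old, since the weekend counts at gain 0.05:
print(effective_age_hours(datetime(2024, 6, 7, 17), datetime(2024, 6, 10, 8), WORKINGHOUR_WINDOWS))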

you could go further still and allow for defining full freshness SLAs (period & count) in the dbt project that you would reference by name in a given source yaml, though I think the version above is probably more useful if it's not so much about an SLA, but about when new data is actually expected (e.g. stock market data as per the original example, or CRM updates as in my case).