dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

KeyError raised when CachingStaleStatusResolver fails to find asset key associated with a dbt 'source' in v1.6.7+ #20286

Status: Closed (closed by the4thamigo-uk 5 months ago)

the4thamigo-uk commented 7 months ago

Dagster version

1.6.7

What's the issue?

I'm facing an issue with CachingStaleStatusResolver in v1.6.7. It seems to occur when the asset graph contains a @dbt_assets definition built from a dbt project that defines one or more dbt 'source's. I wasn't seeing this issue in v1.6.6, and I notice there have been quite a few changes in this area (https://github.com/dagster-io/dagster/pull/19901).

I am using CachingStaleStatusResolver to work out which partitions of a job are stale, so that a schedule can generate runs for them, and this worked fine up to and including v1.6.6. I call CachingStaleStatusResolver.get_status() on the asset keys of the job's assets to determine their status. To construct the resolver, I build an AssetGraph from my project's AssetsDefinition objects. The same behaviour occurs if I use the asset graph accessed through the repository object of the Definitions instance, so it doesn't seem to be related to the way I construct the asset graph.
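For illustration, the partition-selection logic described above might look roughly like the sketch below. It is not the reporter's code: `partitions_needing_runs`, the `StaleStatus` stand-in enum, the asset name `sales_model`, and the fake status table are all hypothetical. `get_status` is any callable with the `(asset_key, partition_key)` shape that the traceback shows for `resolver.get_status(asset_key, partition_key)`; using a plain callable keeps the sketch runnable without Dagster's private `CachingStaleStatusResolver`.

```python
from enum import Enum
from typing import Callable, Iterable, List, Optional


class StaleStatus(Enum):
    # Hypothetical stand-in for the statuses the resolver reports.
    MISSING = "MISSING"
    STALE = "STALE"
    FRESH = "FRESH"


# Hypothetical signature: (asset_key, partition_key) -> StaleStatus,
# standing in for a bound resolver.get_status.
StatusFn = Callable[[str, Optional[str]], StaleStatus]


def partitions_needing_runs(
    asset_keys: Iterable[str],
    partition_keys: Iterable[str],
    get_status: StatusFn,
) -> List[str]:
    """Return partitions in which at least one of the job's assets is STALE or MISSING."""
    keys = list(asset_keys)  # materialize once so it can be re-iterated per partition
    return [
        partition
        for partition in partition_keys
        if any(get_status(key, partition) is not StaleStatus.FRESH for key in keys)
    ]


# Usage with a fake status table instead of a real resolver.
statuses = {
    ("sales_model", "2024-03-01"): StaleStatus.FRESH,
    ("sales_model", "2024-03-02"): StaleStatus.STALE,
    ("sales_model", "2024-03-03"): StaleStatus.MISSING,
}
print(partitions_needing_runs(
    ["sales_model"],
    ["2024-03-01", "2024-03-02", "2024-03-03"],
    lambda key, partition: statuses[(key, partition)],
))
# prints ['2024-03-02', '2024-03-03']
```

Any exception raised inside `get_status`, such as the KeyError in the traceback below, propagates straight out of the list comprehension, which is why a single unresolvable parent key aborts the whole schedule tick.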

However, in v1.6.7, when I include a @dbt_assets definition in the asset graph and request the status of the Dagster asset representing a dbt 'model' that depends on a dbt 'source', _get_stale_causes_materialized() checks the status of the parent asset keys. These include the key for the dbt 'source', which does not seem to be represented in the AssetGraph at all, so AssetGraph.is_external() raises a KeyError. It seems that only full SourceAssets are added to the AssetGraph when I use AssetGraph.from_assets, and dbt 'source' nodes are not represented that way in Dagster.

I realise that this is an internal class, but there doesn't seem to be a publicly exposed way to request the stale status of a given asset/partition. Even so, I would expect that creating an AssetGraph with a @dbt_assets definition and then calling get_status() on a 'model' asset should not crash with an exception.

More detail: interestingly, in the v1.6.6 code there is also no asset in the graph representing the dbt 'source'. The only reason AssetGraph.is_source() doesn't fail in v1.6.6 is that it returns True for any asset_key not in the set of materializable keys, so is_source("ahdsfjlkad") also returns True.
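A toy model of why the same missing key is harmless in one version and fatal in the other. The class and method names are hypothetical and this is not Dagster's actual AssetGraph: a membership test against the materializable keys silently answers `True` for an unknown key, while a direct dictionary lookup of the key's definition raises `KeyError`.

```python
from typing import Dict, Iterable


class ToyAssetGraph:
    """Hypothetical model of the two lookups; not Dagster's AssetGraph."""

    def __init__(self, defs_by_key: Dict[str, dict], materializable_keys: Iterable[str]):
        self._defs_by_key = defs_by_key
        self._materializable_keys = set(materializable_keys)

    def is_source_permissive(self, key: str) -> bool:
        # v1.6.6-style check as described above: any key that is not
        # materializable, including one the graph has never seen, counts as a source.
        return key not in self._materializable_keys

    def is_external_strict(self, key: str) -> bool:
        # v1.6.7-style check: read a flag off the key's definition, so a key
        # with no definition (e.g. a dbt source) raises KeyError.
        return self._defs_by_key[key]["is_external"]


# The dbt model is in the graph; the dbt source it reads from is not.
graph = ToyAssetGraph({"sales_model": {"is_external": False}}, ["sales_model"])
print(graph.is_source_permissive("raw_sales"))   # True
print(graph.is_source_permissive("ahdsfjlkad"))  # True, even for a nonsense key
try:
    graph.is_external_strict("raw_sales")
except KeyError as err:
    print("KeyError:", err)  # KeyError: 'raw_sales'
```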

KeyError: AssetKey(['landscape', 'daily_sales_report'])
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/errors.py", line 287, in user_code_error_boundary
    yield
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/decorators/schedule_decorator.py", line 164, in _wrapped_fn
    yield from cast(RunRequestIterator, ensure_gen(result))
  File "/app/pipeline/triggers/utils.py", line 59, in generate_run_requests_for_stale_or_missing_partitions
    [
  File "/app/pipeline/triggers/utils.py", line 62, in <listcomp>
    if partition_is_stale(partition_key)
  File "/app/pipeline/triggers/utils.py", line 52, in partition_is_stale
    stale_status = is_stale(asset_key, partition_key)
  File "/app/pipeline/utils/instance.py", line 65, in is_stale
    status = resolver.get_status(asset_key, partition_key)
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/data_version.py", line 399, in get_status
    return self._get_status(key=AssetKeyPartitionKey(key, partition_key))
  File "/usr/local/lib/python3.10/site-packages/dagster/_utils/cached_method.py", line 66, in _cached_method_wrapper
    result = method(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/data_version.py", line 435, in _get_status
    causes = self._get_stale_causes(key=key)
  File "/usr/local/lib/python3.10/site-packages/dagster/_utils/cached_method.py", line 66, in _cached_method_wrapper
    result = method(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/data_version.py", line 452, in _get_stale_causes
    return sorted(
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/data_version.py", line 516, in _get_stale_causes_materialized
    if self._get_status(key=dep_key) == StaleStatus.STALE:
  File "/usr/local/lib/python3.10/site-packages/dagster/_utils/cached_method.py", line 66, in _cached_method_wrapper
    result = method(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/data_version.py", line 429, in _get_status
    current_version = self._get_current_data_version(key=key)
  File "/usr/local/lib/python3.10/site-packages/dagster/_utils/cached_method.py", line 66, in _cached_method_wrapper
    result = method(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/data_version.py", line 624, in _get_current_data_version
    record = self._get_latest_data_version_record(key=key)
  File "/usr/local/lib/python3.10/site-packages/dagster/_utils/cached_method.py", line 66, in _cached_method_wrapper
    result = method(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/data_version.py", line 681, in _get_latest_data_version_record
    and not self.asset_graph.is_external(key.asset_key)
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/internal_asset_graph.py", line 142, in is_external
    return self.get_assets_def(asset_key).is_external
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/internal_asset_graph.py", line 79, in get_assets_def
    return self._assets_defs_by_key[asset_key]

What did you expect to happen?

I don't expect it to crash.

How to reproduce?

I can provide an example project if necessary.

Deployment type

None

Deployment details

No response

Additional information

No response


smackesey commented 7 months ago

Hi @the4thamigo-uk, AssetGraph (which, as you note, is a private API) is under very heavy surgery right now, so it's not surprising you're seeing problems. A fresh batch of changes rolling out next week includes performing "normalization" on the assets defs passed in to the AssetGraph, which will create stub defs for any referenced asset keys that otherwise lack a def. I'm guessing that will fix your problem, so my advice is to try upgrading to next week's release.
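The normalization step described above can be sketched as a pass that guarantees every referenced key has a definition. This is an illustration under stated assumptions, not Dagster's implementation: `normalize`, the dict-based defs, the key names, and the choice to mark stubs as external are all hypothetical.

```python
from typing import Dict, Set


def normalize(deps_by_key: Dict[str, Set[str]]) -> Dict[str, dict]:
    """Give every referenced asset key a definition, adding stubs for missing ones."""
    defs = {
        key: {"is_external": False, "deps": set(deps)}
        for key, deps in deps_by_key.items()
    }
    for deps in deps_by_key.values():
        for dep in deps:
            # A dep with no definition of its own (e.g. a dbt source) gets a stub,
            # assumed here to be external, i.e. not materializable by Dagster.
            # setdefault leaves keys that already have a real definition untouched.
            defs.setdefault(dep, {"is_external": True, "deps": set()})
    return defs


defs = normalize({"sales_model": {"raw_sales"}})
print(sorted(defs))                      # ['raw_sales', 'sales_model']
print(defs["raw_sales"]["is_external"])  # True
```

After such a pass, a strict lookup like `defs[key]["is_external"]` can no longer hit a missing key for any parent that some asset declares as a dependency.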

Let's leave the issue open until then and revisit if that doesn't work.

> there doesn't seem to be a publicly-exposed way to request the stale status of a given asset/partition.

That's a good point. The closest thing we have is the GraphQL API. I'm going to add a Python API for this to our internal queue.

the4thamigo-uk commented 7 months ago

Thanks @smackesey, sounds good.

the4thamigo-uk commented 7 months ago

Looks like the 1.6.9 release might have fixed this, was that expected?

smackesey commented 7 months ago

> Looks like the 1.6.9 release might have fixed this, was that expected?

That wouldn't be surprising, given that AssetGraph is under heavy surgery. Since it's a private API, I can't recall exactly which changes fell into which release.

the4thamigo-uk commented 7 months ago

@smackesey I have also noticed that if I have a partitioned asset that depends on an external partitioned asset, the staleness of the asset doesn't seem to reflect materialization events on the external asset. I don't get the new-data indication in the UI, and CachingStaleStatusResolver seems to return the asset as FRESH. How is this meant to work?

The same problem occurs if I raise asset observation events on the external asset.
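For reference, the behaviour being expected here can be modelled as a comparison of data versions: a downstream partition counts as stale once a parent's current data version differs from the one recorded when that partition was last materialized. The sketch below is a simplified, hypothetical model (`is_stale` and the version strings are illustrative; it ignores code versions, partition mappings, and how Dagster actually records provenance) and does not diagnose why the external asset's events are not picked up.

```python
from typing import Dict, Optional


def is_stale(
    recorded_input_versions: Dict[str, str],
    current_upstream_versions: Dict[str, Optional[str]],
) -> bool:
    """Simplified rule: stale if any parent's current data version differs from
    the version recorded when this partition was last materialized."""
    for parent, recorded in recorded_input_versions.items():
        current = current_upstream_versions.get(parent)
        # Assumption: an unknown current version is treated as "no new data".
        if current is not None and current != recorded:
            return True
    return False


# Downstream partition was built when the external upstream was at version "v1".
recorded = {"external_sales": "v1"}
print(is_stale(recorded, {"external_sales": "v1"}))  # False: nothing new upstream
print(is_stale(recorded, {"external_sales": "v2"}))  # True: a newer event reported "v2"
```

Under this model, an upstream event only makes the downstream stale if it changes the upstream's current data version for the relevant partition.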

the4thamigo-uk commented 5 months ago

@smackesey by the way, it seems we can close this. On another note, do you have any idea when the stale status API will become available? I also created an issue about this a while ago: https://github.com/dagster-io/dagster/issues/19368