dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Asset Versioning with partitioned assets does not detect out-of-sync assets after incrementing code_version #22704

Open sam-goodwin opened 2 months ago

sam-goodwin commented 2 months ago

Dagster version

dagster, version 1.7.10

What's the issue?

Asset versioning doesn't properly detect when a partitioned asset is out of sync after incrementing its version or an upstream asset's version. It always says "everything is up to date".

If I remove the partitions_def, everything works as expected. The problem seems to be isolated to partitioned assets.

What did you expect to happen?

Every time I increment code_version in the second asset, I expect it to be displayed as unsynced (it is not):

image

I expected clicking "materialize unsynced" to show a list with the "second" asset and not the "first" asset, but it tells me all assets are up to date:

image

How to reproduce?

This is the simple repro I am using to test partitions + asset versioning.

from dagster import StaticPartitionsDefinition, asset

test_partitions = StaticPartitionsDefinition(["first", "second"])

@asset(
    code_version="v1",
    partitions_def=test_partitions,
)
def first():
    return "first"

@asset(
    code_version="v4", # i keep incrementing this, sometimes it is detected as un-synced, sometimes not
    partitions_def=test_partitions,
)
def second(first):
    return first + " second"

Deployment type

None

Deployment details

I am just running dagster dev with the following config:

storage:
  sqlite:
    base_dir:
      env: DAGSTER_DATA_BASE_DIR  # base_dir is read from this environment variable at startup

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

sam-goodwin commented 2 months ago

I just tried updating first to v2 and it still says everything is up to date ...

@asset(
    code_version="v2",
    partitions_def=test_partitions,
)
def first():
    return "first"

Screenshot 2024-06-25 at 11 04 45 AM

sam-goodwin commented 2 months ago

I also get this error

image

When selecting "only backfill missing or failed assets" and clicking "preview":

image

sam-goodwin commented 2 months ago

I can confirm that everything behaves as expected in a non-partitioned job. I don't see this limitation mentioned anywhere in the docs, however.

garethbrickman commented 2 months ago

Could be related: https://github.com/dagster-io/dagster/issues/22553

ClaytonSmith commented 2 months ago

Out-of-sync partitions have been a huge issue for me. Not showing downstream partitions as out of sync has been deeply impactful (in a bad way), not just when incrementing the version but also when just updating parent partitions.

ClaytonSmith commented 2 months ago

I just found out Dagster IS aware of out-of-sync partitions:

image

Please, someone let me know what needs to happen to get a fix for this. These green dots must be yellow. The ability to rebuild outdated partitions is extremely valuable.

sam-goodwin commented 2 months ago

Is it just the UI that's broken or is there a fundamental problem in the backend?

sam-goodwin commented 2 months ago

This query returns the correct information, indicating that Dagster does know which assets are stale for each partition.

query AssetsByGroup($groupName: String!) {
  assetNodes(group: {
    groupName: $groupName,
    repositoryName:"__repository__",
    repositoryLocationName:"your_pkg.defs"
  }) {
    id
    assetKey {
      path
    }
    staleStatusByPartition(partitions:[
      "first",
      "second"
    ])
  }
}
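
For anyone following along, here is a minimal sketch of running that query against a local dagster dev webserver with requests. The endpoint, group name, and repository location name are assumptions; adjust them for your deployment.

# Minimal sketch: run the stale-status query against a local `dagster dev`
# GraphQL endpoint. URL, group name, and location name are assumptions.
import requests

GRAPHQL_URL = "http://localhost:3000/graphql"  # default `dagster dev` webserver

QUERY = """
query AssetsByGroup($groupName: String!) {
  assetNodes(group: {
    groupName: $groupName,
    repositoryName: "__repository__",
    repositoryLocationName: "your_pkg.defs"
  }) {
    id
    assetKey { path }
    staleStatusByPartition(partitions: ["first", "second"])
  }
}
"""

resp = requests.post(GRAPHQL_URL, json={"query": QUERY, "variables": {"groupName": "default"}})
resp.raise_for_status()
for node in resp.json()["data"]["assetNodes"]:
    # Prints e.g. ['second'] ['FRESH', 'STALE'] -- one status per requested partition
    print(node["assetKey"]["path"], node["staleStatusByPartition"])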

What I am unsure about is how to launch a materialization for many partitions and have each run only include the un-synced assets for each partition.

sam-goodwin commented 2 months ago

Looks like the GraphQL schema is built with a strong coupling to time-based partitions (which does not align with my system):

input PartitionsByAssetSelector {
  assetKey: AssetKeyInput!
  partitions: PartitionsSelector
}

input PartitionsSelector {
  range: PartitionRangeSelector!
}

When launching a backfill, you can't specify a list of partitions per asset. You can only specify a range. I am seeing this over-fitting to time-based partitions a lot in Dagster's design.

sam-goodwin commented 2 months ago

Oh actually, it looks like I can use assetSelection and partitionNames along with batching to achieve this behavior.

Here's a prototype that materializes stale partitions of assets in a group: https://gist.github.com/sam-goodwin/d8dd76ad58a241cdb14deba9cb53c2bf

Note: it assumes that the partitioning scheme of each asset in a group is the same (this may not be true for you).
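
For anyone who doesn't want to read the gist, here is a condensed sketch of that approach. The webserver URL, group name, and repository location name are assumptions, and the launchPartitionBackfill mutation fields are taken from the generated GraphQL schema, so they may differ across Dagster versions; treat it as a starting point rather than a drop-in tool.

from collections import defaultdict

import requests

GRAPHQL_URL = "http://localhost:3000/graphql"  # default `dagster dev` webserver

STALE_STATUS_QUERY = """
query AssetsByGroup($groupName: String!, $partitions: [String!]!) {
  assetNodes(group: {
    groupName: $groupName,
    repositoryName: "__repository__",
    repositoryLocationName: "your_pkg.defs"
  }) {
    assetKey { path }
    staleStatusByPartition(partitions: $partitions)
  }
}
"""

# Assumes the schema exposes launchPartitionBackfill with the assetSelection
# and partitionNames fields mentioned in the comment above.
LAUNCH_BACKFILL_MUTATION = """
mutation LaunchBackfill($assetKeys: [AssetKeyInput!]!, $partitionNames: [String!]!) {
  launchPartitionBackfill(backfillParams: {
    assetSelection: $assetKeys,
    partitionNames: $partitionNames
  }) {
    ... on LaunchBackfillSuccess { backfillId }
    ... on PythonError { message }
  }
}
"""

def gql(query, variables):
    resp = requests.post(GRAPHQL_URL, json={"query": query, "variables": variables})
    resp.raise_for_status()
    return resp.json()["data"]

partitions = ["first", "second"]  # assumes every asset in the group shares this partitioning
data = gql(STALE_STATUS_QUERY, {"groupName": "default", "partitions": partitions})

# Collect, for each partition key, the asset keys whose status is STALE
# (assuming the returned statuses line up with the order of `partitions`).
stale_assets_by_partition = defaultdict(list)
for node in data["assetNodes"]:
    for partition, status in zip(partitions, node["staleStatusByPartition"]):
        if status == "STALE":
            stale_assets_by_partition[partition].append({"path": node["assetKey"]["path"]})

# Launch one backfill per partition, restricted to that partition's stale assets.
for partition, asset_keys in stale_assets_by_partition.items():
    result = gql(LAUNCH_BACKFILL_MUTATION, {"assetKeys": asset_keys, "partitionNames": [partition]})
    print(partition, result["launchPartitionBackfill"])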

sam-goodwin commented 2 months ago

Just discovered that the following GraphQL query is extremely slow and can't be executed in parallel because it will crash Dagster's SQL database:

query AssetStaleStatus(
    $groupName: String!,
    $assetKey: AssetKeyInput!,
    $partitionKeys: [String!]!,
    $repositoryLocation: String!
) {
  assetNodes(group: {
    groupName: $groupName,
    repositoryName:"__repository__",
    repositoryLocationName: $repositoryLocation
  }, assetKeys: [$assetKey]) {
    id
    staleStatusByPartition(partitions: $partitionKeys)
  }
}
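
In the meantime, a workaround sketch, assuming the bottleneck is concurrent reads against the dagster dev SQLite storage: issue the per-asset stale-status queries serially with a short pause rather than fanning them out in parallel. The endpoint, names, and helper function below are placeholders.

import time

import requests

GRAPHQL_URL = "http://localhost:3000/graphql"  # default `dagster dev` webserver

ASSET_STALE_STATUS = """
query AssetStaleStatus(
    $groupName: String!,
    $assetKey: AssetKeyInput!,
    $partitionKeys: [String!]!,
    $repositoryLocation: String!
) {
  assetNodes(group: {
    groupName: $groupName,
    repositoryName: "__repository__",
    repositoryLocationName: $repositoryLocation
  }, assetKeys: [$assetKey]) {
    id
    staleStatusByPartition(partitions: $partitionKeys)
  }
}
"""

def stale_statuses(asset_keys, partition_keys, group="default", location="your_pkg.defs"):
    """Fetch stale statuses one asset at a time to avoid overloading the storage DB."""
    results = {}
    for path in asset_keys:  # e.g. [["first"], ["second"]]
        resp = requests.post(GRAPHQL_URL, json={
            "query": ASSET_STALE_STATUS,
            "variables": {
                "groupName": group,
                "assetKey": {"path": path},
                "partitionKeys": partition_keys,
                "repositoryLocation": location,
            },
        })
        resp.raise_for_status()
        nodes = resp.json()["data"]["assetNodes"]
        results[tuple(path)] = nodes[0]["staleStatusByPartition"] if nodes else None
        time.sleep(0.5)  # back off between queries instead of running them concurrently
    return results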