dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Materialization of non-partitioned asset with upstream partitioned asset errors in Dagster Cloud with serverless IO manager #18633

Open rlwolf17 opened 11 months ago

rlwolf17 commented 11 months ago

Dagster version

1.5.5

What's the issue?

Overview: I have 4 assets with linear dependencies, where the first two are partitioned using a TimeWindowPartitionsDefinition. When running dagster dev and developing locally, I am able to materialize all assets together in the same run with no issues:

image

However, in my deployment of Dagster Cloud (where I have ensured that dagster version is also 1.5.5) I encounter an error when attempting to materialize the first non-partitioned asset following the upstream partitioned assets: "dagster._check.CheckError: Failure condition: Tried to access partition key for asset 'AssetKey(['cems', 'facilities', 'facilities_s3_standardized_keys'])', but the number of input partitions != 1: 'TimeWindowPartitionsSubset([PartitionKeyRange(start='1995-01-01', end='2023-01-01')])'."

image

Full stack trace:

dagster._core.errors.DagsterExecutionLoadInputError: Error occurred while loading input "facilities_s3_standardized_keys" of step "cems__facilities__facilities_table_from_raw_file_column_superset":
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_plan.py", line 275, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 442, in core_dagster_event_sequence_for_step
    for event_or_input_value in step_input.source.load_input_object(step_context, input_def):
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/inputs.py", line 486, in load_input_object
    yield from _load_input_with_input_manager(input_manager, load_input_context)
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/inputs.py", line 824, in _load_input_with_input_manager
    with op_execution_error_boundary(
  File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 84, in op_execution_error_boundary
    raise error_cls(
The above exception was caused by the following exception:
dagster._check.CheckError: Failure condition: Tried to access partition key for asset 'AssetKey(['cems', 'facilities', 'facilities_s3_standardized_keys'])', but the number of input partitions != 1: 'TimeWindowPartitionsSubset([PartitionKeyRange(start='1995-01-01', end='2023-01-01')])'.
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/plan/inputs.py", line 831, in _load_input_with_input_manager
    value = input_manager.load_input(context)
  File "/usr/local/lib/python3.10/site-packages/dagster_cloud/serverless/io_manager.py", line 87, in load_input
    key = self._get_path(context)
  File "/usr/local/lib/python3.10/site-packages/dagster_cloud/serverless/io_manager.py", line 52, in _get_path
    path = context.get_asset_identifier()
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/context/input.py", line 456, in get_asset_identifier
    return [*self.asset_key.path, self.asset_partition_key]
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/context/input.py", line 345, in asset_partition_key
    check.failed(
  File "/usr/local/lib/python3.10/site-packages/dagster/_check/__init__.py", line 1627, in failed
    raise CheckError(f"Failure condition: {desc}")

What did you expect to happen?

My understanding from reading how non-partitioned and partitioned assets interact was that the behavior I observed during development is the expected one. Please let me know if my understanding is incorrect or if there is anything I can do on my side to resolve this issue, thanks!

How to reproduce?

Materialize a non-partitioned asset that takes a time-window partitioned asset as an input parameter.

My definitions:

from dagster import AssetIn, AssetKey, TimeWindowPartitionsDefinition, asset

# SOURCE_NAME and spec are assumed to be defined elsewhere in the project.
yearly_partitions_def = TimeWindowPartitionsDefinition(
    cron_schedule="2 2 2 1 *",
    start=spec["start_date"],
    fmt="%Y-%m-01",
    end_offset=1,
)

# Upstream asset: one partition per year.
@asset(
    key_prefix=[SOURCE_NAME, spec["resource_name"]],
    partitions_def=yearly_partitions_def,
    ins=None,
    non_argument_deps={AssetKey([SOURCE_NAME, spec["resource_name"], "facilities_s3_raw_keys"])},
)
def facilities_s3_standardized_keys(context):
    ...

# Downstream asset: non-partitioned, so it depends on all upstream partitions.
@asset(
    key_prefix=[spec["source_name"], spec["resource_name"]],
    ins={"facilities_s3_standardized_keys": AssetIn(key_prefix=[spec["source_name"], spec["resource_name"]])},
)
def facilities_table_from_raw_file_column_superset(facilities_s3_standardized_keys):
    ...

Deployment type

Dagster Cloud

Deployment details

Dagster Serverless Cloud Deployment

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

rlwolf17 commented 10 months ago

Hi @sryza, is this a bug with dagster_cloud/serverless/io_manager.py?

Are there any workarounds in the meantime? Since this is still an issue for us only on the Dagster Cloud side, I am not sure how to proceed other than to stop using partitions or to try a different IO manager if we want to combine non-partitioned and partitioned assets.

rlwolf17 commented 10 months ago

Seems to be related to this issue: https://github.com/dagster-io/dagster/issues/13290

alangenfeld commented 9 months ago

It appears that the serverless IO manager did not yet receive the fixes mentioned in https://github.com/dagster-io/dagster/issues/13290.

> Are there any workarounds in the meantime?

Since S3 is mentioned in the summary, switching to the s3_io_manager would likely be the fastest workaround: https://docs.dagster.io/deployment/guides/aws#using-s3-for-io-management
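
For anyone reading the stack trace above: the failure comes from get_asset_identifier, which appends a single asset_partition_key to the asset path, so it cannot describe an input that spans many upstream partitions. The following is a dependency-free sketch of that behavior and of a partition-aware alternative. It does not call any Dagster API; the function names are hypothetical and the real IO manager code may differ.

```python
from typing import Dict, List, Sequence


def single_partition_identifier(
    asset_path: Sequence[str], partition_keys: Sequence[str]
) -> List[str]:
    """Mimic the failing path: exactly one upstream partition is required."""
    if len(partition_keys) != 1:
        raise ValueError(
            f"Tried to access partition key for asset {list(asset_path)}, "
            f"but the number of input partitions != 1: {list(partition_keys)}"
        )
    return [*asset_path, partition_keys[0]]


def partition_aware_identifiers(
    asset_path: Sequence[str], partition_keys: Sequence[str]
) -> Dict[str, List[str]]:
    """Build one identifier per upstream partition, keyed by partition key."""
    return {key: [*asset_path, key] for key in partition_keys}


# Illustrative values only: three of the yearly partitions from this issue.
asset_path = ["cems", "facilities", "facilities_s3_standardized_keys"]
yearly_keys = ["1995-01-01", "1996-01-01", "1997-01-01"]

# A non-partitioned downstream asset needs every upstream partition,
# so it gets one storage path per partition key.
paths = partition_aware_identifiers(asset_path, yearly_keys)

try:
    single_partition_identifier(asset_path, yearly_keys)
    single_key_failed = False
except ValueError:
    single_key_failed = True
```

With a partition-aware loader, the downstream asset would typically receive a mapping from partition key to loaded value rather than a single object, so the asset body may need to iterate over that mapping.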