dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Dynamic type checking fails when downstream asset refers to a group of upstream partitions #12426

Open plaflamme opened 1 year ago

plaflamme commented 1 year ago

Dagster version

1.1.18

What's the issue?

Assume a custom IO manager that can provide different representations of the same asset, e.g.:

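```python
from typing import Any

import pandas
import pyarrow
from dagster import InputContext, UPathIOManager
from upath import UPath


class MyIOManager(UPathIOManager):
    # Returns a pandas.DataFrame or a pyarrow.Table, depending on the type
    # annotation of the downstream input (dynamic type checking).
    def load_from_path(self, context: InputContext, path: UPath) -> Any:
        tpe = context.dagster_type.typing_type
        if tpe == pandas.DataFrame:
            return ...  # read the stored file as a pandas.DataFrame
        elif tpe == pyarrow.Table:
            return ...  # read the stored file as a pyarrow.Table
```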
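The dispatch in `load_from_path` boils down to picking a loader based on the Python type the downstream input asks for. Here is a minimal, framework-free sketch of that pattern. `PandasLike`, `ArrowLike`, `LOADERS` and `load_as` are hypothetical stand-ins for `pandas.DataFrame`, `pyarrow.Table` and real read functions, so the sketch runs without Dagster, pandas or pyarrow installed.

```python
from typing import Any, Callable, Dict


class PandasLike:
    """Hypothetical stand-in for pandas.DataFrame."""

    def __init__(self, path: str) -> None:
        self.path = path


class ArrowLike:
    """Hypothetical stand-in for pyarrow.Table."""

    def __init__(self, path: str) -> None:
        self.path = path


# Maps the type requested by the downstream input to a function that reads
# the stored file in that representation (hypothetical registry).
LOADERS: Dict[type, Callable[[str], Any]] = {
    PandasLike: PandasLike,
    ArrowLike: ArrowLike,
}


def load_as(requested_type: Any, path: str) -> Any:
    """Read `path` in the representation named by `requested_type`."""
    try:
        loader = LOADERS[requested_type]
    except KeyError:
        raise TypeError(f"no loader registered for {requested_type!r}") from None
    return loader(path)


table = load_as(ArrowLike, "upstream.parquet")
print(type(table).__name__)  # ArrowLike
```

Because the lookup is keyed on the exact requested type, an annotation such as `Dict[str, ArrowLike]` (the shape of a multi-partition input) finds no loader and raises `TypeError`.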

Now assume an hourly-partitioned upstream asset exists:

```python
import pandas
from dagster import HourlyPartitionsDefinition, asset

@asset(partitions_def=HourlyPartitionsDefinition(...))
def upstream() -> pandas.DataFrame:
    pass
```

Downstream assets can use either:

```python
import pandas
import pyarrow
from dagster import HourlyPartitionsDefinition, asset

@asset(partitions_def=HourlyPartitionsDefinition(...))
def use_pandas(upstream: pandas.DataFrame):
    pass

@asset(partitions_def=HourlyPartitionsDefinition(...))
def use_arrow(upstream: pyarrow.Table):
    pass
```

But if a downstream asset wishes to combine the hourly upstream partitions into a daily partition, type checking fails:

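```python
from typing import Dict

import pandas
import pyarrow
from dagster import DailyPartitionsDefinition, asset

@asset(partitions_def=DailyPartitionsDefinition(...))
def use_pandas(upstream: Dict[str, pandas.DataFrame]):
    pass

@asset(partitions_def=DailyPartitionsDefinition(...))
def use_arrow(upstream: Dict[str, pyarrow.Table]):
    pass
```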
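With a `Dict[str, X]` annotation, the requested type is presumably the whole `Dict[str, X]` alias rather than `X`, so an equality test such as `tpe == pandas.DataFrame` can never match. One way an IO manager could cope, sketched below under the assumption that only `Dict[str, X]` needs unwrapping, is to peel off the value type with `typing.get_origin`/`typing.get_args` and load every upstream partition in that representation. `element_type`, `load_partitions`, `read_one` and `ArrowLike` are hypothetical illustrations, not Dagster APIs.

```python
from typing import Any, Dict, Iterable, get_args, get_origin


def element_type(requested: Any) -> Any:
    """Return X for Dict[str, X]; return any other annotation unchanged."""
    if get_origin(requested) is dict:
        _key_type, value_type = get_args(requested)
        return value_type
    return requested


def load_partitions(requested: Any, partition_keys: Iterable[str]) -> Dict[str, Any]:
    """Load one object per upstream partition in the requested representation."""
    target = element_type(requested)

    def read_one(key: str) -> Any:
        # Hypothetical stand-in for reading a single partition file as `target`;
        # it only records which representation and partition would be read.
        return (target.__name__, key)

    return {key: read_one(key) for key in partition_keys}


class ArrowLike:  # hypothetical stand-in for pyarrow.Table
    pass


hours = [f"2023-02-16-{h:02d}:00" for h in range(3)]
print(load_partitions(Dict[str, ArrowLike], hours))
```

The result has the `Dict[str, X]` shape the downstream asset asked for, keyed by upstream partition key.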

What did you expect to happen?

Dynamic type checking should allow the situation where a downstream asset takes a group of upstream partitions and an IO manager is able to read different "flavours" of an asset.

How to reproduce?

No response

Deployment type

Local

Deployment details

No response

Additional information

Relevant slack thread: https://dagster.slack.com/archives/C01U954MEER/p1676585117044459


sryza commented 1 year ago

@plaflamme I'm trying to triangulate this a little bit. Which of the following is the case for this?

plaflamme commented 1 year ago

@sryza sorry for the lack of detail; I wrote this one rather quickly.

I believe the issue is the first category: "Dagster is raising an error in a situation where it shouldn't be raising an error".

Basically, in the example above (for the assets that use Dict[str, ?]), Dagster will raise the following error:

```
dagster._check.CheckError: Failure condition: Received `typing.Dict[str, polars.internals.dataframe.frame.DataFrame]` type in input of DagsterType <dagster._core.types.python_dict._TypedPythonDict object at 0x7fff811c2800>, but `<bound method PartitionedParquetIOManager.load_from_path of <core.parquet_io_manager.PartitionedParquetIOManager object at 0x7fff80f9b670>>` has typing.Any type annotation for obj. They should be both specified with type annotations and match. If you are loading multiple partitions, the upstream asset type annotation should be a typing.Dict.
```
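Going by the wording of the error, the failing condition appears to be: the input is annotated with a `typing.Dict`, but `load_from_path` is annotated with `typing.Any` (the example's `-> Any`) instead of a `Dict`. The sketch below paraphrases that condition in plain Python so the strictness is easier to see, and adds a relaxed variant that also accepts `Any`. Both `strict_check` and `relaxed_check` are hypothetical illustrations written from the message text, assuming the check inspects the loader's return annotation; they are not Dagster's actual implementation.

```python
from typing import Any, Callable, Dict, get_origin, get_type_hints


def strict_check(input_type: Any, loader: Callable[..., Any]) -> None:
    """Reject a Dict input unless the loader is annotated to return a Dict."""
    returned = get_type_hints(loader).get("return", Any)
    if get_origin(input_type) is dict and get_origin(returned) is not dict:
        raise TypeError(
            f"Received {input_type} but loader is annotated to return {returned}"
        )


def relaxed_check(input_type: Any, loader: Callable[..., Any]) -> None:
    """Like strict_check, but treat an `Any` return annotation as compatible."""
    returned = get_type_hints(loader).get("return", Any)
    if returned is Any:
        return
    strict_check(input_type, loader)


def load_from_path(path: str) -> Any:  # mirrors the IO manager's `-> Any`
    ...


relaxed_check(Dict[str, int], load_from_path)  # accepted
try:
    strict_check(Dict[str, int], load_from_path)
except TypeError as err:
    print(err)
```

Under this reading, an IO manager whose loader deliberately returns `Any` (because it dispatches dynamically) always trips the strict variant for multi-partition inputs.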

That said, I'm not sure whether the issue lies in UPathIOManager or in Dagster itself.

What I'm trying to accomplish is an IOManager that can dynamically provide/receive different types of inputs/outputs. This works fine when assets use simple type annotations (e.g. def the_asset(upstream: X)), but it breaks down when the annotation is Dict[str, X], where X is one of the types the IOManager is able to provide. This situation occurs, for example, when a downstream asset ingests multiple upstream partitions.

In other words, I could not find a way to annotate the methods of an IOManager that can dynamically provide X or Y types such that assets can use either def the_asset(upstream: X|Y) or def the_asset(upstream: Dict[str, X|Y]).

sryza commented 1 year ago

Got it: this is in UPathIOManager, and I think it's being overly strict. I'm going to relax this.