dagster-io / dagster

s3_pickle_io_manager does not support inputs with multiple partitions #13290

Closed TimoVink closed 1 year ago

TimoVink commented 1 year ago

Dagster version

1.2.4

What's the issue?

Hi all. New to dagster, having a good time so far -- very exciting stuff!

Unfortunately I ran into an issue: when using the s3_pickle_io_manager with an asset that takes a partitioned asset as input, we get the following error:

dagster._check.CheckError: Failure condition: Tried to access partition key for asset 'AssetKey(['my_asset'])', but the number of input partitions != 1

Upon inspection, comparing s3_pickle_io_manager with fs_io_manager, we see that the S3 IO manager inherits from MemoizableIOManager directly, while the FS IO manager inherits from it indirectly via UPathIOManager. UPathIOManager is what contains the logic for handling partitioned assets, so that logic is missing from the S3 IO manager.
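
For anyone who wants to verify this, a quick subclass check (class names as of Dagster 1.2.x; the private module paths are an assumption and may move between releases):

from dagster import UPathIOManager
from dagster._core.storage.fs_io_manager import PickledObjectFilesystemIOManager
from dagster_aws.s3.io_manager import PickledObjectS3IOManager

print(issubclass(PickledObjectFilesystemIOManager, UPathIOManager))  # True
print(issubclass(PickledObjectS3IOManager, UPathIOManager))          # False in 1.2.4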

What did you expect to happen?

Ideally it'd be nice if all first-party IO managers, like this S3 one, supported the full range of dagster functionality (barring limitations of the underlying IO system).

However, if that is impracticable, perhaps at least it'd be nice to document the limitation or raise a clearer error.

How to reproduce?

Try to materialize unique_top_stories in this example with s3_pickle_io_manager:

from dagster import DynamicPartitionsDefinition, asset

partitions_def = DynamicPartitionsDefinition(name='top_stories')

@asset(partitions_def=partitions_def)
def top_stories(context):
    ...  # omitted

@asset
def unique_top_stories(top_stories):
    ...  # omitted
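
A minimal wiring sketch that should trigger the failure (the bucket name is a placeholder; s3_pickle_io_manager also requires the s3 resource):

from dagster import Definitions
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

defs = Definitions(
    assets=[top_stories, unique_top_stories],
    resources={
        # the pickle IO manager reads and writes through the "s3" resource
        's3': s3_resource,
        'io_manager': s3_pickle_io_manager.configured({'s3_bucket': 'my-bucket'}),
    },
)

Materializing unique_top_stories with this setup produces the CheckError above.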

Deployment type

Dagster Helm chart

Deployment details

No response

Additional information

Went back and forth on whether this is a "bug" or a "feature request"; ended up tossing it on the bug side of the fence.

Of course feel free to reclassify as you see fit :)

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

TimoVink commented 1 year ago

For now, I'm using the custom S3 IO manager below. It has not been tested in any detail, and in all likelihood is not directly compatible with data already written by s3_pickle_io_manager:

import pickle
from typing import Any

from dagster import (
    DagsterInvariantViolationError,
    Field,
    InitResourceContext,
    InputContext,
    OutputContext,
    StringSource,
    UPathIOManager,
    _check as check,
    io_manager,
)
from upath import UPath

PICKLE_PROTOCOL = 4

'''
The built-in S3 IO manager does not handle partitions, so unfortunately we must write our own.
Source code based on combining code from the following 2 built-in IO managers:
  - https://github.com/dagster-io/dagster/blob/10f064352cf138c612f972d930cabb5a0823b3a9/python_modules/libraries/dagster-aws/dagster_aws/s3/io_manager.py#L18
  - https://github.com/dagster-io/dagster/blob/10f064352cf138c612f972d930cabb5a0823b3a9/python_modules/dagster/dagster/_core/storage/fs_io_manager.py#L128
'''

class MyS3IOManager(UPathIOManager):
    def __init__(self, s3_bucket, s3_prefix=None):
        s3_bucket = check.str_param(s3_bucket, 's3_bucket')
        s3_prefix = check.opt_str_param(s3_prefix, 's3_prefix')
        # guard against a literal 'None' path segment when no prefix is given
        base_path = UPath(f's3://{s3_bucket}/{s3_prefix}' if s3_prefix else f's3://{s3_bucket}')
        super().__init__(base_path=base_path)

    def dump_to_path(self, context: OutputContext, obj: Any, path: UPath):
        try:
            with path.open('wb') as file:
                pickle.dump(obj, file, PICKLE_PROTOCOL)

        except (AttributeError, RecursionError, ImportError, pickle.PicklingError) as e:
            if isinstance(e, RecursionError):
                # if obj can't be pickled because of RecursionError then __str__() will also
                # throw a RecursionError
                obj_repr = f'{obj.__class__} exceeds recursion limit and'
            else:
                obj_repr = obj.__str__()

            raise DagsterInvariantViolationError(f'Object {obj_repr} is not picklable.') from e

    def load_from_path(self, context: InputContext, path: UPath) -> Any:
        with path.open('rb') as file:
            return pickle.load(file)

@io_manager(
    config_schema={
        's3_bucket': Field(StringSource),
        's3_prefix': Field(StringSource, is_required=False, default_value='dagster'),
    },
)
def my_s3_io_manager(init_context: InitResourceContext) -> "MyS3IOManager":
    s3_bucket = init_context.resource_config['s3_bucket']
    s3_prefix = init_context.resource_config.get('s3_prefix')
    return MyS3IOManager(s3_bucket, s3_prefix)
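
A minimal usage sketch for the manager above (bucket and prefix values are placeholders; UPath's s3:// support requires the s3fs package to be installed):

from dagster import Definitions

defs = Definitions(
    assets=[top_stories, unique_top_stories],
    resources={
        'io_manager': my_s3_io_manager.configured(
            {'s3_bucket': 'my-bucket', 's3_prefix': 'dagster'}
        ),
    },
)
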
TimoVink commented 1 year ago

This issue is related to https://github.com/dagster-io/dagster/issues/10908, though smaller in scope and therefore perhaps more actionable?

adjit commented 1 year ago

For further context, the error is triggered by the S3 IO manager's call to get_asset_identifier: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-aws/dagster_aws/s3/io_manager.py#L32. That call accesses self.asset_partition_key (https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_core/execution/context/input.py#L459), and the check that ultimately raises is here: https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_core/execution/context/input.py#L332. The IO manager needs to handle the other types of asset inputs.
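
A hedged sketch of what that handling could look like in load_input, using InputContext's partition accessors (PartitionAwareS3IOManager and the _load_* helpers are hypothetical):

from typing import Any

from dagster import IOManager, InputContext, OutputContext

class PartitionAwareS3IOManager(IOManager):
    def handle_output(self, context: OutputContext, obj: Any):
        ...  # write obj to a partition-scoped path

    def load_input(self, context: InputContext) -> Any:
        if context.has_asset_partitions:
            keys = context.asset_partition_keys
            if len(keys) > 1:
                # fan-in: one object per upstream partition, keyed by partition key
                return {key: self._load_partition(context, key) for key in keys}
            return self._load_partition(context, keys[0])
        return self._load_unpartitioned(context)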

smackesey commented 1 year ago

Thanks for the report @TimoVink. Since there are two related issues here that have been around for a while, I'll take a look.

sryza commented 1 year ago

Heads up – I updated the title to reflect my understanding of what's going on here: I believe this is specific to the case where an input corresponds to multiple partitions. E.g. in your example @TimoVink, a non-partitioned asset depends on a partitioned asset, so the input of the non-partitioned asset corresponds to every partition of the partitioned asset.
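
For reference, UPathIOManager-based managers (such as fs_io_manager) resolve this fan-in case by passing the downstream asset a dict mapping each upstream partition key to its stored value, so the consumer looks roughly like this (assuming each partition's value is a list of stories):

@asset
def unique_top_stories(top_stories):
    # with a UPathIOManager, top_stories arrives as {partition_key: stories, ...}
    all_stories = [s for stories in top_stories.values() for s in stories]
    ...  # dedupe as needed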

adjit commented 1 year ago

@smackesey is there any update on the progress of this? I'm trying to come up with a custom IO manager to handle it, but running into some issues.

smackesey commented 1 year ago

Thanks for pinging me @adjit, I'm working on this now and hope to have a PR up today.