dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Built-in IO Managers should support the ingestion of partitioned assets by non-partitioned assets #10908

Open alexprykhodko opened 1 year ago

alexprykhodko commented 1 year ago

Dagster version

dagster, version 1.1.5

What's the issue?

Given the following assets:

from dagster import DailyPartitionsDefinition, OpExecutionContext, asset

# upstream_asset is partitioned (a daily partitioning is shown here for
# illustration); downstream_asset is not.
@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def upstream_asset(context: OpExecutionContext, raw_data):
    ...

@asset
def downstream_asset(context: OpExecutionContext, upstream_asset):
    ...

when using s3_pickle_io_manager, the following error is raised at job start:

  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/plan/utils.py", line 52, in solid_execution_error_boundary
    yield
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/plan/inputs.py", line 856, in _load_input_with_input_manager
    value = input_manager.load_input(context)
  File "/usr/local/lib/python3.8/site-packages/dagster_aws/s3/io_manager.py", line 72, in load_input
    key = self._get_path(context)
  File "/usr/local/lib/python3.8/site-packages/dagster_aws/s3/io_manager.py", line 33, in _get_path
    path = context.get_asset_identifier()
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/context/input.py", line 409, in get_asset_identifier
    return [*self.asset_key.path, self.asset_partition_key]
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/context/input.py", line 324, in asset_partition_key
    return self.step_context.asset_partition_key_for_input(self.name)
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/context/system.py", line 915, in asset_partition_key_for_input
    check.failed(
  File "/usr/local/lib/python3.8/site-packages/dagster/_check/__init__.py", line 1642, in failed
    raise CheckError(f"Failure condition: {desc}")

The error is not thrown when fs_io_manager is used.

What did you expect to happen?

The downstream_asset should receive an upstream_asset argument containing a dictionary whose keys are all of the partition keys of upstream_asset.
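A sketch of the expected input shape (the partition keys shown are illustrative):

from typing import Any, Dict

@asset
def downstream_asset(context: OpExecutionContext, upstream_asset: Dict[str, Any]):
    # e.g. {"2022-01-01": <stored value>, "2022-01-02": <stored value>, ...}
    ...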

How to reproduce?

See the section above.
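A minimal wiring sketch for the repro (the bucket name is illustrative; assumes the asset definitions above and the 1.1.x-era repository API):

from dagster import repository, with_resources
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

@repository
def repro():
    return with_resources(
        [upstream_asset, downstream_asset],
        {
            # s3_pickle_io_manager requires an "s3" resource and a bucket
            "io_manager": s3_pickle_io_manager.configured({"s3_bucket": "my-bucket"}),
            "s3": s3_resource,
        },
    )

# Materializing all partitions of upstream_asset and then downstream_asset
# fails with the CheckError above; swapping in fs_io_manager does not raise.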

Deployment type

Local

Deployment details

Python 3.8.13 macOS 13.0

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

dpeng817 commented 1 year ago

It sounds like this is an implementation inconsistency between our different IO managers - I wonder if we could implement the ingestion in such a way that it only needs to be written once for all IO managers, similar to our path construction machinery. A rough sketch of that idea is below.
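A rough sketch, with hypothetical names (PathIOManagerBase, _load_from_path, _write_to_path): partition fan-in is handled once in load_input, so every storage backend gets it for free. Dagster's UPathIOManager later took a similar shape.

from typing import Any

from upath import UPath

from dagster import InputContext, IOManager, OutputContext

class PathIOManagerBase(IOManager):
    """Hypothetical shared base: path construction and partition fan-in
    live here; concrete backends implement only the two path methods."""

    def __init__(self, base_path: UPath):
        self._base_path = base_path

    def _load_from_path(self, path: UPath) -> Any:
        raise NotImplementedError

    def _write_to_path(self, path: UPath, obj: Any) -> None:
        raise NotImplementedError

    def load_input(self, context: InputContext) -> Any:
        if context.has_asset_partitions and len(context.asset_partition_keys) > 1:
            # A non-partitioned asset reading a partitioned upstream: load
            # every partition and return a dict keyed by partition key.
            return {
                key: self._load_from_path(
                    self._base_path.joinpath(*context.asset_key.path, key)
                )
                for key in context.asset_partition_keys
            }
        return self._load_from_path(
            self._base_path.joinpath(*context.get_asset_identifier())
        )

    def handle_output(self, context: OutputContext, obj: Any) -> None:
        self._write_to_path(
            self._base_path.joinpath(*context.get_asset_identifier()), obj
        )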

alexprykhodko commented 1 year ago

@dpeng817 I ended up doing something similar in my version of the IO managers, after creating a base S3 IO class – the derived classes simply implement _read_from_path() and _write_to_path():

import msgpack
import zstandard
from upath import UPath

class S3MsgPackIOManager(BaseS3IOManager):

    def __init__(self, s3_bucket, s3_prefix=None, scheme='s3', use_compression=False):
        super().__init__(s3_bucket, s3_prefix, scheme)
        self._use_compression = use_compression

    def _write_to_path(self, path: UPath, obj):
        with path.open('wb') as f_out:
            if self._use_compression:
                # Wrap the file in a zstd streaming compressor
                zst = zstandard.ZstdCompressor(level=5)
                writer = zst.stream_writer(f_out)
            else:
                writer = f_out
            msgpack.dump(obj, writer)
            # Closing the writer flushes the final zstd frame
            writer.close()

    def _read_from_path(self, path: UPath):
        with path.open('rb') as f_in:
            if self._use_compression:
                zst = zstandard.ZstdDecompressor()
                reader = zst.stream_reader(f_in)
            else:
                reader = f_in
            result = msgpack.load(reader)
            reader.close()
        return result

    def _get_file_extension(self):
        return 'msgpack' if not self._use_compression else 'msgpack.zst'
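For reference, a hypothetical wiring of the class above as a resource (the config keys and resource name are illustrative):

from dagster import io_manager

@io_manager(config_schema={"s3_bucket": str, "use_compression": bool})
def s3_msgpack_io_manager(init_context):
    cfg = init_context.resource_config
    return S3MsgPackIOManager(cfg["s3_bucket"], use_compression=cfg["use_compression"])

# e.g. resources={"io_manager": s3_msgpack_io_manager.configured(
#     {"s3_bucket": "my-bucket", "use_compression": True})}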