dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Make runtime output metadata available to downstream ops #8521

Open sryza opened 2 years ago

sryza commented 2 years ago

Requests for this:

Workaround for now, if you're using software-defined assets with IO managers:

def handle_output(self, context, obj):
    context.add_output_metadata({"foo": "bar"})

def load_input(self, context):
    asset_key = context.asset_key
    event_log_entry = context.step_context.instance.get_latest_materialization_event(asset_key)
    metadata = event_log_entry.dagster_event.event_specific_data.materialization.metadata
    foo_value = metadata["foo"].value
    assert foo_value == "bar"

Workaround for now, if you're using software-defined assets without IO managers:

@asset
def upstream():
    return MaterializeResult(metadata={"foo": "bar"})

@asset(deps=[upstream])
def downstream_asset(context):
    materialization = context.instance.get_latest_materialization_event(upstream.key).asset_materialization
    metadata = materialization.metadata
    foo_value = metadata["foo"].value
    assert foo_value == "bar"

dpeng817 commented 2 years ago

This doesn't work for partitioned assets: there is no way to specify which partition key you're searching for.

dpeng817 commented 2 years ago

If you want to figure out the latest materialization event for a particular partition key:

@asset(non_argument_deps={AssetKey("some_upstream_asset")})
def downstream_asset(context):
    records_filter = EventRecordsFilter(
        event_type=DagsterEventType.ASSET_MATERIALIZATION,
        asset_key=AssetKey("some_upstream_asset"),
        asset_partitions=["some_partition"],
    )
    # get_event_records returns a sequence; ascending=False puts the newest record first
    event_log_record = context.instance.get_event_records(records_filter, ascending=False, limit=1)[0]
    event_log_entry = event_log_record.event_log_entry
    metadata = event_log_entry.dagster_event.event_specific_data.materialization.metadata
    foo_value = metadata["foo"].value
    assert foo_value == "bar"

danielgafni commented 2 years ago

For documentation purposes, here is a snippet for getting the mapped upstream asset partitions (if using partition mappings):

partitions_def = context.asset_partitions_def
assert isinstance(partitions_def, TimeWindowPartitionsDefinition)
partitions = partitions_def.get_partition_keys_in_range(context.asset_partition_key_range)

danielgafni commented 2 years ago

@sryza maybe @dpeng817's snippet deserves to be a Dagster utility function?
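
A minimal sketch of what such a helper might look like, written against already-fetched records rather than the instance so that it does not depend on the storage API. The function name `latest_metadata_for_partition` is hypothetical and not part of Dagster. The attribute paths (`timestamp`, `event_log_entry.dagster_event.event_specific_data.materialization`, `partition`, and `metadata[...].value`) are the ones used in snippets elsewhere in this thread and are assumed here, not checked against a specific Dagster version.

```python
from typing import Any, Iterable, Optional


def latest_metadata_for_partition(
    records: Iterable[Any], partition_key: Optional[str] = None
) -> Optional[dict[str, Any]]:
    """Return unwrapped metadata from the newest materialization record.

    `records` is assumed to be what `instance.get_event_records(...)` returns
    for an ASSET_MATERIALIZATION filter. Hypothetical helper, not a Dagster API.
    """
    newest = None
    for record in records:
        mat = record.event_log_entry.dagster_event.event_specific_data.materialization
        # Skip records for other partitions when a partition key is requested.
        if partition_key is not None and mat.partition != partition_key:
            continue
        if newest is None or record.timestamp > newest.timestamp:
            newest = record
    if newest is None:
        return None  # nothing materialized yet for this partition
    mat = newest.event_log_entry.dagster_event.event_specific_data.materialization
    return {key: entry.value for key, entry in mat.metadata.items()}
```

Returning `None` instead of raising lets the caller decide what a never-materialized partition should mean, which the `[0]` indexing in the snippets above does not allow.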

sryza commented 2 years ago

@danielgafni - I agree that we should make this more straightforward.

A potential path:

So then @dpeng817's example would turn into:

@asset(non_argument_deps={AssetKey("some_upstream_asset")})
def downstream_asset(context):
    materialization = context.instance.get_latest_materialization_event(AssetKey("some_upstream_asset"), partition_key="xyz").materialization
    foo_value = materialization.metadata["foo"].value
    assert foo_value == "bar"

danielgafni commented 2 years ago

Probably (1) looks nicer

sowusu-ba commented 1 year ago

Any workaround for getting metadata passed via context.add_input_metadata?

sryza commented 1 year ago

@sowusu-ba are you using ops or assets? And do you mean context.add_output_metadata?

xmarcosx commented 1 year ago

@sryza In the snippet you provided, I don't believe context.instance.get_latest_materialization_event(asset_key) would work. context here is an InputContext object and does not have .instance available.

def load_input(self, context):
    asset_key = context.asset_key
    event_log_entry = context.instance.get_latest_materialization_event(asset_key)
    metadata = event_log_entry.dagster_event.event_specific_data.materialization.metadata
    foo_value = metadata["foo"].value
    assert foo_value == "bar"

Looks like it should be:

def load_input(self, context):
    asset_key = context.asset_key
    event_log_entry = context.step_context.instance.get_latest_materialization_event(asset_key)
    metadata = event_log_entry.dagster_event.event_specific_data.materialization.metadata
    foo_value = metadata["foo"].value
    assert foo_value == "bar"

sowusu-ba commented 1 year ago

> @sowusu-ba are you using ops or assets? And do you mean context.add_output_metadata?

@sryza I am using assets and I mean context.add_input_metadata

sryza commented 1 year ago

@xmarcosx thanks for catching that. I updated the example with step_context.

sryza commented 1 year ago

> @sryza I am using assets and I mean context.add_input_metadata

So you want to call add_input_metadata during load_input and then access that metadata within the @asset body?

You should be able to do something like:

@asset
def downstream_asset(context, upstream_asset):
    latest_observation_record = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_OBSERVATION,
            asset_key=AssetKey(["upstream_asset"]),
        ),
        ascending=False,
        limit=1,
    )[0]
    metadata = latest_observation_record.event_log_entry.dagster_event.event_specific_data.asset_observation.metadata

rohilbadkundri commented 1 year ago

Any workaround for ops & access within an IOManager?

def handle_output(self, context, obj):
    context.add_output_metadata({"foo": "bar"})

def load_input(self, context):
    # how can the metadata be accessed here if obj isn't an asset and is instead an op output?

sryza commented 1 year ago

@rohilbadkundri this isn't a full answer, but the place I'd start exploring is with something like:

def load_input(self, context):
    events = context.step_context.instance.all_logs(context.step_context.run_id, of_type=DagsterEventType.STEP_OUTPUT)

Then filter down to the event you care about and pull the metadata off of it.
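
The filtering step could be sketched roughly as below. The helper name `find_step_output_metadata` is hypothetical. The sketch assumes that each returned event exposes `dagster_event.event_specific_data` with a `step_output_handle` (carrying `step_key` and `output_name`) and a `metadata` mapping whose values have a `.value` attribute. That is an assumption about the shape of STEP_OUTPUT events and should be verified against the installed Dagster version; which event type actually carries the metadata may depend on where it was added.

```python
from typing import Any, Iterable, Optional


def find_step_output_metadata(
    events: Iterable[Any], step_key: str, output_name: str = "result"
) -> Optional[dict[str, Any]]:
    """Return unwrapped metadata for one op output, or None if no event matches.

    Assumes STEP_OUTPUT events carry `step_output_handle.step_key`,
    `step_output_handle.output_name`, and a `metadata` mapping whose values
    have a `.value` attribute. Hypothetical helper, not a Dagster API.
    """
    match = None
    for event in events:
        data = event.dagster_event.event_specific_data
        handle = data.step_output_handle
        if handle.step_key == step_key and handle.output_name == output_name:
            match = data  # keep the last match, e.g. after a step retry
    if match is None:
        return None
    return {key: entry.value for key, entry in match.metadata.items()}
```

Inside `load_input`, the step key and output name would presumably come from `context.upstream_output` (for example `context.upstream_output.name`, as used elsewhere in this thread); treat that wiring as an assumption too.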

SebaJeku commented 1 year ago

The discussion seems to be solely about the workaround, not the request itself. Is there any way that "Make runtime output metadata available to downstream ops" could become a feature/change request?

sryza commented 1 year ago

@SebaJeku we are definitely tracking this as a feature/change request and hope to resolve it when we have bandwidth.

jamiedemaria commented 7 months ago

A related issue: https://github.com/dagster-io/dagster/issues/20094

DennisSchwartz commented 3 months ago

Here is a longer snippet for use in load_input which works for me so far:


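from typing import Any

from dagster import (
    AssetKey,
    DagsterEventType,
    DagsterInstance,
    DagsterInvariantViolationError,
    EventLogRecord,
    EventRecordsFilter,
    InputContext,
)

def get_metadata_per_partition(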
    instance: DagsterInstance, asset_key: AssetKey
) -> dict[str, Any]:
    mats = get_latest_materialization_per_partition(instance, asset_key)
    latest_partitions = {partition: mat["metadata"] for partition, mat in mats.items()}
    return latest_partitions

def get_latest_materialization_per_partition(
    instance: DagsterInstance, asset_key: AssetKey
) -> dict[str, Any]:
    ef = EventRecordsFilter(
        asset_key=asset_key, event_type=DagsterEventType.ASSET_MATERIALIZATION
    )
    mat_events = instance.get_event_records(event_records_filter=ef)
    all_materializations: dict[str, list[Any]] = {}
    for event in mat_events:
        mat = event.event_log_entry.dagster_event.event_specific_data.materialization
        partitions = str(mat.partition)
        if partitions not in all_materializations:
            all_materializations[partitions] = []
        all_materializations[partitions].append(event)

    # Only keep the latest materialization for each partition
    latest_partitions: dict[str, dict] = {}
    for partition, materializations in all_materializations.items():
        # The list holds event log records, not AssetMaterialization objects
        latest: EventLogRecord = max(materializations, key=lambda x: x.timestamp)
        last_mat = (
            latest.event_log_entry.dagster_event.event_specific_data.materialization
        )
        metadata = last_mat.metadata
        latest_partitions[partition] = {
            "metadata": {k: v.value for k, v in metadata.items()},
            "tags": last_mat.tags,
        }

    return latest_partitions

# Workaround until this feature request is implemented: https://github.com/dagster-io/dagster/issues/20094
def get_upstream_metadata(context: InputContext) -> dict[str, Any]:
    metadata = get_metadata_per_partition(context.step_context.instance, context.upstream_output.asset_key)
    try:
        partition_key = context.step_context.asset_partition_key_for_input(context.upstream_output.name)
        metadata = metadata[partition_key]
    except DagsterInvariantViolationError:
        pass

    return metadata

I would be very interested in having a simpler way to do this. Additionally, I'm trying to propagate some metadata to downstream assets, which also seems quite difficult to do with IOManagers.

Our use case is that I would like users to set a license in the metadata of assets collecting external data and would then like to propagate this license information to the metadata of any downstream assets which are derived from the upstream dataset.
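
One way the merge step of that license use case might be sketched, independent of how the upstream metadata is fetched. Everything here is hypothetical: the function name `propagate_license`, the `"license"` key, and the choice to join several upstream licenses into one comma-separated string are illustrative assumptions, not an existing Dagster feature.

```python
from typing import Any, Mapping


def propagate_license(
    upstream_metadata: Mapping[str, Mapping[str, Any]],
    own_metadata: Mapping[str, Any],
    key: str = "license",
) -> dict[str, Any]:
    """Merge the licenses found on upstream assets into downstream metadata.

    `upstream_metadata` maps an upstream asset name to its unwrapped metadata.
    All names here are hypothetical; this is not a Dagster API.
    """
    licenses = sorted(
        {str(meta[key]) for meta in upstream_metadata.values() if key in meta}
    )
    merged = dict(own_metadata)
    if licenses:
        # Several upstream licenses are joined so none is silently dropped.
        merged[key] = ", ".join(licenses)
    return merged
```

The returned dictionary could then be handed to `context.add_output_metadata(...)` in `handle_output`, so the license travels with each derived asset.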