dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Dagster meta in `defaults` arg of `replication.yaml` is ignored #21075

Closed edsoncezar16 closed 4 months ago

edsoncezar16 commented 5 months ago

Dagster version

dagster, version 1.7.0

What's the issue?

The function get_streams_from_replication ignores everything set in the defaults block of the replication.yaml, as can be seen from its definition:

def get_streams_from_replication(
    replication_config: Mapping[str, Any],
) -> Iterable[Mapping[str, Any]]:
    """Returns a list of streams and their configs from a Sling replication config."""
    for stream, config in replication_config.get("streams", {}).items():
        if config and config.get("disabled", False):
            continue
        yield {"name": stream, "config": config}

As a consequence, one cannot, for instance, configure a default asset group in the defaults block, which has proven to be a quite common requirement in our context, as in the following replication.yaml:

source: MY_S3
target: MY_SNOWFLAKE

defaults:
  mode: incremental
  primary_key:
    - EntityID
  source_options:
    format: csv
  meta:
    dagster:
      group: my_group

streams:
  data/subject_1_*.csv:
    object: MY_SCHEMA.SUBJECT1
  data/subject_2_*.csv:
    object: MY_SCHEMA.SUBJECT2

env:
  SLING_LOADED_AT_COLUMN: true
  SLING_STREAM_URL_COLUMN: true

This results in all assets being assigned to the default group, as the group name is not being set.
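The behavior can be reproduced without dagster installed. The following is a minimal sketch, assuming the replication.yaml above parses to the dict shown (the env block is omitted for brevity) and using a verbatim copy of the function quoted earlier:

```python
from typing import Any, Iterable, Mapping


# Copy of the function quoted above, so the sketch runs on its own.
def get_streams_from_replication(
    replication_config: Mapping[str, Any],
) -> Iterable[Mapping[str, Any]]:
    for stream, config in replication_config.get("streams", {}).items():
        if config and config.get("disabled", False):
            continue
        yield {"name": stream, "config": config}


# The replication.yaml above, written as the dict it is assumed to parse to.
replication_config = {
    "source": "MY_S3",
    "target": "MY_SNOWFLAKE",
    "defaults": {
        "mode": "incremental",
        "primary_key": ["EntityID"],
        "source_options": {"format": "csv"},
        "meta": {"dagster": {"group": "my_group"}},
    },
    "streams": {
        "data/subject_1_*.csv": {"object": "MY_SCHEMA.SUBJECT1"},
        "data/subject_2_*.csv": {"object": "MY_SCHEMA.SUBJECT2"},
    },
}

streams = list(get_streams_from_replication(replication_config))
for stream in streams:
    # Each yielded config holds only the stream-level keys; nothing from
    # the defaults block (including meta.dagster.group) is carried over.
    print(stream["name"], sorted(stream["config"]))
```

Each yielded config contains only the stream's own object key, so any translator that reads meta.dagster.group from it finds nothing.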

We understand that setting meta at the stream level achieves what we want. Nevertheless, that workflow is unintuitive: one has no reason to expect that a so-called default configuration would not take effect, and it forces us to repeat the same configuration for every asset.

What did you expect to happen?

With the above configuration, all assets generated by the replication.yaml should belong to the my_group asset group.

How to reproduce?

Deployment type

Other Docker-based deployment

Deployment details

No response

Additional information

Our current workaround involves using the dagster utility function deep_merge_dicts and adding a convenience method in the CustomDagsterSlingTranslator as follows:

class CustomDagsterSlingTranslator(DagsterSlingTranslator):
    _replication_config: SlingReplicationParam

    @property
    def replication_config(self) -> SlingReplicationParam:
        return self._replication_config

    def get_rectified_stream_config(
        self, stream_definition: Mapping[str, Any]
    ) -> Mapping[str, Any]:
        """
        Returns a stream_config where configurations in the 'defaults' block of the
        'replication.yaml' are not ignored.
        """
        default_config = validate_replication(self.replication_config).get(
            "defaults", {}
        )
        stream_config = stream_definition.get("config", {})
        return deep_merge_dicts(default_config, stream_config)

    def get_asset_key(self, stream_definition: Mapping[str, Any]) -> AssetKey:
        config = self.get_rectified_stream_config(stream_definition)
        ...

    ...

However, a much simpler solution would be to merge the defaults into each stream's config directly in get_streams_from_replication, for instance:

def get_streams_from_replication(
    replication_config: Mapping[str, Any],
) -> Iterable[Mapping[str, Any]]:
    """Returns a list of streams and their configs from a Sling replication config."""
    default_config = replication_config.get("defaults", {})
    for stream, stream_config in replication_config.get("streams", {}).items():
        # A stream entry may have no config (None), so fall back to an empty dict.
        config = deep_merge_dicts(default_config, stream_config or {})
        if config and config.get("disabled", False):
            continue
        yield {"name": stream, "config": config}
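The effect of the merge-based approach can be checked in isolation. The sketch below is not the dagster implementation: deep_merge is a hypothetical stand-in for deep_merge_dicts (assumed to merge nested mappings recursively, with the second argument winning), get_streams_with_defaults is a hypothetical name, and the third and fourth streams plus the group override are invented for illustration:

```python
import copy
from typing import Any, Dict, Iterable, Mapping, Optional


def deep_merge(
    onto: Mapping[str, Any], other: Optional[Mapping[str, Any]]
) -> Dict[str, Any]:
    """Hypothetical stand-in for dagster's deep_merge_dicts.

    Values from `other` win, nested mappings are merged recursively,
    and neither input is mutated.
    """
    merged = copy.deepcopy(dict(onto))
    for key, value in (other or {}).items():
        if isinstance(value, Mapping) and isinstance(merged.get(key), Mapping):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


def get_streams_with_defaults(
    replication_config: Mapping[str, Any],
) -> Iterable[Mapping[str, Any]]:
    """Yield each enabled stream with the defaults block merged into its config."""
    default_config = replication_config.get("defaults") or {}
    for stream, stream_config in replication_config.get("streams", {}).items():
        config = deep_merge(default_config, stream_config)
        if config.get("disabled", False):
            continue
        yield {"name": stream, "config": config}


replication_config = {
    "defaults": {"mode": "incremental", "meta": {"dagster": {"group": "my_group"}}},
    "streams": {
        "data/subject_1_*.csv": {"object": "MY_SCHEMA.SUBJECT1"},
        # Illustrative stream-level override of the default group.
        "data/subject_2_*.csv": {
            "object": "MY_SCHEMA.SUBJECT2",
            "meta": {"dagster": {"group": "other_group"}},
        },
        # Illustrative stream with no config at all.
        "data/subject_3_*.csv": None,
        # Illustrative disabled stream; it should be skipped.
        "data/subject_4_*.csv": {"disabled": True},
    },
}

result = {s["name"]: s["config"] for s in get_streams_with_defaults(replication_config)}
for name, config in result.items():
    print(name, config["meta"]["dagster"]["group"])
```

With this merge order, stream-level settings override the defaults, a stream without any config inherits the defaults unchanged, and disabled streams are still skipped.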

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

edsoncezar16 commented 4 months ago

Fixed by PR #21390.