dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Provide config values to assets that are triggered by auto-materialize policies #12955

Open christeefy opened 1 year ago

christeefy commented 1 year ago

What's the use case?

I understand that it’s possible to parametrize assets using config_schema. However, I’m not sure how to supply these config values to assets in a “reactive” setting (i.e. when runs are triggered by reconciliation sensors).

Here's a minimal code example:

from dagster import (
    AssetSelection,
    Definitions,
    asset,
    build_asset_reconciliation_sensor,
)

@asset
def a():
    ...

@asset(config_schema={"input_b": bool})
def b(a):
    ...

# Make `asset b` automatically materialize after `asset a`
update_sensor = build_asset_reconciliation_sensor(
    name="update_sensor", asset_selection=AssetSelection.all()
)

defs = Definitions(assets=[a, b], sensors=[update_sensor])

Without a way to specify config values for b, the SensorDaemon raises an error whenever the reconciliation sensor is triggered in dagster dev:

dagster.daemon.SensorDaemon - ERROR -  Sensor daemon caught an error for sensor update_sensor
Traceback (most recent call last):
  File "../python3.10/site-packages/dagster/_daemon/sensor.py", line 489, in _process_tick_generator
    yield from _evaluate_sensor(
  File "../python3.10/site-packages/dagster/_daemon/sensor.py", line 653, in _evaluate_sensor
    run = _get_or_create_sensor_run(
  File "../python3.10/site-packages/dagster/_daemon/sensor.py", line 780, in _get_or_create_sensor_run
    return _create_sensor_run(
  File "../python3.10/site-packages/dagster/_daemon/sensor.py", line 815, in _create_sensor_run
    external_execution_plan = repo_location.get_external_execution_plan(
  File "../python3.10/site-packages/dagster/_core/host_representation/repository_location.py", line 750, in get_external_execution_plan
    execution_plan_snapshot_or_error = sync_get_external_execution_plan_grpc(
  File "../python3.10/site-packages/dagster/_api/snapshot_execution_plan.py", line 65, in sync_get_external_execution_plan_grpc
    raise DagsterUserCodeProcessError.from_error_info(result.error)
dagster._core.errors.DagsterUserCodeProcessError: dagster._core.errors.DagsterInvalidConfigError: Error in config for job
    Error 1: Missing required config entry "ops" at the root. Sample config for missing entry: {'ops': {'b': {'config': {'input_b': True}}}}
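
For reference, the sample config in the error message nests each op's config under "ops" -> op name -> "config". Here is a minimal pure-Python sketch of that shape. The helper name build_ops_run_config is hypothetical and not part of Dagster; it only builds the dictionary and does not validate it against any config_schema.

```python
from typing import Any, Dict, Mapping


def build_ops_run_config(
    config_by_op: Mapping[str, Mapping[str, Any]],
) -> Dict[str, Any]:
    """Wrap per-op config values in the {"ops": {name: {"config": ...}}} layout.

    Hypothetical helper for illustration only; not a Dagster API.
    """
    return {
        "ops": {
            op_name: {"config": dict(op_config)}
            for op_name, op_config in config_by_op.items()
        }
    }


# Config for asset `b` from the example above.
run_config = build_ops_run_config({"b": {"input_b": True}})
print(run_config)  # {'ops': {'b': {'config': {'input_b': True}}}}
```

A dictionary of this shape is what the run for b appears to be missing when the reconciliation sensor launches it.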

From some discussions in the Dagster Slack community, my guess is that there is currently no way to supply config values to such assets.

Ideas of implementation

Based on my limited understanding of the codebase internals, config values are normally supplied via RunRequest(run_config=...) inside a @sensor.

Since the reconciliation sensor is initialized using build_asset_reconciliation_sensor, we may be able to extend that builder with a run_config/config argument:

@experimental
def build_asset_reconciliation_sensor(
    ...
    run_tags: Optional[Mapping[str, str]] = None,
+   run_config: Optional[Union[ConfigMapping, Mapping[str, Any], "PartitionedConfig[object]"]] = None,
) -> SensorDefinition:
    ...

    def _sensor(context):
        ...
        run_requests, updated_cursor = reconcile(
            repository_def=context.repository_def,
            asset_selection=asset_selection,
            instance=context.instance,
            cursor=cursor,
            run_tags=run_tags,
+           run_config=run_config,
        )
    ...

def reconcile(
    ...
    run_tags: Optional[Mapping[str, str]],
+   run_config: Optional[Union[ConfigMapping, Mapping[str, Any], "PartitionedConfig[object]"]] = None,
):
    ...

    run_requests = build_run_requests(
        asset_partitions_to_reconcile | asset_partitions_to_reconcile_for_freshness,
        asset_graph,
        run_tags,
+       run_config
    )

    ...

def build_run_requests(
    ...
    run_tags: Optional[Mapping[str, str]],
+   run_config: Optional[Union[ConfigMapping, Mapping[str, Any], "PartitionedConfig[object]"]] = None,
) -> Sequence[RunRequest]:
    ...

    run_requests = []

    for (
        partitions_def,
        partition_key,
    ), asset_keys in assets_to_reconcile_by_partitions_def_partition_key.items():
        ...

+       # Add logic to select the subset of `run_config` for `asset_keys`, yielding `config_by_asset_keys`

        run_requests.append(
            RunRequest(
                asset_selection=list(asset_keys),
                tags=tags,
+               run_config=config_by_asset_keys
            )
        )

    return run_requests
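
The placeholder comment in build_run_requests could be filled in along these lines. This is only a sketch under several assumptions: run_config is a plain dictionary (ConfigMapping and PartitionedConfig are not handled), per-asset config lives under "ops" keyed by op name as in the sample config from the error above, and the caller has already mapped asset keys to op names. The helper name subset_run_config and the asset c are hypothetical.

```python
from typing import Any, Dict, Iterable, Mapping


def subset_run_config(
    run_config: Mapping[str, Any],
    op_names: Iterable[str],
) -> Dict[str, Any]:
    """Return a copy of run_config whose "ops" section only covers op_names.

    Hypothetical helper for illustration only; not a Dagster API.
    """
    selected = set(op_names)
    ops_config = run_config.get("ops", {})

    # Keep every other top-level section (e.g. "resources") untouched.
    subset = {key: value for key, value in run_config.items() if key != "ops"}

    # Keep only the op entries that belong to the assets in this run request.
    kept_ops = {name: cfg for name, cfg in ops_config.items() if name in selected}
    if kept_ops:
        subset["ops"] = kept_ops
    return subset


# `c` is a made-up second asset, used only to show the filtering.
full_config = {
    "ops": {
        "b": {"config": {"input_b": True}},
        "c": {"config": {"input_c": 3}},
    }
}
config_for_b = subset_run_config(full_config, ["b"])
```

Here config_for_b contains only the entry for b, so a run request that materializes b alone would not carry config for assets outside its selection.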

Additional information

Looking forward to hearing feedback on this proposal. If it's not too complicated to implement, I'd like to take a stab at this, with some guidance of course 😊

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

sryza commented 1 year ago

Hi @christeefy - do you only want this config to be applied when the run is triggered from the reconciliation sensor? Or would you want it to be applied any time the asset is materialized (e.g. from the UI)?

If the former, I'm curious why?