dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Impossible to use partitioned `@asset` in partitioned `@op`'s `@job` #21130

Open louis-jaris opened 5 months ago

louis-jaris commented 5 months ago

Dagster version

dagster, version 1.7.0

What's the issue?

With the default Dagster instance configuration, it is impossible to consistently use an @asset with static partitions (of size 1, 2, 3, or more) as the input of an @op inside a @job that is partitioned the same way (cf. the fully working reproduction example below).

Dagster does not inject the matching partition of the asset into the partitioned job. Instead, it injects the merged asset, a dict of the form {"partition_key_1": partition_value_1, "partition_key_2": partition_value_2, "partition_key_3": partition_value_3, ...}. Worse, if the partitions definition contains only one partition, the issue disappears and the system behaves as if it were not partitioned (the Dagster type checking then succeeds; cf. the code and stack trace below).
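The two input shapes described above can be modelled in a few lines of plain Python. This is a hypothetical sketch of the reported behavior only, not Dagster's implementation (`load_asset_input` and the `stored` mapping are illustrative names): when one partition is requested the op gets the bare stored value, and when several are requested it gets a dict keyed by partition key.

```python
from typing import Any


def load_asset_input(stored: dict[str, Any], partition_keys: list[str]) -> Any:
    """Hypothetical model of how the reported op input is shaped.

    `stored` maps each materialized partition key to its value.
    """
    if len(partition_keys) == 1:
        # A single partition is handed over as the bare value (e.g. a list[dict]).
        return stored[partition_keys[0]]
    # Several partitions are merged into {partition_key: value, ...}.
    return {key: stored[key] for key in partition_keys}


stored = {
    "COMPANY-1": [{"company": "COMPANY-1", "index": 1}],
    "COMPANY-2": [{"company": "COMPANY-2", "index": 1}],
}

# Every partition requested: the op receives the merged dict.
print(load_asset_input(stored, ["COMPANY-1", "COMPANY-2"]))
# Only one partition requested: the op receives the list itself.
print(load_asset_input(stored, ["COMPANY-1"]))
```

This is why an op annotated with `list[dict]` works for a one-partition definition but not for larger ones.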

What did you expect to happen?

I expected, as the documentation describes (cf. https://docs.dagster.io/concepts/ops-jobs-graphs/graphs#loading-an-asset-as-an-input), to be able to seamlessly use a partitioned @asset as the input of a partitioned @job and @op.

In other words, the following code, adapted from the documentation, cannot be made to work:

from dagster import StaticPartitionsDefinition, asset, job, op

# Hypothetical partition keys: the original snippet does not define partner_partition
partner_partition = StaticPartitionsDefinition(["PARTNER-1", "PARTNER-2"])

@asset(
    partitions_def=partner_partition,
)
def emails_to_send() -> list[dict]: ...

# BUG: dagster._check.CheckError: Failure condition: Loading an input that corresponds to multiple partitions, but the type annotation on the op input is not a dict, Dict, Mapping, or Any: is 'typing.List[dict]'.
@op
def send_emails(emails: list[dict]) -> None: ...

@job(
    partitions_def=partner_partition,
)
def send_emails_job():
    send_emails(emails_to_send.to_source_asset())
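The CheckError quoted in the snippet above states which annotations are accepted when an op input spans several partitions. The following is an approximation of that rule reconstructed from the error message alone, not from Dagster's source; `accepts_multi_partition_input` is a hypothetical name. It shows why `list[dict]` is rejected while `dict[str, list[dict]]`, `Mapping` and `Any` pass.

```python
from collections.abc import Mapping as AbcMapping
from typing import Any, Dict, List, Mapping, get_origin


def accepts_multi_partition_input(annotation: Any) -> bool:
    """Approximate the rule from the error message: dict, Dict, Mapping or Any."""
    if annotation is Any:
        return True
    # get_origin(dict[str, int]) is dict, get_origin(typing.Mapping[str, int]) is
    # collections.abc.Mapping; bare classes such as dict have no origin.
    origin = get_origin(annotation) or annotation
    return origin in (dict, AbcMapping)


for annotation in (list[dict], List[dict], dict[str, list[dict]], Dict, Mapping[str, Any], Any):
    print(annotation, accepts_multi_partition_input(annotation))
```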

How to reproduce?

Create a file called bug_definition.py containing the code below:

# Business definitions
import requests
from dagster import (
    Definitions,
    AssetExecutionContext, asset, StaticPartitionsDefinition, Output, define_asset_job, AssetSelection, schedule, DefaultScheduleStatus, RunRequest, job,
    OpExecutionContext, op, asset_sensor, DefaultSensorStatus, SensorEvaluationContext, EventLogEntry, SkipReason,
)

company_partition = StaticPartitionsDefinition(["COMPANY-1", "COMPANY-2", "COMPANY-3"])

MY_ASSET_GROUP = "my_pipeline_staying_in_sync"

DUMMY_STATIC_CONFIG = {
    "COMPANY-1": "123",
    "COMPANY-2": "456",
    "COMPANY-3": "789",
}

###############################################################################
# Extract and Transform flow

# XXXXXXXX
# BUG NOTE: please note the type `Output[list[dict]]`
@asset(
    partitions_def=company_partition,
    group_name=MY_ASSET_GROUP,
)
def marketing_info_for_customer_of_company(context: AssetExecutionContext) -> Output[list[dict]]:
    partition_key = context.partition_key
    # company name == partition_key
    context.log.info(f"Simple asset body for {partition_key=}")
    # The partition key is included in the result only to show that the payload depends on the company
    res = [{"company": partition_key, "index": 1, "email": "a@example.com"}, {"company": partition_key, "index": 2, "email": "b@example.com"}]
    return Output(res, metadata={"row_count": 2})

refresh_marketing_job_per_company = define_asset_job(
    "refresh_marketing_job_per_company",
    selection=AssetSelection.groups(MY_ASSET_GROUP),
    description="Extract and Transform flow preparing marketing data for each company",
    partitions_def=company_partition,
)

@schedule(
    job=refresh_marketing_job_per_company,
    cron_schedule="*/1 * * * *",  # debug: every minute
    default_status=DefaultScheduleStatus.RUNNING,
)
def marketing_preparation_schedule(context) -> list[RunRequest]:
    partition_keys = company_partition.get_partition_keys()
    return [
        RunRequest(
            partition_key=partition_key,
            run_key=f"{context.scheduled_execution_time}_{partition_key}",
        )
        for partition_key in partition_keys
    ]

###############################################################################
# Load flow

# XXXXXXXX
# BUG NOTE: please note the input type `list[dict]`
@op
def export_to_marketing_system(
        context: OpExecutionContext,
        contacts_to_export: list[dict],
        # Workaround: all_contacts_to_export: dict[str, list[dict]],
) -> Output[dict]:
    # Workaround: for company_name, contacts_to_export in all_contacts_to_export.items():
    #             This only works if the partitions definition has strictly more than 1 partition
    company_name = context.partition_key
    context.log.info(f"Exporting {len(contacts_to_export)} contacts to marketing system for {company_name=}")
    try: 
        res = requests.post(
            "https://marketing.example.com/v1/contacts",
            json=contacts_to_export,
            params=DUMMY_STATIC_CONFIG[company_name],
            timeout=1,
        )
    except Exception as e:
        # Dummy error handling for the sake of the example
        context.log.info("Unable to contact the marketing system")
        return Output({"error": str(e)})
    return Output(dict(res.request.headers))

@job(partitions_def=company_partition)
def load_to_marketing_tool():
    """Export the owners with active pre-offer as SendGrid contacts"""
    partitioned_asset = marketing_info_for_customer_of_company.to_source_asset()
    # BUG NOTE: this is where the bug shows up: the injected input is
    # of type `dict[str, list[dict]]` instead of `list[dict]`
    # https://docs.dagster.io/concepts/ops-jobs-graphs/graphs#loading-an-asset-as-an-input
    export_to_marketing_system(partitioned_asset)

@asset_sensor(
    asset_key=marketing_info_for_customer_of_company.key,
    job=load_to_marketing_tool,
    default_status=DefaultSensorStatus.RUNNING,
)
def observe_for_changes_in_owners_for_marketing(
    context: SensorEvaluationContext, asset_event: EventLogEntry
):
    """Reacts to materialization of the selected asset, i.e. reacts to mutations to the selected asset"""
    assert asset_event.dagster_event and asset_event.dagster_event.asset_key
    if not asset_event.asset_materialization:
        # This function is a generator, so the SkipReason must be yielded:
        # a value returned from a generator is not part of what it yields
        yield SkipReason(
            f"No mutation detected for asset_key={asset_event.dagster_event.asset_key}"
        )
        return
    # TODO: only trigger the partitions whose asset was materialized (/mutated)
    for partition in marketing_info_for_customer_of_company.partitions_def.get_partition_keys():
        yield RunRequest(
            run_key=f"{context.cursor}|{partition}",
            partition_key=partition,
        )

###############################################################################
# Definition for dagster

defs = Definitions(
    assets=[marketing_info_for_customer_of_company],
    jobs=[refresh_marketing_job_per_company, load_to_marketing_tool],
    schedules=[marketing_preparation_schedule],
    sensors=[observe_for_changes_in_owners_for_marketing],
)

The code contains comments around the bug; search for the strings BUG NOTE and XXX.
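The workaround comments in export_to_marketing_system only handle the multi-partition shape. A minimal sketch, assuming the op input is annotated `Any` (one of the annotations the error message accepts) so that both shapes pass the check: the hypothetical helper `contacts_for_partition` returns the current partition's rows whether the op received the merged dict or the bare list. It has not been verified against Dagster, and in the multi-partition case every partition is still loaded from storage.

```python
from typing import Any


def contacts_for_partition(loaded: Any, partition_key: str) -> list[dict]:
    """Hypothetical helper: return the run's own rows from either input shape.

    With several partitions the op receives {partition_key: list[dict], ...};
    with a single partition it receives the list[dict] directly.
    """
    if isinstance(loaded, dict):
        # Several partitions were loaded: keep only the run's own partition.
        return loaded[partition_key]
    # A single partition was loaded: the value already is the list[dict].
    return loaded


rows = [{"company": "COMPANY-2", "index": 1, "email": "a@example.com"}]
merged = {"COMPANY-1": [], "COMPANY-2": rows, "COMPANY-3": []}
print(contacts_for_partition(merged, "COMPANY-2") == rows)
print(contacts_for_partition(rows, "COMPANY-2") == rows)
```

Inside the op, it would be called as `contacts_for_partition(contacts_to_export, context.partition_key)`.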

Then start Dagster as follows:

python3.11 -m venv .venv && source .venv/bin/activate && pip install --upgrade pip && pip install dagster
dagster dev -f bug_definition.py

Wait one or two minutes for the schedule, and then the sensor, to kick in. Error logs with the following stack trace should appear in the terminal (and in the UI):

2024-04-10 19:35:40 +0200 - dagster - DEBUG - load_to_marketing_tool - c00c2844-ef4c-4f60-9c42-ef5234968990 - 32674 - export_to_marketing_system - STEP_WORKER_STARTED - Executing step "export_to_marketing_system" in subprocess.
2024-04-10 19:35:40 +0200 - dagster - DEBUG - load_to_marketing_tool - c00c2844-ef4c-4f60-9c42-ef5234968990 - 32674 - export_to_marketing_system - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
2024-04-10 19:35:40 +0200 - dagster - DEBUG - load_to_marketing_tool - c00c2844-ef4c-4f60-9c42-ef5234968990 - 32674 - export_to_marketing_system - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
2024-04-10 19:35:40 +0200 - dagster - DEBUG - load_to_marketing_tool - c00c2844-ef4c-4f60-9c42-ef5234968990 - 32674 - LOGS_CAPTURED - Started capturing logs in process (pid: 32674).
2024-04-10 19:35:40 +0200 - dagster - DEBUG - load_to_marketing_tool - c00c2844-ef4c-4f60-9c42-ef5234968990 - 32674 - export_to_marketing_system - STEP_START - Started execution of step "export_to_marketing_system".
2024-04-10 19:35:40 +0200 - dagster - ERROR - load_to_marketing_tool - c00c2844-ef4c-4f60-9c42-ef5234968990 - 32674 - export_to_marketing_system - STEP_FAILURE - Execution of step "export_to_marketing_system" failed.

dagster._core.errors.DagsterExecutionLoadInputError: Error occurred while loading input "contacts_to_export" of step "export_to_marketing_system"::

dagster._check.CheckError: Failure condition: Loading an input that corresponds to multiple partitions, but the type annotation on the op input is not a dict, Dict, Mapping, or Any: is 'typing.List[dict]'.

Stack Trace:
  File "/Users/louis/DataPlatformGenericWorkflow/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/Users/louis/DataPlatformGenericWorkflow/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/inputs.py", line 846, in _load_input_with_input_manager
    value = input_manager.load_input(context)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/louis/DataPlatformGenericWorkflow/.venv/lib/python3.11/site-packages/dagster/_core/storage/upath_io_manager.py", line 420, in load_input
    check.failed(
  File "/Users/louis/jaris/DataPlatformGenericWorkflow/.venv/lib/python3.11/site-packages/dagster/_check/__init__.py", line 1606, in failed
    raise CheckError(f"Failure condition: {desc}")

Deployment type

None

Deployment details

Local development only: no workspace.yaml and no dagster.yaml. Fresh install with no additional dependencies (cf. How to reproduce?). Using dagster dev only.

Additional information

I tried to reproduce this issue with a unit test, but I wasn't able to properly test the @job (by passing it a materialized partitioned asset), which is why I can only provide this script to run.


louis-jaris commented 5 months ago

(I am not sure, but this might be linked to https://github.com/dagster-io/dagster/issues/13357, which is still open, although the documentation seems to indicate that this is possible.)

louis-jaris commented 4 months ago

This is still present as of today, in 1.7.5.