dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

run_status_sensor target/request_job not working as expected #25813

Open Kat-Alo opened 5 days ago

Kat-Alo commented 5 days ago

What's the issue?

I need a sensor to monitor job_a, which runs on a schedule. When job_a succeeds, I need the sensor to update the dynamic partitions used for job_b and then submit run requests for all partitions (i.e. not just new ones). The code that I would expect to accomplish this looks roughly like:

from dagster import (
    DagsterRunStatus,
    DefaultSensorStatus,
    DynamicPartitionsDefinition,
    RunRequest,
    RunStatusSensorContext,
    SensorResult,
    run_status_sensor,
)

from project.jobs import job_a, job_b

partitions_def = DynamicPartitionsDefinition(name="records")

@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    default_status=DefaultSensorStatus.RUNNING,
    monitored_jobs=[job_a],
    request_job=job_b,
)
def records_success_callback(context: RunStatusSensorContext) -> SensorResult:
    """
    Sensor triggered when job_a succeeds
    """
    # util_that_gets_partitions is the reporter's own helper (not shown)
    all_partitions, partitions_to_delete, partitions_to_add = util_that_gets_partitions(context)
    return SensorResult(
        # Request a job_b run for every partition, not just the new ones
        run_requests=[
            RunRequest(partition_key=partition_key) for partition_key in all_partitions
        ],
        # Sync the "records" dynamic partitions before the runs are launched
        dynamic_partitions_requests=[
            partitions_def.build_add_request(partitions_to_add),
            partitions_def.build_delete_request(partitions_to_delete),
        ],
    )

This code results in an error: dagster._core.errors.DagsterCodeLocationLoadError: Failure loading src.project: dagster._core.errors.DagsterInvalidDefinitionError: Duplicate definition found for unresolved job 'job_b'

It appears that importing job_b and passing it as the request_job value results in Dagster trying to define the job twice.
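One possibility worth ruling out (an assumption, not a confirmed diagnosis): the error says Dagster failed loading `src.project`, while the sensor module imports `from project.jobs import ...`. If the same jobs.py is importable under both module names, Python executes it twice and creates two distinct `job_b` objects that share a name, which could be what Dagster reports as a duplicate definition. The stdlib-only sketch below reproduces only the Python side of that; `job_b = object()` is a hypothetical stand-in for the real job definition, and the temporary `src/project` layout is invented for the demo.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def load_jobs_module_twice() -> tuple[object, object]:
    """Import one jobs.py under two module names and return both job_b objects."""
    # Start from a clean slate so earlier imports cannot mask the effect.
    for name in ("src", "src.project", "src.project.jobs", "project", "project.jobs"):
        sys.modules.pop(name, None)

    # Hypothetical layout: <root>/src/project/jobs.py
    root = Path(tempfile.mkdtemp())
    package = root / "src" / "project"
    package.mkdir(parents=True)
    (root / "src" / "__init__.py").write_text("")
    (package / "__init__.py").write_text("")
    # Hypothetical stand-in for define_asset_job(name="job_b", ...).
    (package / "jobs.py").write_text("job_b = object()\n")

    # With both <root> and <root>/src on sys.path, jobs.py is reachable
    # as "src.project.jobs" and as "project.jobs".
    paths = [str(root), str(root / "src")]
    sys.path[:0] = paths
    importlib.invalidate_caches()
    try:
        via_src = importlib.import_module("src.project.jobs")
        via_project = importlib.import_module("project.jobs")
        return via_src.job_b, via_project.job_b
    finally:
        for path in paths:
            sys.path.remove(path)


first, second = load_jobs_module_twice()
# Same file, two module names: the module body ran twice.
print(first is second)  # prints False
```

If this turns out to be the cause, importing the jobs through one consistent module path everywhere should make `request_job=job_b` refer to the same object that is registered with the code location.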

I then tried to instead define the target job in the RunRequest itself, rather than in the @run_status_sensor decorator. This looked like:

from dagster import (
    DagsterRunStatus,
    DefaultSensorStatus,
    DynamicPartitionsDefinition,
    RunRequest,
    RunStatusSensorContext,
    SensorResult,
    run_status_sensor,
)

from project.jobs import job_a

partitions_def = DynamicPartitionsDefinition(name="records")

@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    default_status=DefaultSensorStatus.RUNNING,
    monitored_jobs=[job_a],
)
def records_success_callback(context: RunStatusSensorContext) -> SensorResult:
    """
    Sensor triggered when job_a succeeds
    """
    # util_that_gets_partitions is the reporter's own helper (not shown)
    all_partitions, partitions_to_delete, partitions_to_add = util_that_gets_partitions(context)
    return SensorResult(
        # Target job_b by name on each RunRequest instead of in the decorator
        run_requests=[
            RunRequest(partition_key=partition_key, job_name="job_b")
            for partition_key in all_partitions
        ],
        dynamic_partitions_requests=[
            partitions_def.build_add_request(partitions_to_add),
            partitions_def.build_delete_request(partitions_to_delete),
        ],
    )

But that gave me this error: Error in sensor records_success_callback: Sensor evaluation function returned a RunRequest for a sensor lacking a specified target (job_name, job, or jobs). Targets can be specified by providing job, jobs, or job_name to the @sensor decorator.

So then I tried passing job or job_name to @run_status_sensor and got an unexpected keyword argument error. After checking the docs, I figured the error message was referring specifically to the @sensor decorator, and that I should go back to using request_job instead.

As a gut check, I pulled the definition of job_b into the sensor.py file like so:

from dagster import (
    AssetSelection,
    DagsterRunStatus,
    DefaultSensorStatus,
    DynamicPartitionsDefinition,
    RunRequest,
    RunStatusSensorContext,
    SensorResult,
    define_asset_job,
    run_status_sensor,
)

from project.jobs import job_a

partitions_def = DynamicPartitionsDefinition(name="records")

# job_b is now defined locally in sensor.py instead of imported from project.jobs
job_b_assets = AssetSelection.assets("asset_a", "asset_b", "asset_c")
job_b = define_asset_job(name="job_b", selection=job_b_assets)

@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    default_status=DefaultSensorStatus.RUNNING,
    monitored_jobs=[job_a],
    request_job=job_b,
)
def records_success_callback(context: RunStatusSensorContext) -> SensorResult:
    """
    Sensor triggered when job_a succeeds
    """
    # util_that_gets_partitions is the reporter's own helper (not shown)
    all_partitions, partitions_to_delete, partitions_to_add = util_that_gets_partitions(context)
    return SensorResult(
        run_requests=[
            RunRequest(partition_key=partition_key) for partition_key in all_partitions
        ],
        dynamic_partitions_requests=[
            partitions_def.build_add_request(partitions_to_add),
            partitions_def.build_delete_request(partitions_to_delete),
        ],
    )

and (very much to my surprise) I no longer got the Duplicate definition error. However, this time the sensor started firing every 30 seconds: even though job_a was not running, a new batch of run requests for all partitions of job_b was submitted every 30 seconds.
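A simplified model may help frame this symptom. It assumes, rather than reproduces, how a run status sensor behaves: on each tick (Dagster evaluates sensors on a minimum interval that defaults to 30 seconds), the sensor looks only at run events newer than its stored cursor and then advances the cursor. In the stdlib-only sketch below, `RunEvent`, `evaluate_tick`, and the integer event ids are hypothetical. If the cursor advances, one job_a success yields one batch of requests; if it never advances, every tick re-requests every partition, which resembles the behavior described above.

```python
from dataclasses import dataclass


# Simplified model, NOT Dagster's implementation: each run event has an
# increasing id, and the cursor stores the highest id already processed.
@dataclass(frozen=True)
class RunEvent:
    event_id: int
    job_name: str
    status: str


def evaluate_tick(events, cursor, monitored_job, partitions):
    """Evaluate one sensor tick; return (partitions to request, new cursor)."""
    requests = []
    new_cursor = cursor
    for event in sorted(events, key=lambda e: e.event_id):
        if event.event_id <= cursor:
            continue  # already handled on an earlier tick
        new_cursor = event.event_id
        if event.job_name == monitored_job and event.status == "SUCCESS":
            requests.extend(partitions)  # all partitions, not just new ones
    return requests, new_cursor


events = [RunEvent(1, "job_a", "SUCCESS")]
partitions = ["p1", "p2"]

# Cursor persisted between ticks: one success produces one batch.
cursor = 0
for tick in range(3):
    requests, cursor = evaluate_tick(events, cursor, "job_a", partitions)
    print("persisted", tick, requests)  # ['p1', 'p2'] on tick 0, then []

# Cursor never advanced: every tick re-requests every partition.
for tick in range(3):
    requests, _ = evaluate_tick(events, 0, "job_a", partitions)
    print("stale", tick, requests)  # ['p1', 'p2'] on every tick
```

Under this model, resetting a stuck cursor would stop the repeated submissions, which is consistent with (though not proof of) the workaround mentioned in the comment below.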

I went to bed, woke up this morning, and thought, "what the heck, let's try using request_jobs=[job_b] in the sensor decorator instead of request_job=job_b". And somehow, that seems to be working. 🎉

I'm still not entirely sure what the heart of this issue is (I'm not even really sure that I'm not just doing something wrong), but I think the problems are:

If you've read this far, tysm for your time and consideration 🙏

What did you expect to happen?

No response

How to reproduce?

No response

Dagster version

1.9.0

Deployment type

None

Deployment details

No response

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization. By submitting this issue, you agree to follow Dagster's Code of Conduct.

leejlFG commented 1 day ago

Using version 1.8.1, I've also struggled with these details. I have run_status_sensors that are structured identically to your first example, and they run fine without a duplicate job definition. I've also struggled with a similar run_status_sensor that launches two distinct jobs when another job completes successfully. After getting the same error as your second example (sensor lacking a specified target) with various permutations of where I passed the job name, I eventually "fixed" it by resetting the sensor and its cursor in the UI.

Not a true "fix," and I never got to the bottom of it, but at least it works. Have you tried resetting the sensor and its cursor in any of your attempts?

This type of sensor is extremely useful and I believe we would all benefit from better documentation and performance.