dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

run_status_sensor behind runs - consider loosening db query limit #14778

Open dagsir[bot] opened 1 year ago

dagsir[bot] commented 1 year ago

Issue from the Dagster Slack

This issue was generated from the slack conversation at: https://dagster.slack.com/archives/C01U954MEER/p1686675793467069?thread_ts=1686675793.467069&cid=C01U954MEER


Conversation excerpt

U01RXC3HL9Z: Hi team, the run_status_sensor seems to behave quite randomly.

  1. We have 4 sensors on QUEUED, STARTED, SUCCESS, and FAILURE. The sensors seem to miss lots of events.
  2. The logged time on the sensor is totally off.

U01RXC3HL9Z: We have more than 700 jobs within the repo that ran successfully in the past 12h. The @run_status_sensor(run_status=DagsterRunStatus.SUCCESS) caught like 50.

U01RXC3HL9Z: For the ones that it caught, the logged time is off. For example, a run was queued at 6am, started at 7:40am, and completed at 7:58am.

However, here is what shows in Dagit.

U01RXC3HL9Z: The log drives me further crazy.. at 4:26am, I got a log for my success sensor, and at 9:15am, I got the log for the started sensor…

2023-06-13 11:26:22 +0000 - dagster.daemon.SensorDaemon - INFO - Completed a reaction request for run 1ce02361-a4f3-4757-a985-1b77b2174e78: Sensor "fab_run_success" acted on run status SUCCESS of run 1ce02361-a4f3-4757-a985-1b77b2174e78.

2023-06-13 16:15:30 +0000 - dagster.daemon.SensorDaemon - INFO - Completed a reaction request for run 1ce02361-a4f3-4757-a985-1b77b2174e78: Sensor "fab_run_started" acted on run status STARTED of run 1ce02361-a4f3-4757-a985-1b77b2174e78.

U01RXC3HL9Z: Example for the missed runs. This job “fabricator_doubledash_store_ranking_sampled_training_instance_v2” succeeded around 3:23am, but it didn’t have any run_status_sensor trigger..

I can see that the run_status_sensor for “succeed” ran at 3:22 and 3:39. It just never caught the run at 3:23am.

U01RXC3HL9Z: Another weird example is that the sensors for QUEUED and SUCCESS caught a run..but the STARTED sensor missed it.

U01RXC3HL9Z: cc <@U016C4E5CP8>..it looks like the run_status_sensor has some really serious problems..wondering if anyone is aware of this on the Dagster side?

U01RXC3HL9Z: I am speculating that this is the problem: https://github.com/dagster-io/dagster/blob/4faf5ea4438434a1953595f09f96313d7e803326/python_modules/dagster/dagster/_core/definitions/run_status_sensor_definition.py#L632

The run_status_sensor fetches the oldest 5 records after the current cursor’s storage_id and update_timestamp. For each record, it gets the run, triggers the run_status_sensor_fn and other logic, and then updates the cursor to that event record’s storage_id and update_timestamp (see the sketch after this excerpt).

Some questions:

  1. Is it possible that event A could have a smaller storage_id but more recent update_timestamp? (i.e. somehow an event was created and then updated afterwards)
  2. Does get_event_records(EventRecordsFilter(), ascending=True, limit=5) return the oldest events that satisfy the filter?
  3. I am curious why the limit is 5.

UULA0R2LV: hi Hebo! yeah i think limit=5 is the problem here. it was kinda set arbitrarily when we built it. this was also going to be my follow up after our last convo (https://dagster.slack.com/archives/C01U954MEER/p1686087923318699)

U01RXC3HL9Z: Thanks Yuhan! Yeah, we run this sensor at a 15m interval, so a limit of 5 will cause a big backlog. However, I don’t fully understand why it’d skip some runs..

UULA0R2LV: <@U018K0G2Y85> issue “run_status_sensor missing runs - consider loosening db query limit”
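The loop being speculated about looks roughly like the following. This is a simplified paraphrase of the logic in run_status_sensor_definition.py, not a verbatim copy; names are abbreviated and error handling is trimmed:

event_records = context.instance.get_event_records(
    EventRecordsFilter(
        after_cursor=RunShardedEventsCursor(
            id=record_id,  # storage_id stored in the previous cursor
            run_updated_after=update_timestamp,
        ),
        event_type=event_type,  # e.g. DagsterEventType.RUN_SUCCESS
    ),
    ascending=True,
    limit=5,  # at most 5 events are processed per sensor tick
)
for event_record in event_records:
    # look up the run the event belongs to, then invoke the user's sensor fn
    run = context.instance.get_run_by_id(event_record.event_log_entry.run_id)
    # ... run_status_sensor_fn(...) and related bookkeeping ...
    # advance the cursor past this event so it is not reprocessed
    context.update_cursor(
        RunStatusSensorCursor(
            record_id=event_record.storage_id,
            update_timestamp=pendulum.from_timestamp(
                event_record.event_log_entry.timestamp
            ).isoformat(),
        ).to_json()
    )

Whatever the exact mechanics of the skipping, the backlog follows directly from the numbers in the thread: at a 15-minute interval, limit=5 caps each status sensor at 5 × 4 = 20 events per hour, or 240 per 12 hours, while the workspace reports 700+ successful runs in the same window.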

Message from the maintainers:

Do you care about this too? Give it a :thumbsup:. We factor engagement into prioritization.

opluskal commented 1 year ago

I have a general question regarding the RunStatusSensor. Would it be possible, instead of

context.instance.get_event_records(
    EventRecordsFilter(
        after_cursor=RunShardedEventsCursor(
            id=record_id,
            run_updated_after=cast(datetime, pendulum.parse(update_timestamp)),
        ),
        event_type=event_type,
    ),
    ascending=True,
    limit=5,
)

to use context.instance.get_run_records()? This way the sensor would be using data from the runs table, and it could be configured with a RunsFilter to query only the relevant jobs. I think the logic afterwards could be polished and made much simpler. Am I missing some significant catch with this approach?
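For concreteness, here is a minimal sketch of what that might look like, assuming a plain @sensor that polls the runs table. The sensor name, the cursor handling, the limit of 50, and the order_by column are illustrative assumptions, not Dagster's built-in run_status_sensor machinery:

import pendulum
from dagster import DagsterRunStatus, RunsFilter, SensorEvaluationContext, sensor


@sensor(minimum_interval_seconds=900)  # 15m interval, as in the thread above
def run_success_watcher(context: SensorEvaluationContext):
    # Resume from the update time of the last processed run, stored in the cursor.
    updated_after = pendulum.parse(context.cursor) if context.cursor else None
    records = context.instance.get_run_records(
        filters=RunsFilter(
            statuses=[DagsterRunStatus.SUCCESS],
            updated_after=updated_after,
            # job_name=... would restrict the query to the relevant jobs only
        ),
        order_by="update_timestamp",
        ascending=True,
        limit=50,  # arbitrary, but well above the 5 discussed above
    )
    for record in records:
        # ... react to record.dagster_run here ...
        context.update_cursor(record.update_timestamp.isoformat())

One catch worth noting: the runs table stores only each run's current status, so a run that passes through STARTED and reaches SUCCESS between two ticks would never match a STARTED query, whereas the event log preserves every transition. That may be part of why the current implementation reads from the event log.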