dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

non-cron schedules based on arbitrary iterators of execution times #9501

Open erinov1 opened 2 years ago

erinov1 commented 2 years ago

What's the use case?

Currently the only publicly supported method for scheduling is based on cron strings. This does not allow one to schedule runs at times of day with different hours/minutes, for example 9:30 and 10:15 every weekday, or to run a "cleanup" job during downtime on a list of business holidays. This can be achieved with sensors (or using two different schedules in the former example), but this isn't conceptually satisfying since the execution times are known in advance.

On the other hand, from what I can tell, the scheduler internals only use croniter to produce an iterable of datetimes. Would it be possible to expose a public-facing option to schedule using an arbitrary iterator of datetimes, using croniter as a special (default) case?

This could also be useful in order to incorporate logic to automatically stop the schedule after a certain execution time by using a finite iterator (for example if the job is processing data for a product that will only exist until a certain known point in time), without having to manually deactivate the schedule.
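
As a rough illustration of the kind of iterator meant here, the "9:30 and 10:15 every weekday" case might look like the sketch below, with an optional cutoff that makes the iterator finite. This is plain Python only: `weekday_times` is a hypothetical helper, not part of Dagster's API, and it assumes naive datetimes and ignores time zones.

```python
from datetime import datetime, time, timedelta
from itertools import islice
from typing import Iterator, Optional, Sequence


def weekday_times(
    start: datetime,
    times_of_day: Sequence[time],
    end: Optional[datetime] = None,
) -> Iterator[datetime]:
    """Yield Monday-Friday execution times strictly after `start`.

    If `end` is given, the iterator is finite: nothing later than `end`
    is yielded and iteration stops.
    """
    ordered = sorted(times_of_day)
    day = start.date()
    while True:
        # Stop once the whole day lies past the cutoff.
        if end is not None and datetime.combine(day, time.min) > end:
            return
        if day.weekday() < 5:  # 0 = Monday ... 4 = Friday
            for t in ordered:
                candidate = datetime.combine(day, t)
                if candidate <= start:
                    continue
                if end is not None and candidate > end:
                    return
                yield candidate
        day += timedelta(days=1)


start = datetime(2022, 8, 26, 9, 45)  # a Friday, after the 9:30 slot
slots = [time(9, 30), time(10, 15)]

# Infinite iterator: take the next three execution times.
next_three = list(islice(weekday_times(start, slots), 3))

# Finite iterator: the schedule ends itself at the cutoff.
until_monday_10am = list(weekday_times(start, slots, end=datetime(2022, 8, 29, 10, 0)))
```

Passing `end` covers the "stop after a known point in time" case without manually deactivating anything.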

Ideas of implementation

I'm not sure if there are issues surrounding serializability of the iterator, but I wonder if this is as easy as changing the ScheduleDefinition interface to optionally accept an iterator instead of a cron string and modifying

https://github.com/dagster-io/dagster/blob/e1adadd33cecf01474ec1c232b5b52b68ab1210a/python_modules/dagster/dagster/_core/host_representation/external.py#L563-L566

to read in the schedule's datetime iterator?

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

sryza commented 2 years ago

I think this would be cool. One difficulty we'd need to navigate in implementing this is that the code linked above runs inside the daemon host process, and user code (i.e. code defined in a Dagster repository) shouldn't run there (both for security reasons and because of how code gets deployed).

erinov1 commented 2 years ago

@sryza thanks!

Airflow 2.2+ allows for arbitrarily complex schedules via Timetables (see also this in-depth how-to). I think it would be great for dagster to have a similar feature set.

As a first step (that would elide issues surrounding custom user code running inside the daemon), what about adding functionality to handle a list of lists of cron strings in "OR of ANDS" form? Given two schedules A, B, the next execution time for A OR B would be the earliest valid time after the current execution for either A or B. The next execution time for A AND B is the earliest valid time after the current execution time for both A and B.

An input of cron strings like

[[str_1, str_2, str_3], [str_4, str_5]]

would be scheduled according to (str_1 AND str_2 AND str_3) OR (str_4 AND str_5), etc. The APScheduler library supports these sorts of schedules via OrTrigger/AndTrigger. Apart from deciding what to do when an intersection is empty, this doesn't sound too difficult to implement in https://github.com/dagster-io/dagster/blob/728ef18c5f1d8da21da7c9a4d933422358011227/python_modules/dagster/dagster/_utils/schedules.py#L20-L22 and then allow ScheduleDefinition's cron_schedule to accept a list of lists.
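
To make the OR/AND semantics concrete, here is a minimal sketch that treats each schedule as an ascending stream of datetimes rather than a cron string. All names (`union_times`, `intersect_times`, `or_of_ands`, `every`) are hypothetical; this is neither APScheduler's nor Dagster's implementation, and `max_steps` is just one crude answer to the empty-intersection question.

```python
import heapq
from datetime import datetime, timedelta
from functools import reduce
from itertools import count, islice
from typing import Iterable, Iterator, Sequence


def union_times(*schedules: Iterable[datetime]) -> Iterator[datetime]:
    """A OR B: lazily merge ascending streams, dropping duplicate ticks."""
    previous = None
    for tick in heapq.merge(*schedules):
        if tick != previous:
            yield tick
            previous = tick


def intersect_times(
    a: Iterable[datetime], b: Iterable[datetime], max_steps: int = 100_000
) -> Iterator[datetime]:
    """A AND B: ticks present in both ascending streams.

    Gives up after `max_steps` comparisons in total, so two infinite
    schedules with an empty intersection cannot loop forever.
    """
    it_a, it_b = iter(a), iter(b)
    try:
        tick_a, tick_b = next(it_a), next(it_b)
        for _ in range(max_steps):
            if tick_a == tick_b:
                yield tick_a
                tick_a, tick_b = next(it_a), next(it_b)
            elif tick_a < tick_b:
                tick_a = next(it_a)
            else:
                tick_b = next(it_b)
    except StopIteration:
        return  # one of the streams is exhausted


def or_of_ands(groups: Sequence[Sequence[Iterable[datetime]]]) -> Iterator[datetime]:
    """[[a, b, c], [d, e]] -> (a AND b AND c) OR (d AND e)."""
    return union_times(*(reduce(intersect_times, group) for group in groups))


def every(minutes: int, start: datetime) -> Iterator[datetime]:
    """Stand-in for a cron schedule: a tick every `minutes`, beginning at `start`."""
    return (start + timedelta(minutes=minutes * i) for i in count())


t0 = datetime(2022, 1, 1)
either = list(islice(union_times(every(20, t0), every(30, t0)), 5))
both = list(islice(intersect_times(every(20, t0), every(30, t0)), 3))
combined = list(islice(or_of_ands([[every(20, t0), every(30, t0)], [every(45, t0)]]), 4))
```

Here all streams start from the same instant; a scheduler would instead ask each one for its next tick after the current execution time, but the merge logic is the same.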

(I am happy to take a stab at it if this seems simple enough to implement).

erinov1 commented 2 years ago

Actually, while I think that forming new schedules from intersections of predefined ones might be useful from a code-recycling point of view, this may not be so useful at the level of actual cron strings. Just being able to form a schedule from the union of a list of cron strings (without support for intersections) is probably enough. This is simpler since one doesn't have to worry about empty intersections, and the interface is simpler too.

sryza commented 2 years ago

@erinov1 - interesting idea. Do you have an example in mind of a cron union that you'd want to use?

erinov1 commented 2 years ago

Yes, I have two examples, neither of which can be encapsulated into a single cron string as far as I know:

  1. Run a job every 15 minutes from 8:30AM - 11:45AM and 1:30PM - 5:00PM M-F, and then once at 5:00PM on Sunday.
  2. Run a job every 30 minutes from 5:00PM on Sunday to 5:00PM on Friday.

There are many ways to set this up with the existing scheduler (basically issuing many SkipReasons), but it is a bit distracting to log/display extraneous ticks, especially when operating on a tight 15 minute cadence. Perhaps another approach is to allow filtering out certain ticks before they ever reach the ScheduleEvaluationContext.

sryza commented 2 years ago

How would you express "Run a job every 30 minutes from 5:00PM on Sunday to 5:00PM on Friday." with a cron expression union?

erinov1 commented 2 years ago

It's hideous, but it would be the union of:

[
    "*/30 17-23 * * SUN", # Sunday 17:00, ..., 23:30 
    "*/30 * * * MON-THU", # Monday-Thursday 00:00, ..., 23:30
    "*/30 0-16 * * FRI", # Friday 00:00, ..., 16:30
    "0 17 * * FRI", # Friday at 17:00
]
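
A quick sanity check of that union, assuming a deliberately minimal matcher that understands only the syntax used above (`*`, `*/n`, ranges, lists, three-letter day names). `matches` and `_expand` are hypothetical helpers written for this check; they are not croniter and not how Dagster evaluates cron strings.

```python
from datetime import datetime, timedelta
from functools import lru_cache

DAY_NAMES = {"SUN": 0, "MON": 1, "TUE": 2, "WED": 3, "THU": 4, "FRI": 5, "SAT": 6}

CRON_UNION = [
    "*/30 17-23 * * SUN",  # Sunday 17:00, ..., 23:30
    "*/30 * * * MON-THU",  # Monday-Thursday 00:00, ..., 23:30
    "*/30 0-16 * * FRI",   # Friday 00:00, ..., 16:30
    "0 17 * * FRI",        # Friday at 17:00
]


@lru_cache(maxsize=None)
def _expand(field: str, low: int, high: int) -> frozenset:
    """Expand one cron field into the set of integers it allows."""
    values = set()
    for part in field.split(","):
        base, _, step = part.partition("/")
        if base == "*":
            first, last = low, high
        elif "-" in base:
            first, last = (int(DAY_NAMES.get(x, x)) for x in base.split("-"))
        else:
            first = last = int(DAY_NAMES.get(base, base))
        values.update(range(first, last + 1, int(step or 1)))
    return frozenset(values)


def matches(expr: str, dt: datetime) -> bool:
    """True if `dt` (to the minute) satisfies the five-field cron expression.

    Simplification: all five fields are ANDed together. Real cron treats
    day-of-month and day-of-week specially when both are restricted, which
    does not arise here because day-of-month is always "*".
    """
    minute, hour, dom, month, dow = expr.split()
    cron_dow = (dt.weekday() + 1) % 7  # Python: Monday=0; cron: Sunday=0
    return (
        dt.minute in _expand(minute, 0, 59)
        and dt.hour in _expand(hour, 0, 23)
        and dt.day in _expand(dom, 1, 31)
        and dt.month in _expand(month, 1, 12)
        and cron_dow in _expand(dow, 0, 6)
    )


# Walk one full week minute by minute, starting on a Sunday at midnight.
week_start = datetime(2022, 8, 28)
minutes_in_week = (week_start + timedelta(minutes=i) for i in range(7 * 24 * 60))
actual = [dt for dt in minutes_in_week if any(matches(e, dt) for e in CRON_UNION)]

# Every 30 minutes from Sunday 17:00 to Friday 17:00 inclusive: 5 days * 48 + 1 ticks.
first_tick = datetime(2022, 8, 28, 17, 0)
expected = [first_tick + timedelta(minutes=30 * i) for i in range(241)]
```

Under these assumptions `actual` and `expected` agree, i.e. the four expressions cover the window with no gaps and no extra ticks.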

sryza commented 2 years ago

Got it - that's not so bad. Yes, I think this would be cool to add - if you'd be up for implementing it, I'd be happy to review.

erinov1 commented 2 years ago

Thanks, I'll give it a shot!