Miksus / rocketry

Modern scheduling library for Python
https://rocketry.readthedocs.io
MIT License
3.25k stars 105 forks source link

ENH: Task Multilaunch #126

Closed Miksus closed 1 year ago

Miksus commented 1 year ago

This PR adds multilaunch:

@app.task(multilaunch=True, execution="process")
def do_things():
    ...

The above task can be run even if it was already running thus enabling a task to run parallel against itself. If multilaunch=False, the task won't be launched if it is already running.

codecov-commenter commented 1 year ago

Codecov Report

Base: 94.72% // Head: 94.90% // Increases project coverage by +0.17% :tada:

Coverage data is based on head (4cb5132) compared to base (6ee2d03). Patch coverage: 97.58% of modified lines in pull request are covered.

Additional details and impacted files ```diff @@ Coverage Diff @@ ## master #126 +/- ## ========================================== + Coverage 94.72% 94.90% +0.17% ========================================== Files 79 80 +1 Lines 4323 4453 +130 ========================================== + Hits 4095 4226 +131 + Misses 228 227 -1 ``` | [Impacted Files](https://codecov.io/gh/Miksus/rocketry/pull/126?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Mikael+Koli) | Coverage Δ | | |---|---|---| | [rocketry/core/task.py](https://codecov.io/gh/Miksus/rocketry/pull/126/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Mikael+Koli#diff-cm9ja2V0cnkvY29yZS90YXNrLnB5) | `91.41% <96.55%> (+0.91%)` | :arrow_up: | | [rocketry/conditions/api.py](https://codecov.io/gh/Miksus/rocketry/pull/126/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Mikael+Koli#diff-cm9ja2V0cnkvY29uZGl0aW9ucy9hcGkucHk=) | `98.36% <97.36%> (-0.31%)` | :arrow_down: | | [rocketry/args/builtin.py](https://codecov.io/gh/Miksus/rocketry/pull/126/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Mikael+Koli#diff-cm9ja2V0cnkvYXJncy9idWlsdGluLnB5) | `98.50% <100.00%> (+0.02%)` | :arrow_up: | | [rocketry/conditions/task/task.py](https://codecov.io/gh/Miksus/rocketry/pull/126/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Mikael+Koli#diff-cm9ja2V0cnkvY29uZGl0aW9ucy90YXNrL3Rhc2sucHk=) | `93.87% <100.00%> (+0.93%)` | :arrow_up: | | [rocketry/core/schedule.py](https://codecov.io/gh/Miksus/rocketry/pull/126/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Mikael+Koli#diff-cm9ja2V0cnkvY29yZS9zY2hlZHVsZS5weQ==) | `93.50% <100.00%> (+0.57%)` | :arrow_up: | | [rocketry/log/log\_record.py](https://codecov.io/gh/Miksus/rocketry/pull/126/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Mikael+Koli#diff-cm9ja2V0cnkvbG9nL2xvZ19yZWNvcmQucHk=) | `100.00% <100.00%> (ø)` | | | [rocketry/session.py](https://codecov.io/gh/Miksus/rocketry/pull/126/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Mikael+Koli#diff-cm9ja2V0cnkvc2Vzc2lvbi5weQ==) | `95.71% <100.00%> (+0.04%)` | :arrow_up: | | [rocketry/tasks/run\_id.py](https://codecov.io/gh/Miksus/rocketry/pull/126/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Mikael+Koli#diff-cm9ja2V0cnkvdGFza3MvcnVuX2lkLnB5) | `100.00% <100.00%> (ø)` | | Help us with your feedback. Take ten seconds to tell us [how you rate us](https://about.codecov.io/nps?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Mikael+Koli). Have a feature suggestion? [Share it here.](https://app.codecov.io/gh/feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=Mikael+Koli)

:umbrella: View full report at Codecov.
:loudspeaker: Do you have feedback about the report comment? Let us know in this issue.

goznauk commented 1 year ago

This is what i've been looking for! I've almost started writing issue for this problem. Thank you for wonderful Library :)

BTW Do you have plan for timeout feature or fail-if-same-task-called feature?

just like...

======= ========= ======= created task_name action ======= ========= ======= ----- time to call new task ----- 1 do_things A run 2 do_things A success ----- time to call new task ----- 3 do_things B run ----- time to call new task ----- 4 do_things B failed 5 do_things C run 6 do_things C success ======= ========= =======

Miksus commented 1 year ago

Thanks, and thanks a lot! :) This is sort of a passion project of mine (like it wasn't obvious) thus it's really nice to see how others like it as well.

I'm planning on having some sort of tracking on the runs in the logs. I'll see if it's easy enough to include it in this PR but if not but I'll do it later, possibly before the next update. It seems the runs are not needed to be tracked in the logs for the system to work correctly which is the reason it's not strictly necessary (though very useful).

I implemented the timeout in a way that it should work based on individual runs. So if you specify a timeout of 30 mins and run a task twice, each of those can run 30 mins instead of cutting all of them when the first run has run 30 mins. There is also a possibility to add further conditions based on the runs so we could make conditions that, for example, allow only 5 runs of the same task.

Furhermore, this PR will be sort of the foundation for the event-driven paradigm I'm close to finish. I think that will also be awesome:

@app.event()
def new_item():
    ...
    return datetime.datetime(2022, 10, 12)

@app.task(new_item)
def do_things():
    ...

This task runs when it hasn't run after the last event occurred (the time in the event function). The event function can return a datetime (time when the event occurred last time) or an Event object that has time and value. The value can be used as an argument:

@app.event()
def new_item():
    ...
    return Event(datetime=datetime.datetime(2022, 10, 12), value="a value")

@app.task(new_item)
def do_things(event_value=new_item):
    ...

Went a bit of off-topic as always...

And thank you for the idea!

Miksus commented 1 year ago

By the way, I still decided to include the run ID logging to this PR as well.

Miksus commented 1 year ago

This will also support custom run IDs:

import json
def generate_run_id(task, params=None):
    return json.dumps(dict(params), default=str)

@app.task(multilaunch=True, func_run_id=generate_run_id)
def do_things(report_date):
    ...
created task_name run_id action
1 do_things {"report_date": "2022-01-01"} run
2 do_things {"report_date": "2022-01-02"} run
3 do_things {"report_date": "2022-01-01"} success
4 do_things {"report_date": "2022-01-02"} success

Quite nice if you regularly need to manually run things.

Miksus commented 1 year ago

Also rocketry.conds.running has been refactored to support multilaunch.


Allow only 4 parallel runs:

@app.task(running <= 4, multilaunch=True)
def do_things():
    ...

Allow running only if another task is running at most 4 concurrent runs:

@app.task(running("another_task") <= 4)
def do_things():
    ...

Also this now works (allows only one parallel run):

@app.task(~running)
def do_things():
    ...

And more_than and less_than are slightly updated:

@app.task(end_cond=running.more_than("10 minutes"))
def do_things():
    ...

@app.task(running(do_things).less_than("10 minutes"))
def do_another():
    ...

@app.task(running(do_things).between("10 minutes", "15 minutes"))
def do_else():
    ...