Miksus / rocketry

Modern scheduling library for Python
https://rocketry.readthedocs.io
MIT License

Sub-second Timedelta? #89

Open ferventgeek opened 2 years ago

ferventgeek commented 2 years ago

Great project. I just switched an app over from Schedule.

One question, and I may be asking Rocketry to do something not in its nature. Is there an option to use Timedeltas lower than one second? ms and milliseconds are in the source and parsing them doesn't seem to explode Task assignment. However, they don't fire as often as expected.

In the example below I've fiddled with cycle_sleep down to 0.005 and None, but still only get 2-3 per second.

Is this supported? No problem if not, I'll have my tasks do it internally.

from datetime import datetime

from rocketry import Rocketry

app = Rocketry(config={"cycle_sleep": None})

@app.task("every 100 ms")
def do_things():
    print(str(datetime.now()) + " hello")

if __name__ == "__main__":
    app.run()

Output:

2022-08-31 15:16:12.324561 hello
2022-08-31 15:16:12.713914 hello
2022-08-31 15:16:13.102267 hello
2022-08-31 15:16:13.488618 hello
2022-08-31 15:16:13.874968 hello
2022-08-31 15:16:14.261318 hello
2022-08-31 15:16:14.647670 hello
2022-08-31 15:16:15.035021 hello
2022-08-31 15:16:15.422372 hello
2022-08-31 15:16:15.812726 hello
2022-08-31 15:16:16.207084 hello
2022-08-31 15:16:16.599441 hello
2022-08-31 15:16:16.993798 hello

Miksus commented 2 years ago

Thanks! It's always nice to hear people like it!

This should actually be supported (depending on the complexity of your project). I have seen Rocketry run fast enough that I found a weird time-traveller bug in core Python caused by the difference in how datetime and time measure time.

The reason the above seems rather slow is that, by default, execution="process". I'm planning to make async the default at some point, as it's not really intuitive that tasks are launched in separate processes by default.

I also ran some tests with other execution types:

from rocketry import Rocketry
from rocketry.conds import every, scheduler_running
import statistics

app = Rocketry(config={"cycle_sleep": None, "task_execution": "main", "shut_cond": scheduler_running(more_than="10 second")})

@app.task(every("10 ms"))
def do_things():
    ...

if __name__ == "__main__":
    app.run()
    recs = app.session.get_repo().filter_by(action="run").all()
    cycles = [val.created - prev_val.created for prev_val, val in zip(recs, recs[1:])]
    print(f"""
    Min: {min(cycles) * 1000} (ms)
    Mean: {statistics.mean(cycles) * 1000} (ms)
    Max: {max(cycles) * 1000} (ms)
    """)

This prints:

    Min: 12.33816146850586 (ms)
    Mean: 15.461485452327198 (ms)
    Max: 32.38940238952637 (ms)

If I change it to task_execution="process":

    Min: 625.8969306945801 (ms)
    Mean: 718.9911695627065 (ms)
    Max: 978.1100749969482 (ms)

If I change it to task_execution="thread":

    Min: 9.890079498291016 (ms)
    Mean: 10.700346826230687 (ms)
    Max: 25.794506072998047 (ms)

Process execution is indeed a lot slower, as creating subprocesses is expensive, especially on Windows. Also, complex scheduling conditions can decrease performance. Note also that the accuracy depends on the OS. Interestingly, async does not work if cycle_sleep=None, need to investigate that.

I have been thinking of doing some proper load testing at some point to test the system under different loads, but I think most of the design decisions are reasonable in terms of performance (considering this is 100% Python). Was this the answer you were looking for, or did I go off-track?

Miksus commented 2 years ago

Replying to myself:

Interestingly, async does not work if cycle_sleep=None, need to investigate that.

Yeah... of course. If that's None, the sleep is skipped altogether and the loop never releases execution. That's actually a bug; it should probably call asyncio.sleep(0) to release the async execution.
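To illustrate why skipping the sleep starves other coroutines, here is a minimal sketch in plain asyncio. It is not Rocketry's actual scheduler loop; the names scheduler_loop and background_task are made up for the illustration. A loop that never awaits does not hand control back to the event loop, while awaiting asyncio.sleep(0) yields once per cycle without adding a real delay.

```python
import asyncio


async def background_task(log):
    # Stands in for an async task that the scheduler has already launched.
    for i in range(3):
        log.append(f"task step {i}")
        await asyncio.sleep(0)


async def scheduler_loop(log, cycles, yield_control):
    # Hypothetical, simplified stand-in for a scheduler's main loop.
    for i in range(cycles):
        log.append(f"cycle {i}")
        if yield_control:
            # Let other coroutines run; no actual waiting takes place.
            await asyncio.sleep(0)


async def run(yield_control):
    log = []
    task = asyncio.create_task(background_task(log))
    await scheduler_loop(log, cycles=3, yield_control=yield_control)
    await task
    return log


if __name__ == "__main__":
    print(asyncio.run(run(yield_control=False)))
    print(asyncio.run(run(yield_control=True)))
```

Without the yield, every "cycle" entry is logged before the background task gets its first step. With the yield, the task steps are interleaved with the cycles.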

ferventgeek commented 2 years ago

Wow, thanks for the quick reply, @Miksus, and with a detailed test! And you're right, it's the isolation invocation cost.

@app.task("every 100 ms", execution="thread")

got

2022-08-31 17:00:46.048734 hello
2022-08-31 17:00:46.152246 hello
2022-08-31 17:00:46.255277 hello
2022-08-31 17:00:46.359004 hello
2022-08-31 17:00:46.463002 hello
2022-08-31 17:00:46.566998 hello
2022-08-31 17:00:46.670016 hello
2022-08-31 17:00:46.774996 hello
2022-08-31 17:00:46.879642 hello
2022-08-31 17:00:46.980733 hello
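
The spacing in the log above can be checked with a few lines of standard-library Python. This is only a sketch for reading the numbers: the timestamps are copied from the output above, and the helper name intervals_ms is made up here.

```python
from datetime import datetime
import statistics

# Timestamps copied from the log above (execution="thread", "every 100 ms").
LOG = """\
2022-08-31 17:00:46.048734 hello
2022-08-31 17:00:46.152246 hello
2022-08-31 17:00:46.255277 hello
2022-08-31 17:00:46.359004 hello
2022-08-31 17:00:46.463002 hello
2022-08-31 17:00:46.566998 hello
2022-08-31 17:00:46.670016 hello
2022-08-31 17:00:46.774996 hello
2022-08-31 17:00:46.879642 hello
2022-08-31 17:00:46.980733 hello
"""


def intervals_ms(log_text):
    """Return the gaps between consecutive log lines in milliseconds."""
    stamps = [
        # Drop the trailing " hello" and parse the remaining timestamp.
        datetime.strptime(line.rsplit(" ", 1)[0], "%Y-%m-%d %H:%M:%S.%f")
        for line in log_text.splitlines()
        if line.strip()
    ]
    return [(b - a).total_seconds() * 1000 for a, b in zip(stamps, stamps[1:])]


gaps = intervals_ms(LOG)
print(f"min={min(gaps):.1f} ms  mean={statistics.mean(gaps):.1f} ms  max={max(gaps):.1f} ms")
```

For these ten lines the gaps fall between roughly 101 ms and 105 ms, with a mean of about 103.6 ms, so each run lands a few milliseconds after the 100 ms target.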

And using that approach I can decide if a process is important enough to invest in extra nerfing for the main process.

The only thing I miss is a random function, which Schedule did have. I thought about adding a per-invocation argument similar to this, for once every 500 ms to 2 sec.

# idea only: probability is not an existing task argument
@app.task("every 500 ms", probability=0.25, execution="thread")
def do_things(prob: float):
    # do probable thing, probability
    ...

but Arg and FuncArg seem to be global, so I'd need

app.params(prob_25=0.25)
app.params(prob_29=0.29)
app.params(prob_80=0.80)
# etc..

I'm thinking I might instead put the task IDs in a dictionary, with values for those that should be random, and then let the tasks look themselves up and check for a value for their instance. But I'll solve my odd edge case.
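
A rough sketch of that dictionary idea in plain Python, with no Rocketry calls involved. The registry TASK_PROBABILITIES, the helper should_fire and the task names are all hypothetical; how a task learns its own name inside Rocketry is left out here.

```python
import random

# Hypothetical registry: task name -> probability of doing work on a given run.
# Tasks that are missing from the mapping always run.
TASK_PROBABILITIES = {
    "poll_sensor": 0.25,
    "refresh_cache": 0.80,
}


def should_fire(task_name, probabilities=TASK_PROBABILITIES, rng=random):
    """Return True if the named task should do its work on this invocation."""
    prob = probabilities.get(task_name)
    if prob is None:
        return True
    return rng.random() < prob


def poll_sensor():
    # In the app this body would sit under @app.task("every 500 ms", ...).
    if not should_fire("poll_sensor"):
        return "skipped"
    return "polled"
```

The scheduler still invokes the task on every cycle; the task simply returns early most of the time, so the randomness lives in the task body and not in the scheduling condition.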

Thanks again. This is a great and well-tended repo. 👍

Miksus commented 2 years ago

Actually, I thought of this today. This is something I tried to implement way back, but it failed then because the library was too immature. There are probably prototypes in the git history somewhere.

Warning: the following is relatively low level stuff in the library.


But I think such randomness is sort of already supported, though there could be a built-in condition for it in the future. The problem is how it is defined. If you say "run this with 50% chance", the question is: when does this evaluation take place? One could argue it takes place all the time, but then the task runs all the time, as the condition is checked every 10 ms or so. Another could argue it's tied to the other conditions of the task: if the task is set to run minutely, the random check is done on a per-minute basis.

But there is a problem: the every condition is a floating period. There is no inherent start time or time to re-evaluate. "10 seconds ago" refers to a different moment once one second has passed. This may sound silly but it actually complicates things. Illustration: if you say minutely, and the time is "2022-08-01 22:06:35", the next time this random check should be re-evaluated is at "2022-08-01 22:07:00". But if you say every 1 minutes and the time is the same, when should the next re-evaluation take place? "2022-08-01 22:07:35"? The problem is that if one second passes, this re-evaluation time moves one second into the future. It's never reached with this philosophy.
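
The difference can be made concrete with plain datetime arithmetic. This is an illustrative sketch only, not how Rocketry computes periods internally; both function names are made up.

```python
from datetime import datetime, timedelta


def next_minutely_check(now):
    """Fixed period: the next re-evaluation is the next full minute."""
    return now.replace(second=0, microsecond=0) + timedelta(minutes=1)


def next_floating_check(now, span=timedelta(minutes=1)):
    """Floating period: the target is always measured from 'now'."""
    return now + span


t0 = datetime(2022, 8, 1, 22, 6, 35)
t1 = t0 + timedelta(seconds=1)

# The fixed boundary stays put as time passes...
print(next_minutely_check(t0), next_minutely_check(t1))
# ...while the floating target moves forward by exactly as much as the clock did.
print(next_floating_check(t0), next_floating_check(t1))
```

Both calls to next_minutely_check give 22:07:00, whereas next_floating_check gives 22:07:35 and then 22:07:36, so the floating target is never caught up with.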

One possible fix is to have another definition of every. I think the current implementation should not be changed, but we could have fixed_every, which is a timedelta fixed to a specific time, for example the start of the scheduler. This way we know when the re-evaluation takes place.
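
As a sketch of what anchoring could mean in practice: given an anchor time (for example the scheduler start) and a timedelta, the start of the current period follows from integer division. fixed_every is only a proposal in this thread, and fixed_period_start is a hypothetical helper name.

```python
from datetime import datetime, timedelta


def fixed_period_start(anchor, span, now):
    """Start of the anchored period of length `span` that contains `now`."""
    elapsed_periods = (now - anchor) // span  # timedelta // timedelta gives an int
    return anchor + elapsed_periods * span


anchor = datetime(2022, 8, 1, 22, 6, 35)   # e.g. when the scheduler started
span = timedelta(minutes=1)

# One second in: still the first period, so the next check is at 22:07:35.
start = fixed_period_start(anchor, span, datetime(2022, 8, 1, 22, 6, 36))
print(start, "->", start + span)

# After the boundary has passed, the period start has stepped forward once.
print(fixed_period_start(anchor, span, datetime(2022, 8, 1, 22, 7, 40)))
```

Because the boundaries no longer slide with the clock, a random check tied to them has a well-defined moment to be re-evaluated.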

A third option would be to hook it into the evaluation of the task's other conditions. However, the problem is that this gives the condition an inherent state (whether the condition is true or false depends on whether it was previously true or false). This is probably not the way to go.


End of the lower level stuff.

But I think the top level could look something like this:

@app.task(minutely & randomly(0.20))
def do_things():
    ...

@app.task(every("1 hour", fixed=datetime.datetime.now()) & randomly(0.20))
def do_things():
    ...

Probably not final, but some ideas.

A quick working example

Back to less deep thoughts. If you used fixed time periods this could be done for example like this:

import datetime
import random as rnd

from rocketry import Rocketry
from rocketry.args.builtin import Task
from rocketry.conds import scheduler_running, minutely
from rocketry.core.condition.base import All, BaseCondition
from rocketry.time import always

app = Rocketry(config={"cycle_sleep": 1.0, "task_execution": "main", "shut_cond": scheduler_running(more_than="5 minutes")})

def get_period(cond):
    period = always
    if isinstance(cond, All):
        for subcond in cond.subconditions:
            if hasattr(subcond, "get_cond"):
                period &= subcond.get_cond().period
    return period

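class Random(BaseCondition):
    def __init__(self, prob):
        self.prob = prob
        # Start of the period in which each task last drew a random number
        self.prev_starts = {}

    def get_state(self, task=Task()):
        # Combine the time periods of the task's other conditions
        cond = get_period(task.start_cond)
        # Start of the period that contains the current time
        period_start = cond.rollback(datetime.datetime.now()).left
        prev_period_start = self.prev_starts.get(task)
        if prev_period_start != period_start:
            # A new period has begun: draw again and keep the result
            # (note that self.state is shared by all tasks using this instance)
            random_num = rnd.random()
            self.state = random_num <= self.prob
            self.prev_starts[task] = period_start
            print(f"Updating: {random_num} <= {self.prob} --> {self.state}")
        return self.state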

@app.task(minutely & Random(0.5))
async def do_things():
    print("Doing", datetime.datetime.now())

if __name__ == "__main__":
    app.run()

This requires quite a lot of explanation of the time period mechanism and how conditions work under the hood, but you can tinker with that example if you wish. It does indeed seem to run the task with 50% probability for every full minute. Also, the example is incomplete and only supports very specific conditions.

What do you think? Most of the above is probably hard to grasp, but in short, I think the condition system should be able to solve the problem. It's pretty versatile.

Miksus commented 2 years ago

Actually, maybe the random condition could work like this:

@app.task(randomly(0.2, period=minutely))
def do_things():
    ...

In this case, it's true with a 20% chance every minute. Note that minutely is a time period and not a condition in this example.

Illustration of how this would work: (image not preserved)

In this case the every (or timespan) could be fixed to the first check of the condition. And perhaps if the period is not given, it's determined from the task so we could support both ways.
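
A minimal sketch of that behaviour in plain Python, assuming the period is a fixed timedelta anchored at the first check. RandomlyPerPeriod is a hypothetical stand-in for the proposed randomly(prob, period=...) and does not use Rocketry's condition classes.

```python
import random
from datetime import datetime, timedelta


class RandomlyPerPeriod:
    """Hypothetical stand-in for randomly(prob, period=...): one draw per period."""

    def __init__(self, prob, span, rng=None):
        self.prob = prob
        self.span = span
        self.rng = rng or random.Random()
        self.anchor = None        # fixed to the first check of the condition
        self.period_index = None  # which period the current draw belongs to
        self.state = False

    def check(self, now):
        if self.anchor is None:
            self.anchor = now
        index = (now - self.anchor) // self.span
        if index != self.period_index:
            # A new period has started: draw again and remember the result.
            self.period_index = index
            self.state = self.rng.random() < self.prob
        return self.state


cond = RandomlyPerPeriod(0.2, timedelta(minutes=1))
start = datetime(2022, 8, 1, 22, 6, 35)
# Repeated checks inside the same minute all return the same answer.
print({cond.check(start + timedelta(seconds=s)) for s in range(60)})
```

Each period logs exactly one yes/no decision, which matches the "one draw per minute/hour/day" reading discussed in the thread.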

ferventgeek commented 2 years ago

Thanks so much. The first option is great in that it's workable without code changes. But the 2nd option is determined at the time of execution assessment, which feels much cleaner. We are in minute X, do we fire? No. OK, see you next min/hour/day. It avoids the need to break timespans into artificial sub-intervals of probability. It's also much cleaner from a debug log perspective, because you're logging one 10% binary result in an existing hourly every instead of ten 6-minute segments.

Great idea!

One other question, and I'm sure I'm missing it. I'd still like to be able to hand tasks initialization values, or at least a non-global object reference. I'm looking at SimpleArg, ex:

@app.task("every 300 ms", execution="thread", item = SimpleArg("Foo"))

But in https://github.com/Miksus/rocketry/blob/b837b8bc8a800e3e97ac6a638b14d89fe11b53ce/rocketry/args/__init__.py it's not pulled in.

from .builtin import Arg, FuncArg, Return, Session, Task, TerminationFlag
from .secret import Private

And in https://github.com/Miksus/rocketry/blob/8fa4aea94b6beb70abe9d99f63bd57fb34390e00/docs/tutorial/intermediate.rst there's no example about how to reference item.

Was that an old version of the codebase? Am I even looking at the right function? Again the goal is to create a task and hand it private initialization data without the need to assign a uniquely named app.params entry for each Task to differentiate the value. Instead, you'd set the value in the Task definition and then access it in the task code. I'm assuming via Task() or Session().

I'm happy to update args/__init__.py to pull it in and update the example for the world's smallest PR.

Again, amazing work here.

ferventgeek commented 2 years ago

Actually, I think I'm doing this wrong..

from rocketry.args.builtin import SimpleArg

Got it imported.

But what I'm hoping to do is assign a value to the underlying method in the Task create. It would then be available (and ideally persistent) to that instance of the task.

from rocketry.args.builtin import SimpleArg

@app.task(item=SimpleArg(unicorn_magic))
def do_things(item: SimpleArg = None):
    print(item)
    item += 1
    # etc..

or

def do_things(this_task=Task()):
    print(this_task.item) ## this_task["item"], this_task.simple_args["item" ...
    this_task.item += 1
    # etc..

app.task('daily', func=do_things, item=SimpleArg(unicorn_magic))

Does that make sense? Basically a types.SimpleNamespace-ish accessible persistent storage container. Instance-level counters, control properties, behavior modifiers, etc. It would allow reuse of do_things() in multiple tasks where do_things() function is controlled by instance properties.

Is there an example of how to reference index? It's not in https://rocketry.readthedocs.io/en/stable/tutorial/intermediate.html#parameterizing. I fiddled with task.get_extra_params() but couldn't crack it. Thx @Miksus

Miksus commented 2 years ago

Thanks for the idea! I think the second option can be implemented first, and then we can consider whether the first is useful and not too confusing.

However, not sure whether this:

@app.task(randomly(0.2, period=minutely))
def do_things():
    ...

should stay true over the whole period once its state has come up true, or be bound to the task run (like daily, which checks that the task has not run today). The former means that the condition could be easily and arbitrarily combined with anything, but the latter is easier to use.


About this:

One other question, and I'm sure I'm missing it. I'd still like to be able to hand tasks initialization values, or at least a non-global object reference. I'm looking at SimpleArg, ex:

I think we could import SimpleArg. The reason it isn't imported is that it's mostly used in testing, and I was not entirely sure how useful it is in general. In most cases, you should be able to pass the value itself as-is. It's actually just an argument wrapper for any Python object.

The answer to your question is that there is an argument called parameters in the task init, so this should work:

@app.task("every 300 ms", execution="thread", parameters={"item": SimpleArg("Foo")})
def do_things(item):
    ...

Or alternatively just:

@app.task("every 300 ms", execution="thread", parameters={"item": "Foo"})
def do_things(item):
    ...

In these cases the argument item will have the value Foo. You can also set it later, after initialization, if you wish (in another task using the Task argument or Session).


And I think I understand what you are after: sort of creating states in the task. At least you could pass a mutable object as the parameter and modify that if you don't use execution="process". Alternatively, if the task itself represents sort of a state (like fetch data) you could just use rocketry.args.Return.
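
A small sketch of the mutable-object idea, kept in plain Python so it runs without a scheduler. The state objects and their label and count fields are made up for the example; in Rocketry each state object would be handed to its task through the parameters argument shown above.

```python
from types import SimpleNamespace


def do_things(item):
    # `item` is a mutable object, so changes survive between calls
    # as long as every call happens in the same process.
    item.count += 1
    return f"{item.label} run #{item.count}"


# One state object per task, so the same function can back several tasks.
fast_state = SimpleNamespace(label="fast", count=0)
slow_state = SimpleNamespace(label="slow", count=0)

for _ in range(3):
    do_things(fast_state)
do_things(slow_state)

print(fast_state.count, slow_state.count)
```

This only holds for in-process execution. With execution="process" the function would work on a copy in the child process, so the counter in the parent would not change.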

Maybe there could be room for this sort of an argument. Maybe a Namespace which materializes to a simple namespace object.


Also, as you seem to be interested in the args/parameter mechanics: those are quite undocumented as well (the library is so vast that writing documentation is a full-time job), but initially it was simple: just pass this dict as parameters to this task (function). But then multiprocessing complicated the issue and I needed a way to "materialize" the arguments. Not everything can be pickled, so I needed a way to "stage" the value of a parameter into a form that can be pickled; then, when the process has started, the argument is "materialized" (turned into the actual value). This is relevant for some use cases:
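
One case where staging matters, as a hedged illustration with the standard library only: a threading.Lock cannot be pickled, but a plain description of it can, and the real object can be built once the data has arrived on the other side. The functions stage_lock, materialize and can_pickle are hypothetical and are not Rocketry's actual argument API.

```python
import pickle
import threading


def stage_lock():
    """Return a picklable description of the value instead of the value itself."""
    return {"kind": "lock"}


def materialize(staged):
    """Turn a staged description into the real object (run inside the worker)."""
    if staged["kind"] == "lock":
        return threading.Lock()
    raise ValueError(f"unknown staged value: {staged!r}")


def can_pickle(obj):
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


# The real value cannot cross the process boundary...
print(can_pickle(threading.Lock()))
# ...but the staged form survives a pickle round trip and can be materialized.
staged = pickle.loads(pickle.dumps(stage_lock()))
lock = materialize(staged)
print(lock.acquire(blocking=False))
```

The round trip through pickle.dumps and pickle.loads stands in for sending the argument to a child process.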


There are also some aspects that I'm not sure fit the current idea of the library. Currently, all of the session parameters are passed to tasks that have their names in the argument signature, and I'm not sure whether that's too flexible. I can show later what I mean.

But it seems docs are indeed slightly off with the SimpleArg. Thanks for spotting!