Miksus / rocketry

Modern scheduling library for Python
https://rocketry.readthedocs.io
MIT License
3.23k stars 105 forks source link

BUG - second manual process run freezes the async app.serve() #167

Open egisxxegis opened 1 year ago

egisxxegis commented 1 year ago

Describe the bug When Rocketry works in async fashion through app.serve(), it becomes frozen when specific manual-run task sequence is executed. Task sequence: run_me_madam starts on async run_me_madam finishes run_me_sir starts on process (not async) app.serve() becomes frozen

To Reproduce For reproducing the bug I will paste code at the bottom of issue and will pinpoint commit, after which the aforementioned bug arose.

Expected behavior starting manually a task after one has finished, should not freeze the async Rocketry.serve(). In this case it is expected to see run_me_sir finish.

Screenshots When trying to KeyboardInterrupt the frozen async Rocketry.serve(), such error always appears: image

Desktop (please complete the following information):

Additional context Here I paste the code.

from time import sleep
import asyncio

from rocketry import Rocketry

# #
# Rocketry app
app = Rocketry(
    config={
        "execution": "async",
        "max_process_count": 100,
    }
)

# #
# tasks
@app.task("every 10 seconds")
async def mark_10_seconds():
    print("10 seconds have passed")

@app.task(execution="async")
async def run_me_madam():
    print("Madam, I am going to sleep for 2 seconds")
    sleep(2)
    print("Madam, I am done with sleeping")

@app.task(execution="process")
async def run_me_sir():
    print(f"Sir, I am going to sleep for 2 seconds.")
    sleep(2)
    print("Sir, I am done with sleeping")

if __name__ == "__main__":

    async def task_manual_run_flow():
        await asyncio.sleep(3)
        print(
            "will run run_me_madam task - on async (or even if on process, result would be the same)"
        )
        app.session["run_me_madam"].run()
        await asyncio.sleep(10)
        print("madam task should be done by now")
        print("will run run_me_sir - on process")
        app.session["run_me_sir"].run()
        print("doing await asyncio.sleep(5). If it takes forever, you are stuck here.")
        await asyncio.sleep(5)
        print(
            "launched run me sir through session. If you do not see this message, it means that app.serve() never returned control via await."
        )

    async def main_main():
        return await asyncio.gather(app.serve(), task_manual_run_flow())

    asyncio.run(main_main())
egisxxegis commented 1 year ago

Link to commit that introduced the bug: https://github.com/Miksus/rocketry/commit/c9480a9c847704a0a57b4c6770eeb1028bf029a0

egisxxegis commented 1 year ago

I hope I have collected enough information to be of help 🤝