komuw / wiji

Wiji is an asyncio distributed task processor/queue.
MIT License

validate that code inside `Task.async_run` is not blocking #4

Closed komuw closed 5 years ago

komuw commented 5 years ago

The code inside async_run; https://github.com/komuw/xyzabc/blob/ef3238de831e6b0c857328f8823b05295851bb17/xyzabc/task.py#L68

should actually be async code with no blocking calls.

However, if a user were to write something like

import asyncio

import requests
import xyzabc

class BlockingHttptask(xyzabc.task.Task):
    async def async_run(self):
        # requests.get is synchronous; it blocks the event loop for 23 seconds
        url = "https://httpbin.org/delay/23"
        resp = requests.get(url)

class NonBlockingTask(xyzabc.task.Task):
    async def async_run(self, bbb, a=5.5):
        result = bbb * a
        print("result: ", result)
        return result

blocking_task = BlockingHttptask(the_broker="the_broker", queue_name="HttpQueue")
non_blocking_task = NonBlockingTask(the_broker="the_broker", queue_name="MultiplierTaskQueue")

blocking_task_worker = xyzabc.Worker(the_task=blocking_task)
non_blocking_task_worker = xyzabc.Worker(the_task=non_blocking_task)

if __name__ == "__main__":

    async def async_main():
        # both workers share one event loop
        gather_tasks = asyncio.gather(
            blocking_task_worker.consume_forever(), non_blocking_task_worker.consume_forever()
        )
        await gather_tasks

    asyncio.run(async_main(), debug=True)

The execution of blocking_task will also block execution of non_blocking_task.

This is because Python asyncio runs everything in one thread (in an event loop). It is thus important that users only put non-blocking code inside async_run methods. However, users may sometimes fail to do that. If they put blocking IO (or CPU-bound code) there, we should figure out how to detect it and handle it.

We have already instituted measures to make sure that users do not change the signature of the async_run method to be non-async; https://github.com/komuw/xyzabc/blob/e046a061b538586e5ddbd7aa2b47ed448f3f28c8/xyzabc/task.py#L233-L242
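
For illustration, a signature check of this kind could look roughly like the sketch below. It is not the code at the link above; the `Task` stand-in and the `validate_async_run` helper are hypothetical names, and the only real API used is `inspect.iscoroutinefunction`.

```python
import inspect


class Task:
    """Hypothetical stand-in for a task base class (not the library's real class)."""

    async def async_run(self, *args, **kwargs):
        raise NotImplementedError


def validate_async_run(task):
    """Raise TypeError if `async_run` was overridden with a plain `def`."""
    if not inspect.iscoroutinefunction(task.async_run):
        raise TypeError(
            "{0}.async_run must be defined with `async def`".format(type(task).__name__)
        )


class GoodTask(Task):
    async def async_run(self):
        return "ok"


class BadTask(Task):
    # Overridden as a regular function, which the check should reject.
    def async_run(self):
        return "not async"
```

Note that a check like this only inspects the signature. An `async def` whose body calls blocking code still passes it.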

However, we still need a way to check whether the code inside async_run is blocking, and to act on it.

ref:

  1. https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
  2. https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor
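
The executors referenced above suggest one possible way to act once blocking code is known: hand it to a thread pool so the event loop stays free. The following is only a sketch under that assumption; `blocking_call`, `ticker` and `run_offloaded` are hypothetical names and nothing here is taken from the project's code.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_call(seconds):
    # Stand-in for blocking IO such as requests.get().
    time.sleep(seconds)
    return "done"


async def ticker(ticks, interval=0.05, count=4):
    # Records a timestamp on every wake-up; it only makes progress
    # while the event loop is not blocked.
    for _ in range(count):
        await asyncio.sleep(interval)
        ticks.append(time.monotonic())


async def run_offloaded(block_for=0.5):
    loop = asyncio.get_running_loop()
    ticks = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        start = time.monotonic()
        # The blocking call runs in a worker thread, so ticker() keeps running.
        result, _ = await asyncio.gather(
            loop.run_in_executor(pool, blocking_call, block_for),
            ticker(ticks),
        )
    return result, [t - start for t in ticks]


if __name__ == "__main__":
    print(asyncio.run(run_offloaded()))
```

For CPU-bound work a `ProcessPoolExecutor` could be passed to `run_in_executor` instead, at the cost of requiring picklable arguments.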

komuw commented 5 years ago

  • when we execute the first task (or any other), we measure how long it took to execute

It appears that the method outlined above may not work.

komuw commented 5 years ago

David Beazley and Nathaniel J. Smith have weighed in[1] on the matter.
It appears that we could do it somehow, but it gets complicated[2] very fast.


ref:

  1. https://twitter.com/komu_wairagu/status/1099052582968922113
  2. https://github.com/python-trio/trio/issues/591

komuw commented 5 years ago

There is a PR[1] to add a task-blocked watchdog to trio.

ref:

  1. https://github.com/python-trio/trio/pull/596

komuw commented 5 years ago

Victor Stinner suggested[1]:

It should be possible to write something on top of `ptrace` or something like that.
Blocking I/O syscalls should be avoided from the thread running the event loop.

and also[2]:

I implemented this simple debugging tool early in asyncio to detect blocking IO blocking the event loop.
It uses a threshold of 100 ms by default, it is configurable

ref:

  1. https://twitter.com/VictorStinner/status/1099355649786474496
  2. https://twitter.com/VictorStinner/status/1099354977531768833
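
The debugging tool described in the second quote is presumably asyncio's debug mode, which logs a warning on the `asyncio` logger whenever a single step of the loop runs longer than `loop.slow_callback_duration` (0.1 seconds by default). A minimal sketch of using it follows; `ListHandler`, `blocking_coro` and `find_slow_callbacks` are hypothetical names made up for this example.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects log messages in a list so they can be inspected afterwards."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


async def blocking_coro():
    # Blocks the event loop; asyncio.sleep() would not.
    time.sleep(0.3)


def find_slow_callbacks(threshold=0.1):
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)

    async def main():
        loop = asyncio.get_running_loop()
        # Configurable threshold, in seconds.
        loop.slow_callback_duration = threshold
        await blocking_coro()

    try:
        # debug=True enables the slow-callback check.
        asyncio.run(main(), debug=True)
    finally:
        logger.removeHandler(handler)
    # Debug mode reports slow steps as "Executing <...> took N seconds".
    return [m for m in handler.messages if "took" in m]


if __name__ == "__main__":
    for message in find_slow_callbacks():
        print(message)
```

The warning is only emitted after the blocking step has finished, so this reports a stall rather than interrupting it.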

komuw commented 5 years ago

Fixed by:

  1. https://github.com/komuw/wiji/blob/e046a061b538586e5ddbd7aa2b47ed448f3f28c8/xyzabc/task.py#L233-L242

  2. https://github.com/komuw/wiji/blob/4f0d4c94b3c9e05d522c4f359ce615610d50b82a/wiji/watchdog.py

  3. https://github.com/komuw/wiji/blob/65b7a8c91487c76b38c52f4b4b84bf8ea62a95ec/wiji/worker.py#L117-L138
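
As a rough illustration of the watchdog idea (not the contents of the linked watchdog.py), a separate thread can watch a heartbeat that only the event loop updates. If the heartbeat goes stale for longer than a threshold, the loop is probably blocked. `LoopWatchdog` and `run_watched` are hypothetical names, and the thresholds are arbitrary.

```python
import asyncio
import threading
import time


class LoopWatchdog:
    def __init__(self, threshold=0.2, check_interval=0.05):
        self.threshold = threshold
        self.check_interval = check_interval
        self.stalls = []  # observed heartbeat lags above the threshold
        self._last_beat = time.monotonic()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._watch, daemon=True)

    async def heartbeat(self):
        # Runs on the event loop; stops updating while the loop is blocked.
        while True:
            self._last_beat = time.monotonic()
            await asyncio.sleep(self.check_interval)

    def _watch(self):
        # Runs in its own thread, so it keeps working while the loop is stuck.
        while not self._stop.wait(self.check_interval):
            lag = time.monotonic() - self._last_beat
            if lag > self.threshold:
                self.stalls.append(lag)

    def start(self):
        self._last_beat = time.monotonic()
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()


async def run_watched(coro_factory, watchdog):
    beat = asyncio.ensure_future(watchdog.heartbeat())
    watchdog.start()
    try:
        await asyncio.sleep(0)  # let the heartbeat task start
        return await coro_factory()
    finally:
        watchdog.stop()
        beat.cancel()
```

A real watchdog would likely log the stall together with a stack trace of the loop's thread instead of just recording the lag.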

komuw commented 5 years ago

Another option is to use Python's `signal.alarm`:

https://github.com/ask/mode/blob/2ffae126df4893c3b2036cf6bb05d933362961c2/mode/debug.py
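
A sketch of how an alarm-based detector could work is given below. It is not the code in the linked file; `detect_blocking` is a hypothetical helper. A heartbeat coroutine keeps re-arming `signal.alarm`, so SIGALRM is only delivered when the loop fails to run the heartbeat in time. This assumes a Unix platform and that the code runs in the main thread, since `signal.signal` requires both for SIGALRM.

```python
import asyncio
import signal
import time


def detect_blocking(coro_factory, timeout=1):
    """Run coro_factory() and report whether the loop was blocked for
    about `timeout` (whole seconds) or longer."""
    fired = []

    def on_alarm(signum, frame):
        # Called only if the heartbeat did not re-arm the alarm in time.
        fired.append(time.monotonic())

    previous = signal.signal(signal.SIGALRM, on_alarm)

    async def heartbeat():
        while True:
            signal.alarm(timeout)  # re-arm; replaces any pending alarm
            await asyncio.sleep(timeout / 2)

    async def main():
        beat = asyncio.ensure_future(heartbeat())
        try:
            await asyncio.sleep(0)  # let the heartbeat arm the first alarm
            await coro_factory()
        finally:
            beat.cancel()
            signal.alarm(0)  # cancel any pending alarm

    try:
        asyncio.run(main())
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, previous)
    return len(fired) > 0
```

Because the handler receives the current stack frame, a fuller version could log where the loop was stuck at the moment the alarm fired.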