dabeaz / curio

Good Curio!
Other
4.02k stars 241 forks source link

TaskGroup closes prematurely #239

Closed jchayat closed 6 years ago

jchayat commented 6 years ago

Symptom

When running the following code (python 3.6.3, curio 0.8):

import curio

async def level1():

    async def level2(task_group):

        async def level3():
            print("hello world")

        await task_group.spawn(level3)

    async with curio.TaskGroup() as task_group:
        await task_group.spawn(level2, task_group)

if __name__ == '__main__':
    curio.run(level1)

I'm getting:

Task 3 crashed
Traceback (most recent call last):
  File "/Users/jon/code/python/hello-curio/lib/python3.6/site-packages/curio/kernel.py", line 826, in _run_coro
    trap = current._send(current.next_value)
  File "/Users/jon/code/python/hello-curio/lib/python3.6/site-packages/curio/task.py", line 96, in _task_runner
    return await coro
  File "/Users/jon/code/python/hello-curio/bug.py", line 11, in level2
    await task_group.spawn(level3)
  File "/Users/jon/code/python/hello-curio/lib/python3.6/site-packages/curio/task.py", line 336, in spawn
    raise RuntimeError('Task group is closed')
RuntimeError: Task group is closed
Task 2 crashed
Traceback (most recent call last):
  File "/Users/jon/code/python/hello-curio/lib/python3.6/site-packages/curio/kernel.py", line 826, in _run_coro
    trap = current._send(current.next_value)
  File "/Users/jon/code/python/hello-curio/lib/python3.6/site-packages/curio/task.py", line 96, in _task_runner
    return await coro
  File "/Users/jon/code/python/hello-curio/bug.py", line 14, in level1
    await task_group.spawn(level2, task_group)
  File "/Users/jon/code/python/hello-curio/lib/python3.6/site-packages/curio/task.py", line 446, in __aexit__
    await self.join(wait=self._wait)
  File "/Users/jon/code/python/hello-curio/lib/python3.6/site-packages/curio/task.py", line 435, in join
    raise TaskGroupError(exceptional)
curio.errors.TaskGroupError: TaskGroupError(RuntimeError)
Traceback (most recent call last):
  File "/Users/jon/code/python/hello-curio/lib/python3.6/site-packages/curio/kernel.py", line 826, in _run_coro
    trap = current._send(current.next_value)
  File "/Users/jon/code/python/hello-curio/lib/python3.6/site-packages/curio/task.py", line 96, in _task_runner
    return await coro
  File "/Users/jon/code/python/hello-curio/bug.py", line 14, in level1
    await task_group.spawn(level2, task_group)
  File "/Users/jon/code/python/hello-curio/lib/python3.6/site-packages/curio/task.py", line 446, in __aexit__
    await self.join(wait=self._wait)
  File "/Users/jon/code/python/hello-curio/lib/python3.6/site-packages/curio/task.py", line 435, in join
    raise TaskGroupError(exceptional)
curio.errors.TaskGroupError: TaskGroupError(RuntimeError)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/jon/code/python/hello-curio/bug.py", line 18, in <module>
    curio.run(level1)
  File "/Users/jon/code/python/hello-curio/lib/python3.6/site-packages/curio/kernel.py", line 877, in run
    return kernel.run(corofunc, *args, timeout=timeout)
  File "/Users/jon/code/python/hello-curio/lib/python3.6/site-packages/curio/kernel.py", line 214, in run
    raise TaskError('Task Crashed') from ret_exc
curio.errors.TaskError: Task Crashed

Analysis

The problem seems to be that task_group is marked closed as soon as execution in level1 reaches the async with block end, which doesn't take into account that tasks under the task_group may continue spawning additional tasks under task_group.

Why pass a TaskGroup to a spawned task?

Because otherwise it's impossible to return a value and have tasks keep running in the background. Consider the following code (which also fails):

import curio

async def level1():

    async def level2(task_group):
        queue = curio.Queue()

        async def level3():

            async def level4():
                message = await queue.get()
                print(message)

            await task_group.spawn(level4)

        await task_group.spawn(level3)

        return queue

    async with curio.TaskGroup() as task_group:
        queue = await level2(task_group)
        await queue.put("hello world")

if __name__ == '__main__':
    curio.run(level1)

Right now I'm working around this by suppling the queue from outside as a parameter - but this is ugly IMO.

dabeaz commented 6 years ago

Interesting. Will investigate.

dabeaz commented 6 years ago

I've pushed a change that fixes the above example and which might work generally. Probably needs more testing. Would like to get feedback if it solves the problem.

jchayat commented 6 years ago

Yes, my tests pass now. 👍

dabeaz commented 6 years ago

Closing as earlier fix seems to have addressed it.