dabeaz / curio

Good Curio!

'never joined' message when adding terminated tasks to task group #212

Closed: goldcode closed this issue 6 years ago

goldcode commented 7 years ago

While using TaskGroup.add_task(coro), it's possible in my application that the task being added has sometimes already terminated. In such cases (and only in the already-terminated case), even after waiting for all tasks in the task group with

async for task in g:
    task.result

I get the following: ... state='TERMINATED') never joined

dabeaz commented 7 years ago

I've pushed a change that may fix this. The main purpose of the message is to identify tasks that fall off the face of the earth and are never looked at. In this case, I've modified the task.result property to mark the task as "joined".

goldcode commented 7 years ago

Works with

async for task in g:
    task.result

but it doesn't work with g.join()

goldcode commented 7 years ago

Regarding "identify tasks that fall off the face of the earth": I have had situations where there are tasks whose outcome I'm not particularly interested in. The use case is objects with an observer pattern: the object simply emits an event, which results in a task or chain of tasks being spawned, but the object itself is not interested in the results of those tasks. I have resorted to the hack of spawning daemon tasks in such cases.
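
A minimal sketch of that daemon-task workaround (hypothetical names; it assumes curio.spawn's daemon flag, which marks the task as fire-and-forget so it is not expected to be joined):

import curio

async def notify_observer(event):
    # hypothetical observer callback; nobody ever looks at its result
    print('handling', event)

async def emit(event):
    # daemon=True marks the spawned task as fire-and-forget,
    # so curio does not expect it to be joined
    await curio.spawn(notify_observer, event, daemon=True)

async def main():
    await emit('something happened')
    await curio.sleep(0.1)   # give the daemon task a chance to run

curio.run(main())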

phiweger commented 7 years ago

How can I modify the code below to deal with the following warning?

# WARNING:curio.task:Task(id=868, name='worker',
# <coroutine object worker at 0x1112e90a0>, state='TERMINATED') never joined

Any hints are greatly appreciated.

import curio
import asks
# We could use grequests or a similar solution. However, async is the future,
# so let's invest in it.
asks.init('curio')

# build a couple of urls
uid_list = []
for i in range(24):
    uid_list.append('AEVV030000' + '{:02d}'.format(i))

url_list = []
base = 'http://www.ebi.ac.uk/ena/data/view/'
for uid in uid_list:
    url_list.append(base + uid + '&display=fasta')

results = []

async def worker(sema, url):
    async with sema:
        r = await asks.get(url)
        print('foo')
        results.append(r)
        print('got ', url)

async def main(url_list):
    sema = curio.BoundedSemaphore(value=8) # Set sofa size.
    for url in url_list:
        await curio.spawn(worker(sema, url))

curio.run(main(url_list))
# creates ...
# Task(id=187, name='worker', <coroutine object worker at 0x1111f85c8>, state='TERMINATED') never joined
# ... for each url

goldcode commented 7 years ago

You are getting the warning from curio because you aren't joining your spawned tasks. One option would be to use a TaskGroup; used as a context manager, a TaskGroup automatically does the joining for you.

Edit: TaskGroup is a solution in your case only if the tasks are related to each other. The default behaviour, AFAIR, is that if one task crashes, the other running tasks in the TaskGroup are cancelled. I don't think TaskGroup currently offers a way to wait for all tasks regardless of whether they crash or run to completion.
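
One workaround (a sketch, not something settled in this thread) is to catch exceptions inside each worker, so that from the TaskGroup's point of view no task ever crashes and the sibling tasks keep running:

async def worker(sema, url):
    async with sema:
        try:
            r = await asks.get(url)
            results.append(r)
        except Exception as exc:
            # swallow the failure so a single bad request does not take
            # down the rest of the group; record or log it instead
            print('failed', url, exc)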

phiweger commented 7 years ago

In my case, I am getting database entries, which are not related, i.e. if one query fails, the others should stay alive. Is there a way to accomplish that with a context manager that joins automatically?

Also, warnings.filterwarnings('ignore') does not suppress these warnings. Why could that be?
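
On the filterwarnings point: the WARNING:curio.task: prefix indicates the message is emitted through the standard logging module (a logger named curio.task), not the warnings module, which is why warnings.filterwarnings has no effect. A sketch of silencing it via logging instead (actually joining the tasks remains the better fix):

import logging

# raise the threshold of the 'curio.task' logger so its WARNING-level
# records, including 'never joined', are dropped; errors still get through
logging.getLogger('curio.task').setLevel(logging.ERROR)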

phiweger commented 7 years ago

I modified the code and it works, thanks :)

async def main(url_list):
    sema = curio.BoundedSemaphore(value=8) # Set sofa size.
    async with curio.TaskGroup() as g:
        for url in url_list:
            await g.spawn(worker(sema, url))
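
A possible refinement (a sketch, not from the thread, assuming worker is changed to return the response instead of appending to a global list) is to collect the results by iterating the group and reading task.result, the same pattern discussed at the top of the thread:

async def worker(sema, url):
    # assumed variant: return the response rather than append to a global list
    async with sema:
        return await asks.get(url)

async def main(url_list):
    sema = curio.BoundedSemaphore(value=8)
    results = []
    async with curio.TaskGroup() as g:
        for url in url_list:
            await g.spawn(worker(sema, url))
        # iterating the group yields tasks as they finish; reading task.result
        # also marks each task as joined, so no 'never joined' warning appears
        async for task in g:
            results.append(task.result)
    return results

results = curio.run(main(url_list))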