from parla import Parla, spawn
from parla.cython.tasks import AtomicTaskSpace
def wrapper_chain(T, i=0):
@spawn(T[i], dependencies=[T[i-1]])
async def outer():
TT = AtomicTaskSpace(f"task_{i}")
await TT
with Parla():
T = AtomicTaskSpace("main")
i = 0
while i < 30000:
wrapper_chain(T, i)
i += 1
Error:
Exception in Task Task(main_64) : cannot reuse already awaited coroutine Traceback (most recent call last):
File "tasks.pyx", line 416, in parla.cython.tasks.Task.run
File "tasks.pyx", line 566, in parla.cython.tasks.ComputeTask._execute_task
File "scheduler.pyx", line 494, in parla.cython.scheduler._task_callback
RuntimeError: cannot reuse already awaited coroutine
Hypothesis:
This is behaving like the continuation task is duplicated in the task queue due to a race condition.
At a first pass all of the counts look correct. Events are reset to 1 before dependencies are added.
Events are either handled by the worker thread or scheduler thread (whichever gets fetch_sub(1) == 1 first)
Adding print statements makes the race condition disappear.
Everyway I've tried so far to log the cause of this error in the runtime makes it disappear. I may need a better reproduction script or even lighter weight logging.
Minimal Reproducer (~every 1 in 10 runs):
Error:
Hypothesis:
This is behaving like the continuation task is duplicated in the task queue due to a race condition. At a first pass all of the counts look correct. Events are reset to 1 before dependencies are added. Events are either handled by the worker thread or scheduler thread (whichever gets
fetch_sub(1) == 1
first)Adding print statements makes the race condition disappear.