OFFIS-DAI / mango

Modular Python-based agent framework to implement multi-agent systems
https://mango-agents.readthedocs.io/
MIT License

terminating a simulation with a task which was not awaited has a bad stacktrace #62

Open maurerle opened 9 months ago

maurerle commented 9 months ago

Take this pytest example:

import asyncio
import datetime

import pytest

from mango.util.scheduling import Scheduler

@pytest.mark.asyncio
@pytest.mark.filterwarnings(
    "ignore::RuntimeWarning"
)  # this test will stop the coro before scheduler awaits for it
async def test_one_shot_timeouted_conv():
    # GIVEN
    scheduler = Scheduler()
    l = []

    async def increase_counter():
        l.append(1)

    # WHEN
    t = scheduler.schedule_timestamp_task(
        increase_counter(),
        (datetime.datetime.now() + datetime.timedelta(seconds=0.3)).timestamp(),
    )
    with pytest.raises(asyncio.exceptions.TimeoutError):
        await asyncio.wait_for(t, timeout=0.2)

    # THEN
    assert len(l) == 0

and run it with

pytest tests/unit_tests/util/scheduling_test.py

It produces a lot of warnings:

tests/unit_tests/util/scheduling_test.py::test_tasks_complete_spwaning_rec
  /home/maurer/.conda/envs/mango/lib/python3.12/site-packages/_pytest/unraisableexception.py:78: PytestUnraisableExceptionWarning: Exception ignored in: <coroutine object ensure_future.<locals>._wrap_awaitable at 0x7faeb49a1540>

  Traceback (most recent call last):
    File "/home/maurer/.conda/envs/mango/lib/python3.12/asyncio/tasks.py", line 674, in _wrap_awaitable
      return await awaitable
             ^^^^^^^^^^^^^^^
    File "/home/maurer/fh-gitlab/projects/mango/mango/util/scheduling.py", line 48, in __await__
      signal = send(message)
               ^^^^^^^^^^^^^
  RuntimeError: cannot reuse already awaited coroutine

    warnings.warn(pytest.PytestUnraisableExceptionWarning(msg))

I see the same problem when stopping a mango simulation. Maybe we can improve the error handling here.

I tried catching this in mango but did not succeed. Could you have a look?

rcschrg commented 9 months ago

Hello @maurerle, that's really interesting. I think it is a rare edge case, but it is definitely not pretty. It is caused by the suspendable feature of scheduler tasks. If you do not need your tasks to be suspendable, you can deactivate the feature with Scheduler(suspendable=False), or even Agent(suspendable_tasks=False). Would this be sufficient?
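The traceback ends in `signal = send(message)` inside a custom `__await__`, which suggests the suspendable wrapper steps the wrapped coroutine by hand. The sketch below shows that general pattern in plain asyncio; `SuspendableCoro` is a hypothetical, simplified stand-in and not mango's actual class. It reproduces the error message: once the inner coroutine has finished (or been closed), any further `send()` on it raises `RuntimeError: cannot reuse already awaited coroutine`.

```python
import asyncio


class SuspendableCoro:
    """Hypothetical wrapper that steps the inner coroutine manually via send().

    Simplified: it does not forward throw(), which a real wrapper would need
    for cancellation to reach the inner coroutine.
    """

    def __init__(self, coro):
        self._coro = coro

    def __await__(self):
        send, message = self._coro.send, None
        while True:
            try:
                signal = send(message)  # same call as in the traceback
            except StopIteration as stop:
                return stop.value  # inner coroutine finished normally
            message = yield signal  # hand the signal to the event loop


async def work():
    await asyncio.sleep(0)
    return 42


async def demo():
    wrapped = SuspendableCoro(work())
    first = await wrapped
    try:
        await wrapped  # steps the *same*, already finished coroutine again
    except RuntimeError as err:
        return first, str(err)
    return first, None


result = asyncio.run(demo())
print(result)
```

Here `result` is `(42, 'cannot reuse already awaited coroutine')`: the first await drives the coroutine to completion, and the second one hits the exhausted coroutine.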

maurerle commented 9 months ago

Good find, thanks. This seems to help for the tests, but I still get this error when running a simulation with RoleAgents:

from mango import RoleAgent, create_container, Role
import asyncio

class Caller(Role):
    def setup(self):
        super().setup()
        self.context.schedule_timestamp_task(
            coroutine=self.send_hello_world(self.context.addr, self.context.aid),
            timestamp=self.context.current_timestamp + 5,
        )

    async def send_hello_world(self, receiver_addr, receiver_id):
        await self.send_acl_message(
            receiver_addr=receiver_addr, receiver_id=receiver_id, content="Hello World"
        )

async def main():
    mr = Caller()

    container = await create_container(addr=("0.0.0.0", 9099), connection_type="tcp")
    ag = RoleAgent(container, suspendable_tasks=False)
    ag.add_role(mr)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    # this exception handler does not help
    except asyncio.CancelledError:
        pass

Output:
Exception in callback Agent.raise_exceptions(<Task cancell.../core.py:435>>)
handle: <Handle Agent.raise_exceptions(<Task cancell.../core.py:435>>)>
Traceback (most recent call last):
  File "/home/maurer/.conda/envs/assume-torch/lib/python3.11/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/home/maurer/.conda/envs/assume-torch/lib/python3.11/site-packages/mango/agent/core.py", line 429, in raise_exceptions
    if fut.exception() is not None:
       ^^^^^^^^^^^^^^^
  File "/home/maurer/.conda/envs/assume-torch/lib/python3.11/site-packages/mango/agent/core.py", line 442, in _check_inbox
    message = await self.inbox.get()
              ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/maurer/.conda/envs/assume-torch/lib/python3.11/asyncio/queues.py", line 158, in get
    await getter
asyncio.exceptions.CancelledError
sys:1: RuntimeWarning: coroutine 'Caller.send_hello_world' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

Does this only occur when using RoleAgents? I did not find an easy way to catch the CancelledError; could you help me with that?

rcschrg commented 9 months ago

What happens: when the asyncio loop is closed, all open tasks are cancelled. In your example the task is never awaited: right after you create it, your coroutine main() finishes, so the task never runs. As I understand it, that is intended in the example, right? (If not, you may want to await shutdown() on the agents and containers and/or tasks_complete().) I assume you want to catch the exception when this happens for other reasons, and the example is simplified. The root of the problem is asyncio's own shutdown implementation (from asyncio/runners.py):

def _cancel_all_tasks(loop):
    to_cancel = tasks.all_tasks(loop)
    if not to_cancel:
        return

    for task in to_cancel:
        task.cancel()

    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))

    for task in to_cancel:
        if task.cancelled():
            continue
        if task.exception() is not None:
            loop.call_exception_handler({
                'message': 'unhandled exception during asyncio.run() shutdown',
                'exception': task.exception(),
                'task': task,
            })

As I read it, the exception is never raised to the caller; asyncio handles it purely internally through call_exception_handler, and tasks that end up cancelled are skipped entirely. So there is no way to catch the exception from outside when using asyncio.run().
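This can be checked with plain asyncio. In the sketch below (all names are made up for illustration), a task that is still pending when `main()` returns is cancelled by `asyncio.run()`, fails during its own cleanup, and the resulting `ValueError` reaches only the loop's exception handler; the `try/except` around `asyncio.run()` never sees it. Installing a custom handler with `loop.set_exception_handler()` is one way to observe or filter such reports.

```python
import asyncio

captured = []          # (message, exception) pairs seen by our handler
background_tasks = []  # keep strong references, but never await the tasks


def record_handler(loop, context):
    # Replaces the default handler, which would log the context instead.
    captured.append((context["message"], context.get("exception")))


async def scheduled_job():
    try:
        await asyncio.sleep(3600)  # would run "after the end of the simulation"
    except asyncio.CancelledError:
        # Hypothetical cleanup that fails while the task is being cancelled.
        raise ValueError("cleanup failed during shutdown")


async def main():
    asyncio.get_running_loop().set_exception_handler(record_handler)
    background_tasks.append(asyncio.create_task(scheduled_job()))
    await asyncio.sleep(0)  # let the job start and suspend inside sleep()
    return "main finished"


try:
    outcome = asyncio.run(main())
except ValueError:
    outcome = "caught by caller"  # not reached: asyncio.run() does not re-raise it

print(outcome)
print(captured)
```

`outcome` ends up as `"main finished"`, and the handler receives the `"unhandled exception during asyncio.run() shutdown"` context. If the job had simply let the `CancelledError` propagate, it would have been skipped by `if task.cancelled(): continue` and nothing would be reported.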

By the way, it also happens with plain Agents.

from mango import Agent, create_container
import asyncio

class Caller(Agent):
    def __init__(
        self,
        container,
        suggested_aid: str = None,
        suspendable_tasks=True,
        observable_tasks=True,
    ):
        super().__init__(container, suggested_aid, suspendable_tasks, observable_tasks)
        self.schedule_timestamp_task(
            coroutine=self.send_hello_world(self.addr, self.aid),
            timestamp=self.current_timestamp + 5,
        )

    async def send_hello_world(self, receiver_addr, receiver_id):
        await self.send_acl_message(
            receiver_addr=receiver_addr, receiver_id=receiver_id, content="Hello World"
        )

async def main():
    container = await create_container(addr=("0.0.0.0", 9099), connection_type="tcp")
    Caller(container, suspendable_tasks=False)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except asyncio.exceptions.CancelledError:
        print("Catch it")
        pass

Could you provide more information on the specific case in which the error occurs in your simulation?

maurerle commented 9 months ago

Hi @rcschrg,

Thanks for analyzing this. In my simulation this also happens when I call container.shutdown() properly.

Background: I have a market that schedules hourly openings, and it hits this error when the next opening is scheduled after the end time of the simulation. Agents should not need to know about the simulation end (if there is one at all), and I run my simulation from a CLI, so I cannot have my code run in __main__.

I now have a minimal example that fails in a weird way:

from mango import Agent, create_container
from mango.util.clock import ExternalClock
import asyncio
from datetime import datetime, timedelta
import calendar
from tqdm import tqdm

class Caller(Agent):
    def __init__(
        self,
        container,
        schedule_time: datetime,
        suggested_aid: str = None,
        suspendable_tasks=True,
        observable_tasks=True,
    ):
        super().__init__(container, suggested_aid, suspendable_tasks, observable_tasks)
        self.schedule_timestamp_task(
            coroutine=self.send_hello_world(self.addr, self.aid),
            timestamp=schedule_time.timestamp(),
        )
    async def send_hello_world(self, receiver_addr, receiver_id):
        await self.send_acl_message(
            receiver_addr=receiver_addr, receiver_id=receiver_id, content=f"Hello World {self.current_timestamp}"
        )

    def handle_message(self, content, meta):
        print("got", content)

class World():
    def __init__(self, start: datetime, end: datetime):
        self.clock = ExternalClock(0)
        self.start = start
        self.end = end

        self.loop = asyncio.get_event_loop()
        asyncio.set_event_loop(self.loop)

    async def setup(self):
        self.container = await create_container(
            connection_type="tcp",
            addr=("localhost", 9099),
            clock=self.clock,
        )
        # the caller wants to send a message after the end of the simulation
        self.caller = Caller(self.container, suspendable_tasks=True, schedule_time=self.end+timedelta(days=14))

    async def async_run(self, start_ts, end_ts):
        pbar = tqdm(total=end_ts - start_ts)

        # allow registration before first opening
        self.clock.set_time(start_ts - 1)
        while self.clock.time < end_ts:
            await asyncio.sleep(0)
            self.clock.set_time(self.clock.time + 600)

            # normal agent stuff happening on schedules
            pbar.update(600)
            pbar.set_description(
                repr(datetime.utcfromtimestamp(self.clock.time)),
                refresh=False,
            )
            await asyncio.sleep(0.01)
        pbar.close()

        # # the above loop is only for better understanding of what happens.
        # # same can be achieved with the below line, which schedules in the future
        # self.caller.schedule_timestamp_task(
        #     coroutine=self.caller.send_hello_world(self.caller.addr, self.caller.aid),
        #     timestamp=self.caller.current_timestamp + 100,
        # )
        await asyncio.sleep(0.1)
        await self.container.shutdown()

    def run(self):
        start_ts = calendar.timegm(self.start.utctimetuple())
        end_ts = calendar.timegm(self.end.utctimetuple())

        try:
            return self.loop.run_until_complete(
                self.async_run(start_ts=start_ts, end_ts=end_ts)
            )
        except KeyboardInterrupt:
            pass

def test_func():    
    start = datetime(2019, 1, 1)
    end = datetime(2019, 1, 1, 1)
    w = World(start, end)
    try:
        w.loop.run_until_complete(w.setup())
        w.run()
    except asyncio.exceptions.CancelledError:
        print("Catch it")

if __name__ == "__main__":
    test_func()

    # # direct execution works fine:
    # start = datetime(2019, 1, 1)
    # end = datetime(2019, 1, 1, 1)
    # w = World(start, end)
    # try:
    #     w.loop.run_until_complete(w.setup())
    #     w.run()
    # except asyncio.exceptions.CancelledError:
    #     print("Catch it")

results in:

datetime.datetime(2019, 1, 1, 1, 9, 59): : 4200it [00:00, 58772.96it/s]
Task was destroyed but it is pending!
task: <Task pending name='Task-5' coro=<_wrap_awaitable() running at /home/maurer/.conda/envs/mango/lib/python3.11/asyncio/tasks.py:694> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[ScheduledTask.on_stop(), Scheduler._remove_task()]>
Exception ignored in: <generator object _wrap_awaitable at 0x7f1adda50100>
Traceback (most recent call last):
  File "/home/maurer/.conda/envs/mango/lib/python3.11/asyncio/tasks.py", line 694, in _wrap_awaitable
    return (yield from awaitable.__await__())
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/maurer/.conda/envs/mango/lib/python3.11/site-packages/mango/util/scheduling.py", line 48, in __await__
    signal = send(message)
             ^^^^^^^^^^^^^
RuntimeError: cannot reuse already awaited coroutine
sys:1: RuntimeWarning: coroutine 'Caller.send_hello_world' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

It is very weird that calling test_func() gives this error, while executing the same code directly under __main__ (the commented-out block) does not.

With suspendable_tasks=False the output is shorter:

Task was destroyed but it is pending!
task: <Task pending name='Task-5' coro=<TimestampScheduledTask.run() done, defined at /home/maurer/.conda/envs/mango/lib/python3.11/site-packages/mango/util/scheduling.py:154> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[ScheduledTask.on_stop(), Scheduler._remove_task()]>
sys:1: RuntimeWarning: coroutine 'Caller.send_hello_world' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

This is still not great: it can produce a lot of lines if many tasks are aborted, so I need to find a way to silence it. I have no idea what differs between running the code directly under __main__ and calling it inside a function.

maurerle commented 9 months ago

Here is an even smaller example, derived from yours:

from mango import Agent, create_container
import asyncio

class Caller(Agent):
    def __init__(
        self,
        container,
        suggested_aid: str = None,
        suspendable_tasks=True,
        observable_tasks=True,
    ):
        super().__init__(container, suggested_aid, suspendable_tasks, observable_tasks)
        self.schedule_timestamp_task(
            coroutine=self.send_hello_world(self.addr, self.aid),
            timestamp=self.current_timestamp + 5,
        )

    async def send_hello_world(self, receiver_addr, receiver_id):
        await self.send_acl_message(
            receiver_addr=receiver_addr, receiver_id=receiver_id, content="Hello World"
        )

async def main():
    from mango.util.clock import ExternalClock
    container = await create_container(addr=("0.0.0.0", 9099), connection_type="tcp", clock=ExternalClock(0))
    Caller(container, suspendable_tasks=False)
    await asyncio.sleep(0)
    await container.shutdown()

def sync_main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

if __name__ == "__main__":
    import warnings

    warnings.filterwarnings("ignore")
    #warnings.filterwarnings("ignore", "coroutine.*?was never awaited.*")
    sync_main()

To clarify: I am running the script directly from a terminal (python ./mango_test.py); no IPython magic (which typically already runs an asyncio loop) is involved.

rcschrg commented 9 months ago

I see. I think I understand the core problem that might cause this in real scenarios.

OK, I see three options:

  1. You suppress the warning
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "coroutine.*?was never awaited.*")
            asyncio.run(...)
  2. We change all ScheduledTasks, such that they would create the coroutine exactly when needed (this would imply that you would not pass the coroutine to the scheduler anymore, but you would pass the coroutine function + arguments to the scheduler for every single task)
  3. We implement a close() method in all ScheduledTasks, which calls close() on any open coroutines when the scheduler's shutdown() is called.

My opinion:

  1. Should be ok, short term
  2. I don't like that API-wise, but it would be the cleanest solution by far (it is also an API break, sadly)
  3. Pretty risk-free and no API change, but a bit annoying when implementing new types of tasks
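Option 1 in isolation: the "coroutine ... was never awaited" RuntimeWarning goes through the normal `warnings` machinery (it is emitted when a coroutine object that was never started is finalized), so a message filter inside `warnings.catch_warnings()` hides it. The sketch below records warnings to make the filter's effect visible; `send_hello_world` and `collect_unawaited_warnings` are illustrative stand-ins, not mango code.

```python
import gc
import warnings


async def send_hello_world():
    """Stand-in for a scheduled coroutine that is created but never awaited."""
    return "Hello World"


def collect_unawaited_warnings(suppress):
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")  # record everything that is not filtered
        if suppress:
            # Option 1: ignore only the "never awaited" RuntimeWarning.
            warnings.filterwarnings(
                "ignore", "coroutine.*?was never awaited.*", category=RuntimeWarning
            )
        coro = send_hello_world()  # coroutine object is created but never started
        del coro  # finalizing it triggers the RuntimeWarning
        gc.collect()
    return [str(w.message) for w in caught]


print(collect_unawaited_warnings(suppress=False))
print(collect_unawaited_warnings(suppress=True))
```

The first call records a single "... was never awaited" message and the second records none. Note that this only affects `warnings` output; messages that asyncio reports through the event loop's exception handler are not covered by it.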
maurerle commented 9 months ago

Thanks,

I updated my minimal example above so that it also uses an ExternalClock, which does not advance, and it still gives:

Task was destroyed but it is pending!
task: <Task pending name='Task-5' coro=<TimestampScheduledTask.run() done, defined at /home/maurer/.conda/envs/mango/lib/python3.11/site-packages/mango/util/scheduling.py:154> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[ScheduledTask.on_stop(), Scheduler._remove_task()]>

even with warnings ignored.
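"Task was destroyed but it is pending!" is not a `warnings` warning: asyncio reports it through the event loop's exception handler when a Task that is still pending gets garbage-collected (the default handler logs it via the `asyncio` logger). That would explain why `warnings.filterwarnings(...)` does not hide it. A minimal plain-asyncio reproduction, with a custom handler so the message can be inspected instead of logged (names are illustrative):

```python
import asyncio
import gc

captured = []


def record_handler(loop, context):
    # Called by asyncio instead of the default, logging-based handler.
    captured.append(context["message"])


async def never_finishes():
    await asyncio.sleep(3600)


def run_without_cleanup():
    loop = asyncio.new_event_loop()
    loop.set_exception_handler(record_handler)
    loop.create_task(never_finishes())
    loop.run_until_complete(asyncio.sleep(0))  # task starts and suspends in sleep()
    loop.close()  # the task is still pending when the loop goes away


run_without_cleanup()
gc.collect()  # the unreachable pending Task is finalized here
print(captured)
```

One possible way to silence it is therefore a custom exception handler that drops this particular message and passes everything else on to `loop.default_exception_handler(context)`.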

Yes, I thought about option 2 too, but wasn't too sure whether it would be a valid solution.
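A rough sketch of what option 2 (passing the coroutine function plus arguments instead of a coroutine object) could look like, in plain asyncio. `DeferredTask` is a hypothetical name, not part of mango's API; the point is that the coroutine object only comes into existence when the task is due, so a task cancelled before that never leaves an unawaited coroutine behind.

```python
import asyncio
import gc
import warnings


class DeferredTask:
    """Hypothetical task that stores a coroutine *function* plus arguments and
    creates the coroutine object only when the task is actually due."""

    def __init__(self, coro_func, *args, delay=0.0, **kwargs):
        self._coro_func = coro_func
        self._args = args
        self._kwargs = kwargs
        self._delay = delay

    async def run(self):
        await asyncio.sleep(self._delay)
        # Only now is the coroutine created; if the task is cancelled while
        # sleeping, no coroutine object exists that could be "never awaited".
        return await self._coro_func(*self._args, **self._kwargs)


async def send_hello_world(receiver, log):
    log.append(f"hello {receiver}")


async def main():
    log = []
    # The caller passes the function and its arguments, not send_hello_world(...).
    due = asyncio.create_task(DeferredTask(send_hello_world, "agent0", log).run())
    late = asyncio.create_task(
        DeferredTask(send_hello_world, "agent1", log, delay=3600).run()
    )
    await due
    late.cancel()  # e.g. the simulation ended before this task was due
    await asyncio.gather(late, return_exceptions=True)
    return log, late.cancelled()


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    log, late_cancelled = asyncio.run(main())
    gc.collect()

print(log, late_cancelled)
```

The cost is the API change discussed above: every call site changes from `schedule(f(x))` to something like `schedule(f, x)`.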

  1. would sound fine too
rcschrg commented 9 months ago

This should be fixed when you replace loop.run_until_complete() with asyncio.run(). At least it works for me.
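A plausible explanation, sketched in plain asyncio (mango is not involved and the names are illustrative): before closing the loop, `asyncio.run()` cancels every remaining task and runs the loop until those cancellations have completed. The leftover task therefore ends up cancelled rather than pending, so nothing is reported when it is later destroyed.

```python
import asyncio

captured = []  # messages seen by the loop's exception handler
leftover = []  # strong references to tasks that main() never awaits


def record_handler(loop, context):
    captured.append(context["message"])


async def never_finishes():
    await asyncio.sleep(3600)


async def main():
    asyncio.get_running_loop().set_exception_handler(record_handler)
    leftover.append(asyncio.create_task(never_finishes()))
    await asyncio.sleep(0)  # let the task start; main() then returns without awaiting it


asyncio.run(main())
print(leftover[0].cancelled())
print(captured)
```

After `asyncio.run()` returns, the leftover task reports `cancelled() == True` and the handler has received no messages.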

Regarding "coroutine 'Caller.send_hello_world' was never awaited", I think I'd prefer option 3 at this point, simply because of the API break option 2 would cause. I'm putting it on my schedule for next week.
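A minimal sketch of option 3, assuming a scheduler that holds each scheduled coroutine until it is due. `TimestampTask` and `MiniScheduler` are hypothetical names, not mango's `ScheduledTask`/`Scheduler` API. The key point is that calling `close()` on a coroutine that was never started marks it as finished, so no "was never awaited" warning is emitted when it is garbage-collected.

```python
import asyncio
import gc
import warnings


class TimestampTask:
    """Hypothetical scheduled task that owns its coroutine until it is due."""

    def __init__(self, coro, delay):
        self._coro = coro
        self._delay = delay
        self._started = False

    async def run(self):
        await asyncio.sleep(self._delay)
        self._started = True
        return await self._coro

    def close(self):
        # Option 3: close a coroutine that never got the chance to start.
        if not self._started:
            self._coro.close()


class MiniScheduler:
    """Hypothetical scheduler whose shutdown() cancels tasks and closes coroutines."""

    def __init__(self):
        self._tasks = []  # (TimestampTask, asyncio.Task) pairs

    def schedule(self, coro, delay):
        scheduled = TimestampTask(coro, delay)
        self._tasks.append((scheduled, asyncio.create_task(scheduled.run())))

    async def shutdown(self):
        for scheduled, task in self._tasks:
            task.cancel()
            scheduled.close()
        await asyncio.gather(*(task for _, task in self._tasks), return_exceptions=True)
        self._tasks.clear()


async def send_hello_world(log):
    log.append("sent")


async def main():
    log = []
    scheduler = MiniScheduler()
    scheduler.schedule(send_hello_world(log), delay=0)     # due before shutdown
    scheduler.schedule(send_hello_world(log), delay=3600)  # due after "the end"
    await asyncio.sleep(0.05)
    await scheduler.shutdown()
    return log


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    log = asyncio.run(main())
    gc.collect()

print(log)
```

As noted in option 3, every new task type has to implement `close()` correctly, which is the maintenance cost of this approach; in exchange, the public `schedule(coroutine, ...)` style stays unchanged.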