OFFIS-DAI / mango

Modular Python-based agent framework to implement multi-agent systems
https://mango-agents.readthedocs.io/
MIT License
12 stars 2 forks source link

As Agent Process does not schedule tasks correctly with externalClock #54

Open rcschrg opened 1 year ago

rcschrg commented 1 year ago

In GitLab by @maurerle on May 28, 2023, 11:51

The agent process feature seems to be very useful and has a lot of potential! Very neat!

Yet I found the following bug scheduling tasks to an external Clock:

I would expect that this new feature should behave correctly in the following example, yet it only schedules the first occurence. This happens with schedule_recurrent_task, schedule_periodic_task and similar - because the time in the mirror container is not synchronized with the original container..

Besides that: Why is WAIT_STEP = 0.01?

Full example:

import asyncio
from datetime import datetime

from dateutil import rrule

from mango import Agent, create_container
from mango.util.clock import ExternalClock

class Caller(Agent):
    def __init__(self, container, receiver_addr, receiver_id, recurrency):
        super().__init__(container)
        self.receiver_addr = receiver_addr
        self.receiver_id = receiver_id
        self.schedule_recurrent_task(
            coroutine_func=self.send_hello_world, recurrency=recurrency
        )

    async def send_hello_world(self):
        time = datetime.fromtimestamp(self._scheduler.clock.time)
        await self.send_acl_message(
            receiver_addr=self.receiver_addr,
            receiver_id=self.receiver_id,
            content=f"Current time is {time}",
        )

    def handle_message(self, content, meta):
        pass

class Receiver(Agent):
    def __init__(self, container):
        super().__init__(container)
        self.wait_for_reply = asyncio.Future()

    def handle_message(self, content, meta):
        print(f"Received a message with the following content: {content}.")

async def main(start):
    clock = ExternalClock(start_time=start.timestamp())
    addr = ("127.0.0.1", 5555)
    # market acts every 15 minutes
    recurrency = rrule.rrule(rrule.MINUTELY, interval=15, dtstart=start)

    c = await create_container(addr=addr, clock=clock)
    same_process = True
    if same_process:
        receiver = Receiver(c)
        caller = Caller(c, addr, receiver.aid, recurrency)

    else:
        def creator(container):
            receiver = Receiver(container)
            caller = Caller(c, addr, receiver.aid, recurrency)

        await c.as_agent_process(
            agent_creator=creator
        )
    if isinstance(clock, ExternalClock):
        for i in range(100):
            await asyncio.sleep(0.01)
            clock.set_time(clock.time + 60)
    await c.shutdown()

if __name__ == "__main__":
    from dateutil.parser import parse

    start = parse("202301010000")
    asyncio.run(main(start))
rcschrg commented 1 year ago

In GitLab by @maurerle on May 28, 2023, 12:42

I could get it to work using the DistributedClockAgent which distributes the time to its mirroragent running in the mirror container:

import asyncio
from datetime import datetime

from dateutil import rrule

from mango import Agent, create_container
from mango.util.clock import ExternalClock
from mango.util.distributed_clock import DistributedClockAgent, DistributedClockManager

class Caller(Agent):
    def __init__(self, container, receiver_addr, receiver_id, recurrency):
        super().__init__(container)
        self.receiver_addr = receiver_addr
        self.receiver_id = receiver_id
        self.schedule_recurrent_task(
            coroutine_func=self.send_hello_world, recurrency=recurrency
        )

    async def send_hello_world(self):
        time = datetime.fromtimestamp(self._scheduler.clock.time)
        await self.send_acl_message(
            receiver_addr=self.receiver_addr,
            receiver_id=self.receiver_id,
            content=f"Current time is {time}",
        )

    def handle_message(self, content, meta):
        pass

class Receiver(Agent):
    def __init__(self, container):
        super().__init__(container)
        self.wait_for_reply = asyncio.Future()

    def handle_message(self, content, meta):
        print(f"Received a message with the following content: {content}.")

async def main(start):
    clock = ExternalClock(start_time=start.timestamp())
    addr = ("127.0.0.1", 5555)
    # market acts every 15 minutes
    recurrency = rrule.rrule(rrule.MINUTELY, interval=15, dtstart=start)

    c = await create_container(addr=addr, clock=clock)
    same_process = False

    clock_manager = DistributedClockManager(
        c, receiver_clock_addresses=[addr]
    )

    if same_process:
        receiver = Receiver(c)
        caller = Caller(c, addr, receiver.aid, recurrency)

    else:
        def creator(container):
            receiver = Receiver(container)
            caller = Caller(container, addr, receiver.aid, recurrency)
            clock_agent = DistributedClockAgent(container)

        await c.as_agent_process(
            agent_creator=creator
        )
    if isinstance(clock, ExternalClock):
        for i in range(100):
            await asyncio.sleep(0.01)
            clock.set_time(clock.time + 60)
            await clock_manager.distribute_time()
    await c.shutdown()

if __name__ == "__main__":
    from dateutil.parser import parse

    start = parse("202301010000")
    asyncio.run(main(start))

So it might be needed to add this feature when using as_agent_process..?

rcschrg commented 1 year ago

In GitLab by @maurerle on Sep 11, 2023, 13:47

Pinging this - what is the best way to resolve this? Should the DistributedClockManager be used for this?

rcschrg commented 9 months ago

I think it makes sense to use the DistributedClockManager. However it is not intuitive that it does not work with the regular external clock. So I think we should tackle this in the future, maybe we can just use shared memory in the external clock to resolve this.