opensistemas-hub / osbrain

osBrain - A general-purpose multi-agent system module written in Python
https://osbrain.readthedocs.io/en/stable/
Apache License 2.0
175 stars 43 forks source link

How to synchronize action for all agents #356

Closed Blankslide closed 4 years ago

Blankslide commented 4 years ago

Hello,

Thank you very much for this fantastic MAS platform. I have pretty much gone over the documentation. However, I would like to know how I can synchronize the actions of multiple agents without necessarily having a coordinator agent that uses the PUB communication pattern to prompt the agents' actions.

For example:

from osbrain import run_nameserver,run_agent,Agent

class Car(Agent):
    def on_init(self):
        self.set_attr(color='Red')
    def move(self, distance, time):
        self.speed = distance / time
        print(self.speed)

class Main:
    def __init__(self, num_cars):
        self.ns = run_nameserver()
        self.cars_dict = {}
        for i in range(1, num_cars + 1):
            self.cars_dict[f'Car-{i}'] = run_agent(f'Car-{i}', base=Car, serializer='json')

    def run(self):
        for car_name_a, car_a in self.cars_dict.items():
            car_a.move(3, 5)

if __name__ == '__main__':
    main = Main(5)
    main.run()
    main.ns.shutdown()

So, is there a way that all agents can call the move method at once without using the For loop? I tried using python threading, but there was still some lags in the time when the agent call the move method.

Furthermore, I usually get the following error when I call the ns.shutdown()

Exception ignored in: <function Socket.del at 0x0000025A551CB4C8> TypeError: 'NoneType' object is not callable

Peque commented 4 years ago

@Folorunblues Good to know you like osBrain and you already went over the documentation! :blush:

If I understand your problem correctly, I think your best bet is to use PUB-SUB (i.e.: have that coordinator agent you already thought about).

Can you provide a reproducible case for the exception you just shared? With information about the platform (OS) you are using, Python version and osBrain version.

Blankslide commented 4 years ago

@Peque Thank you for your prompt response. I was trying to avoid using a coordinator agent that synchronizes the agent actions because I want the agents to be completely autonomous. I tried using python threading, but due to python's GIL, it resulted in some lags. At the moment, I think I will incorporate the coordinator agent, as you suggested. And, do you think it is worth adding this functionality to the upcoming 0.7.0?

Moreover, the reproducible case for the exception I get any time I call ns.shutdown is found below:

Windows 10 Python 3.7 (I tried it with 3.5, 3.6 versions as well) Osbrain 0.6.5

from osbrain import run_nameserver,run_agent,Agent

class Car(Agent):
    def on_init(self):
        self.set_attr(color='Red')
    def move(self, distance, time):
        self.speed = distance / time
        print(self.speed)

class Main:
    def __init__(self, num_cars):
        self.ns = run_nameserver()
        self.cars_dict = {}
        for i in range(1, num_cars + 1):
            self.cars_dict[f'Car-{i}'] = run_agent(f'Car-{i}', base=Car, serializer='json')

    def run(self):
        for car_name_a, car_a in self.cars_dict.items():
            car_a.move(3, 5)

if __name__ == '__main__':
    main = Main(5)
    main.run()
    main.ns.shutdown()

The above throws the following error:

Exception ignored in: <function Socket.del at 0x000002650054B318> Traceback (most recent call last): File "C:\Users\user\Anaconda3\envs\envs-3\lib\site-packages\zmq\sugar\socket.py", line 67, in del File "C:\Users\user\Anaconda3\envs\envs-3\lib\site-packages\zmq\sugar\socket.py", line 105, in close File "C:\Users\user\Anaconda3\envs\envs-3\lib\site-packages\zmq\sugar\context.py", line 153, in_rm_socket TypeError: 'NoneType' object is not callable

Furthermore, I have another question regarding osbrain serialization. I know it is logically not correct to serialize socket but can osbrain serialize socket? I tried it with sqlite3, but I got the following error despite using 'dill' for serialization: EXCEPTION <class 'TypeError'>: Cannot serialize socket object

Peque commented 4 years ago

Yeah, threads and CPython do not get along very well... That is why we went with processes in osBrain.

Can you open a new issue for the TypeError you are getting? I cannot reproduce it, but it would be good to have it open until someone can reproduce it and maybe fix it.

I would really recommend you to use Linux instead of Windows. You will note some benefits when it comes to using osBrain since, among other things, processes are lighter in Linux than in Windows (with the CoW strategy).

You cannot serialize sockets, as you have already found. :smile: A socket is not something you can send over the wire. You could save the socket information though (address, transport...).

You could try to use the system's clock as a coordinator, to avoid a coordinator agent. You may be able to use osBrain's timers for that (in particular, the .each() method). This is not really a synchronization method, since one agent may lag behind. But it could be useful depending on your exact use case and, in real life, agents may lag too... :stuck_out_tongue_winking_eye:

If you really need complete coordination (i.e.: wait for all agents to finish one step before executing the next step), then message passing is your only way. You will need to wait until all agents say "I'm done!" to the coordinator.

It would be great if, once you solve your problem, you could provide a simplified code example. We can definitely add it to the list of osBrain examples and, if there is some interesting logic we could of course add it to the core of osBrain. :blush:

Blankslide commented 4 years ago

@Peque sure, I will open a new issue for the type error I was getting. And, you are right, Linux is the way to go, I've been planning of switching to Linux for sometimes now. Moreover, synchronization and execution time is a big issue in my simulation. Having a coordinator agent will slow down my simulation. Unfortunately, that is the best option that I have for now. If I find a way around this issue, I will keep you posted. Thanks again for this simple to use MAS platform:)

Blankslide commented 4 years ago

@Peque Sorry I had to reopen this issue, I still have some questions regarding the synchronization of the Agent's action. Based on your suggestion, I tried using a PUB for the coordinator agent. However, I got some weird results. Probably I am missing something.

I tried the following approach as well.

  1. PUB for the coordinator agent and PUSH for the client agents
  2. SYN_PUB for the coordinator agent while the client agents reply when they are done.

    I have a simplified example below:

from osbrain import run_nameserver, run_agent, Agent

import time

SYNCHRONIZER_CHANNEL_1 = 'coordinator1'

class TransportAgent(Agent):
    def transportAgent_first_handler(self, message):
        time.sleep(2)
        print(message)
        self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')

    def process_reply(self,message):
        yield 1

class NodeAgent(Agent):
    def NodeAgent_first_handler(self, message):
        print(message)
        self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')

    def process_reply(self,message):
        yield 1

class SynchronizerCoordinatorAgent(Agent):
    def on_init(self):
        self.network_agent_addr = self.bind('SYNC_PUB', alias=SYNCHRONIZER_CHANNEL_1, handler='status_handler')
        self.set_attr(status_list=[])

    def first_synchronization(self, time_step, iteration):
        self.send(SYNCHRONIZER_CHANNEL_1, message={'time_step': time_step, 'iteration': iteration},
                  topic='first_synchronization')

    def status_handler(self, message):
        if len(self.get_attr('status_list')) < 2:
            self.get_attr('status_list').append(message)

        else:
            self.get_attr('status_list').clear()

    def init_environment(self):
        self.TransportAgent = run_agent('TransportAgent', base=TransportAgent)

        self.NodeAgent = run_agent('NodeAgent', base=NodeAgent)

        self.TransportAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
                                    handler={'first_synchronization': TransportAgent.transportAgent_first_handler})
        self.NodeAgent.connect(self.network_agent_addr,alias=SYNCHRONIZER_CHANNEL_1,
                               handler={'first_synchronization': NodeAgent.NodeAgent_first_handler})

if __name__ == '__main__':

    ns = run_nameserver()
    synchronizer_coordinator_agent = run_agent('Synchronizer_CoordinatorAgent',
                                               base=SynchronizerCoordinatorAgent)
    synchronizer_coordinator_agent.init_environment()

    for iteration in range(1, 2):
        for time_step in range(0, 90, 30):
            synchronizer_coordinator_agent.first_synchronization(time_step=time_step, iteration=iteration)

    ns.shutdown()

The main issue I had in the above code is that the Coordinator agent doesn't wait to get the reply from the Node agent before proceeding to the next timestep. I wanted the node agent and the transport agent to perform a task (they simply sleep in the above code ) and send "is_done" to the coordinator agent before proceeding to the next timestep. Am I implementing it wrongly? Kindly let me know. Thank you!

Peque commented 4 years ago

@Folorunblues A couple of random notes:

Hope it helps and, please, update the documentation if anything mentioned here could be made more clear for others! :blush: :heart:

Blankslide commented 4 years ago

@Peque Thank you once again for taking the time to look into this issue. I have already tinkered with some of your suggestions, but all to no avail, possibly I wasn't implementing it correctly.

At this point, I thought I should throw more light on what I want to achieve. I want the coordinator agent to send the iteration and timestep information to the node and transport agent. After the node and Transport agents are done, they should send "is_done" to the coordinator agent. So I want the coordinator agent to receive that the agents are done before proceeding to the next timestep/iteration.

My major challenge is that the coordinator agent doesn't wait for the Transport agent to send it is done before proceeding to the next timestep/iteration, despite using a method to make it sleep. Furthermore, I just don't want to estimate the time it will take the agents to be done and put it to sleep based on that.

I made some little changes to the code below by adding a method that checks if the agents are done, but it just hangs without any response. Can you please take a look at the status_checker method to see if I implemented it correctly?

from osbrain import run_nameserver, run_agent, Agent

import time

SYNCHRONIZER_CHANNEL_1 = 'coordinator1'

class TransportAgent(Agent):
    def transportAgent_first_handler(self, message):
        time.sleep(2)
        self.log_info(message)
        self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')

    def process_reply(self, message):
        yield 1

class NodeAgent(Agent):
    def NodeAgent_first_handler(self, message):
        self.log_info(message)
        self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')

    def process_reply(self, message):
        yield 1

class SynchronizerCoordinatorAgent(Agent):
    def on_init(self):
        self.network_agent_addr = self.bind('SYNC_PUB', alias=SYNCHRONIZER_CHANNEL_1, handler='status_handler')
        self.status_list = []

    def first_synchronization(self, time_step, iteration):
        self.send(SYNCHRONIZER_CHANNEL_1, message={'time_step': time_step, 'iteration': iteration},
                  topic='first_synchronization')

    def status_handler(self, message):
        yield 'I have added you to the status_list'
        self.status_list.append(message)

    def status_checker(self):
        if self.status_list.append == 2:
            self.status_list.clear()
        while len(self.status_list) < 2:
            time.sleep(1)

    def init_environment(self):
        self.TransportAgent = run_agent('TransportAgent', base=TransportAgent)

        self.NodeAgent = run_agent('NodeAgent', base=NodeAgent)

        self.TransportAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
                                    handler={'first_synchronization': TransportAgent.transportAgent_first_handler})
        self.NodeAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
                               handler={'first_synchronization': NodeAgent.NodeAgent_first_handler})

if __name__ == '__main__':

    ns = run_nameserver()
    synchronizer_coordinator_agent = run_agent('Synchronizer_CoordinatorAgent',
                                               base=SynchronizerCoordinatorAgent)
    synchronizer_coordinator_agent.init_environment()

    for iteration in range(1, 2):
        for time_step in range(0, 90, 30):
            synchronizer_coordinator_agent.first_synchronization(time_step=time_step, iteration=iteration)
            synchronizer_coordinator_agent.status_checker()
    time.sleep(2)
    ns.shutdown()

Also, you made a valid point that I should push the timestep and iteration into the status handler. However, I just couldn't figure it out. My initial thought was to implement the code as below, but it will not produce the right result that I wanted.

    def status_handler(self, message):
        yield 'I have added you to the status_list'
        self.status_list.append(message)
        if self.status_list == 2:
            for iteration in range(1, 2):
                for time_step in range(0, 90, 30):
                    synchronizer_coordinator_agent.first_synchronization(time_step=time_step, iteration=iteration)

Lastly, I apologize for bothering you:) I know this issue is quite trivial, unfortunately, I just couldn't get it to work.

Peque commented 4 years ago

@Folorunblues Don't worry, I know osBrain (and MAS in general) can be a little bit confusing at first.

Maybe you are looking for something like this?

from osbrain import run_nameserver, run_agent, Agent

import time

SYNCHRONIZER_CHANNEL_1 = "coordinator1"

class TransportAgent(Agent):
    def transportAgent_first_handler(self, message):
        time.sleep(2)
        self.log_info(message)
        self.send(SYNCHRONIZER_CHANNEL_1, "is_done", handler="process_reply")

    def process_reply(self, message):
        yield 1

class NodeAgent(Agent):
    def NodeAgent_first_handler(self, message):
        self.log_info(message)
        self.send(SYNCHRONIZER_CHANNEL_1, "is_done", handler="process_reply")

    def process_reply(self, message):
        yield 1

class SynchronizerCoordinatorAgent(Agent):
    def on_init(self):
        self.network_agent_addr = self.bind(
            "SYNC_PUB", alias=SYNCHRONIZER_CHANNEL_1, handler="status_handler"
        )
        self.status_list = []
        self.iteration = 0
        self.time_step = [30, 90, 0] * 2
        self.done = False

    def start(self):
        self.first_synchronization()

    def finished(self):
        return self.done

    def first_synchronization(self):
        self.iteration += 1
        time_step = self.time_step.pop()
        self.send(
            SYNCHRONIZER_CHANNEL_1,
            message={"time_step": time_step, "iteration": self.iteration},
            topic="first_synchronization",
        )

    def status_handler(self, message):
        yield "I have added you to the status_list"
        self.status_list.append(message)
        if len(self.status_list) < 2:
            return
        self.status_list.clear()
        if len(self.time_step) == 0:
            self.done = True
            return
        if self.iteration >= 2:
            self.done = True
            return
        self.first_synchronization()

    def init_environment(self):
        self.TransportAgent = run_agent("TransportAgent", base=TransportAgent)

        self.NodeAgent = run_agent("NodeAgent", base=NodeAgent)

        self.TransportAgent.connect(
            self.network_agent_addr,
            alias=SYNCHRONIZER_CHANNEL_1,
            handler={
                "first_synchronization": TransportAgent.transportAgent_first_handler
            },
        )
        self.NodeAgent.connect(
            self.network_agent_addr,
            alias=SYNCHRONIZER_CHANNEL_1,
            handler={"first_synchronization": NodeAgent.NodeAgent_first_handler},
        )

if __name__ == "__main__":

    ns = run_nameserver()
    synchronizer_coordinator_agent = run_agent(
        "Synchronizer_CoordinatorAgent", base=SynchronizerCoordinatorAgent
    )
    synchronizer_coordinator_agent.init_environment()
    synchronizer_coordinator_agent.start()

    while not synchronizer_coordinator_agent.finished():
        time.sleep(0.5)
    ns.shutdown()

Please note:

Blankslide commented 4 years ago

@Peque Thank you once again for your time. It is highly appreciated. And yes, your modified code shows what I am looking for. However, I still have some questions regarding my initial code so that I can use Osbrain to its full capacity. I need you to please clarify the comments below:

  1. When using SYNC_PUB, it seems that the coordinator agent doesn't necessarily need to wait for the SUB agents to reply before it releases the connection? I mean, the SUB agents don't even need to send anything to the coordinator.

  2. The #CODE 2 below gave an output of :

(TransportAgent): {'time_step': 0, 'iteration': 1} (NodeAgent): {'time_step': 0, 'iteration': 1}

Which is what I wanted before it proceeds to the next timestep. However, after printing the above, it just hangs. What is wrong in the "status_checker" method below?


#CODE 2
from osbrain import run_nameserver, run_agent, Agent

import time

SYNCHRONIZER_CHANNEL_1 = 'coordinator1'

class TransportAgent(Agent):
    def transportAgent_first_handler(self, message):
        # time.sleep(2)
        self.log_info(message)
        self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')

    def process_reply(self, message):
        yield 1

class NodeAgent(Agent):
    def NodeAgent_first_handler(self, message):
        time.sleep(2)
        self.log_info(message)
        self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')

    def process_reply(self, message):
        yield 1

class SynchronizerCoordinatorAgent(Agent):
    def on_init(self):
        self.network_agent_addr = self.bind('SYNC_PUB', alias=SYNCHRONIZER_CHANNEL_1, handler='status_handler')
        self.status_list = []

    def first_synchronization(self, time_step, iteration):
        self.send(SYNCHRONIZER_CHANNEL_1, message={'time_step': time_step, 'iteration': iteration},
                  topic='first_synchronization')

    def status_handler(self, message):
        yield 'I have added you to the status_list'
        self.status_list.append(message)

    def status_checker(self):
        count = 0
        while len(self.status_list) < 2:
            count += 1
            time.sleep(1)
            return
        self.status_list.clear()

    def init_environment(self):
        self.TransportAgent = run_agent('TransportAgent', base=TransportAgent)

        self.NodeAgent = run_agent('NodeAgent', base=NodeAgent)

        self.TransportAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
                                    handler={'first_synchronization': TransportAgent.transportAgent_first_handler})
        self.NodeAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
                               handler={'first_synchronization': NodeAgent.NodeAgent_first_handler})

if __name__ == '__main__':

    ns = run_nameserver()
    synchronizer_coordinator_agent = run_agent('Synchronizer_CoordinatorAgent',
                                               base=SynchronizerCoordinatorAgent)
    synchronizer_coordinator_agent.init_environment()

    for iteration in range(1, 2):
        for time_step in range(0, 90, 30):
            synchronizer_coordinator_agent.first_synchronization(time_step=time_step, iteration=iteration)
            synchronizer_coordinator_agent.status_checker()
    time.sleep(1)
  1. The only thing that worked was when I had to sleep it manually as #CODE 1:

It produced the right result as:

(TransportAgent): {'time_step': 0, 'iteration': 1} (NodeAgent): {'time_step': 0, 'iteration': 1} (TransportAgent): {'time_step': 30, 'iteration': 1} (NodeAgent): {'time_step': 30, 'iteration': 1} (TransportAgent): {'time_step': 60, 'iteration': 1} (NodeAgent): {'time_step': 60, 'iteration': 1}

Unfortunately, my project have variable time for the agents to complete their tasks, so I cant necessarily estimate it.

#CODE 1
if __name__ == '__main__':

    ns = run_nameserver()
    synchronizer_coordinator_agent = run_agent('Synchronizer_CoordinatorAgent',
                                               base=SynchronizerCoordinatorAgent)
    synchronizer_coordinator_agent.init_environment()

    for iteration in range(1, 2):
        for time_step in range(0, 90, 30):
            synchronizer_coordinator_agent.first_synchronization(time_step=time_step, iteration=iteration)
            time.sleep(4)
    time.sleep(1)
Peque commented 4 years ago

Yeah, SYNC_PUB does not require all the listeners to reply before publishing the next message. It is meant for publishing incremental updates. The sync happens when a client requests for a snapshot of the current state of the publisher.

You need to implement the "check that all agents are done before sending next pub" yourself in the coordinator agent. What you are implementing is a specific use case which could be included in osBrain if it was common enough and clean enough. ^^

As you guessed already, you do not need SYNC_PUB. You could be using a normal PUB and a PULL from the coordinator agent. Only SYNC_PUB creates both for you, so you may find it a little bit more convenient.

Your question about the code seems to be less related to osBrain and more to programming. Would you mind posting it in Stack Overflow? Link it here and I will try to have a look at it and answer it.

Blankslide commented 4 years ago

@Peque You are right, SYNC_PUB works perfectly for me as I won't be needing any PULL server. I will post the question on Stack Overflow, and I will link it here. Thanks again for your time.

Blankslide commented 4 years ago

@Peque You can view the question here stackoverflow. Thanks once again!

Blankslide commented 4 years ago

@Peque You have addressed my questions on StackOverflow, it was really helpful. Thank you once again.

Peque commented 4 years ago

@Folorunblues Good to know!

Do not forget to upvote/accept the answer in Stack Overflow if it helped you and close this issue if you consider it as resolved. :wink:

Also, feel free to contribute changes to the osBrain documentation if you think there is something that could be extended or improved. If you had to come here to ask that means the documentation is not good or clear enough. :blush:

Blankslide commented 4 years ago

@Peque I already accepted the answer on Stack Overflow:) and, sure, I will always be happy to contribute to the improvement of the documentation. And, I will always come back here to ask questions until I am done with my project:). I will probably direct any programming related question with Osbrain to Stack Overflow.

Moreover, although I already closed this issue here #357 , of recent, even despite upgrading to the latest version of pyzmq it has started with the Nonetype error again. Can you please confirm if Osbrain is compatible with the newest version of pyzmq?

Should I close this issue and reopen this #357 , so we continue the discussion there?

Peque commented 4 years ago

@Folorunblues It seems you did not accept the answer in Stack Overflow (?).

You can reopen #357 if you think it is still an issue. Please, provide the code and environment to reproduce it (operating system, Python version, package versions...).

Yeah, close this issue if your synchronization question has been answered.

Blankslide commented 4 years ago

@Peque Yes, I did accept it, but it seems new users are not allowed to upvote/accept on Stack Overflow:). Sure, I will close this and reopen the other one. Thanks once again!

Peque commented 4 years ago

@Folorunblues You are welcome. :blush:

Yeah, you need 15 reputation to up-vote. I thought accepting an answer was always an option though (at least, it does not require any special privileges).