root-11 / maslite

A very fast multi agent messaging kernel in Python
MIT License

Multiprocessing demo of maslite / discuss further API needs. #19

Open root-11 opened 6 months ago

root-11 commented 6 months ago

In many agent-based systems, communication has scale-free distribution characteristics. This means that a lot of messages are exchanged within clusters of agents, whilst few messages are exchanged between clusters.

For the purpose of simulation, this means that there is a benefit in distributing the simulation over multiple cores and exploiting the principles of multiprocessing.

Why not just keep things simple? Let's say that you want to run a simulation at 100x real-time, but you can't because there are 8 clusters of agents that have sufficiently large overhead and exchange enough messages to slow you down to a simulated-time to wall-time ratio of 20x real-time. In other words, your simulation takes 5x as long to run as you would like. However, you also know that none of the clusters individually runs slower than 100x real-time, i.e. your clusters are fast enough individually.

It would hence be convenient to partition the system such that each cluster is on its own logical core.
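The arithmetic behind the "5x as long" figure can be checked with a few lines. This is only a sketch: the function name `wall_time` and the one-hour simulated duration are illustrative assumptions, while the 100x and 20x factors are the ones quoted above.

```python
def wall_time(sim_seconds: float, speed_factor: float) -> float:
    """Wall-clock seconds needed to simulate `sim_seconds` at `speed_factor` x real-time."""
    return sim_seconds / speed_factor

sim_seconds = 3600.0                       # hypothetical: one simulated hour
target = wall_time(sim_seconds, 100)       # what we want: 100x real-time
actual = wall_time(sim_seconds, 20)        # what we get: 20x real-time
slowdown = actual / target                 # how much longer the run takes

print(f"target {target}s, actual {actual}s, slowdown {slowdown}x")
```

The slowdown depends only on the ratio of the two speed factors, not on the simulated duration chosen.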

Modern PCs have many cores. I can for example log on to Azure and rent a 48 core machine with 1 TB RAM for under $100 for an hour.

For problems where the clusters exceed the capacity of a single machine, I would have to partition the simulation over multiple machines and permit message exchange between the machines using sockets.

As the expansion from single core simulation to multi-core simulation is the first and most important step, I will in what follows present a simple recipe for such a configuration, which I will explain step by step. The whole script is at the bottom of this post as mp_test.py so you won't need to copy paste the code.

A nice API.

The first thing we want is a nice API, but for multiprocessing we can't just launch a Python process and split the scheduler: we need a parent-child relationship. In the snippet below, I have chosen MPmain as the parent, and MPScheduler as the child.

The MPScheduler is practically just a regular maslite.Scheduler with some multiprocessing switches added. MPmain creates each subprocess, so I've added the method MPmain.new_scheduler() as a constructor of the MPScheduler, which packages it with all the multiprocessing queues necessary for message exchange between the processes.

To give this some context, I am setting up a simple and generic demonstration, where mpmain controls two schedulers which each have two agents (4 agents in total).

We add the agents to either scheduler (=cluster/partition) and ask the first agent to obtain a signature from all other agents in the system. The agent which sees that all other agents have signed the message will abort further message exchange by sending a contract to mpmain which stops the simulation.

Here is the script:

def test_multiprocessing():
    """ Demonstrates the multiprocessing where mpmain has two schedulers
    which each have two agents (4 agents in total).

    Each agent decides at random who to send a message to. 
    The demonstration stops when all messages have been signed by all agents.
    """
    with MPmain() as main:

        a1 = Agent(1,[2], everyone={1,2,3,4})
        a2 = Agent(2,[3], everyone={1,2,3,4})
        a3 = Agent(3,[4], everyone={1,2,3,4})
        a4 = Agent(4,[1], everyone={1,2,3,4})

        a1.inbox.append(ChainMsg(s=1, r=1))

        s1 = main.new_scheduler()
        s1.add(a1)
        s1.add(a2)

        s2 = main.new_scheduler()
        s2.add(a3)
        s2.add(a4)

        main.run()

if __name__ == "__main__":
    test_multiprocessing()

And this is the output:

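(image: screenshot of the console output; the same output is listed as comments at the end of the full script below)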

We see how:

  1. The agents are added.
  2. The 2 schedulers are started.
  3. The first agent discovers the message and starts sending it to its peers.
  4. Agent 4 detects that all have signed the message and creates the contract.
  5. mpmain receives the contract and starts to shut the simulation down.
  6. The 2 schedulers receive the stop signal.
  7. mpmain acknowledges that the schedulers have stopped correctly.
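The signing chain in steps 3 and 4 can be traced in a single process, without any schedulers or queues. This is a minimal sketch, not the script's code: the function `sign_chain` and its `max_hops` guard are hypothetical, and it assumes (as in the demo) that every agent has exactly one peer, so the random choice is deterministic.

```python
def sign_chain(peers, everyone, start, max_hops=100):
    """Pass one message from agent to agent until everyone has signed it.

    peers:    dict mapping agent id -> the single peer it forwards to
    everyone: set of all agent ids that must sign
    Returns (id of the agent that would send the Contract, signatures, hops).
    """
    signatures, hops = [], []
    current = start
    for _ in range(max_hops):
        signatures.append(current)            # the receiving agent signs
        if set(signatures) == everyone:
            return current, signatures, hops  # this agent creates the contract
        nxt = peers[current]
        hops.append((current, nxt))           # sender -> receiver
        current = nxt
    raise RuntimeError("message was not signed by everyone within max_hops")

peers = {1: 2, 2: 3, 3: 4, 4: 1}              # same ring as a1..a4 in the demo
finisher, signatures, hops = sign_chain(peers, {1, 2, 3, 4}, start=1)
print(finisher, signatures, hops)
```

With the ring above, the hops are 1 to 2, 2 to 3 and 3 to 4, and agent 4 is the one that sees the complete set of signatures.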

When you run the code, you will notice that it takes about half a second to start and to stop the subprocesses. This is because Python needs to copy the loaded variables from the main process to the subprocesses. For simulations with a large number of agents it could hence be more efficient to add a start-up process for each scheduler that loads what it needs, avoiding the overhead of copying objects around. In this snippet, however, I'd rather leave that out and focus on the messaging.

At this point I'd like you to scroll to the bottom of this ticket and copy the code into a notepad so you have it next to this text.

In the code you will find two messages (ChainMsg and Contract) and a signal (Stop). These are trivial.

You will also find a new class Link, which is a container for the interprocess communication between the parent and a child process. There are two attributes:

    self.to_mp = ctx.Queue()
    self.to_proc = ctx.Queue()

These two queues are multiprocessing.Queues, which enable the parent (main) process to send messages to the child process (to_proc) and the child to send messages back (to_mp). They are the only objects that are shared.

Sharing any other type of object between processes does not work the way it does between threads: each process has its own memory space and its own GIL, so a change made to an ordinary Python object in one process is not visible in the other. The good news, however, is that you can send any picklable Python object from one process to the other using queue.put(obj) on the sender side and retrieve it on the other side using obj = queue.get_nowait(), which raises queue.Empty if nothing is waiting.
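A multiprocessing queue pickles an object on put and unpickles it on get, so the receiver always works on a copy. The sketch below imitates that round trip with pickle directly, in a single process; the plain dict standing in for a ChainMsg is an assumption made to keep the example self-contained.

```python
import pickle

# Hypothetical stand-in for a ChainMsg: sender, receiver and signatures so far.
msg = {"s": 1, "r": 3, "signature": [1]}

# What effectively happens between queue.put(msg) and queue.get_nowait():
received = pickle.loads(pickle.dumps(msg))

# The receiver signs its copy; the sender's original is untouched.
received["signature"].append(3)

print(msg["signature"], received["signature"])
```

This is why an agent must send the message on after signing it: mutating a received message does not change anything in the process it came from.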

The class MPmain has two methods that you may have come across before:

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):  # signature requires these, though I don't use them.
        self._stop()

These methods enable us to use the class as a context manager, so that it works with the with statement, just like file handles where the file is closed when the with-block is left:

with open("myfile.txt") as fi:  # with-block.
    fi.read()  # inside with-block
# outside with-block: the file is closed.

We need the context manager style to ensure that the subprocesses shut down properly when we leave the context, as a context manager's __exit__ method is always called when the with-block is left, even when an exception is raised.
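That guarantee can be seen in isolation with a toy context manager. This is a sketch only: the class `Guard` is hypothetical and merely records the order of events, where MPmain would call _stop().

```python
class Guard:
    """Records when the with-block is entered and left."""

    def __init__(self):
        self.events = []

    def __enter__(self):
        self.events.append("enter")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.events.append("exit")  # MPmain would stop its subprocesses here
        return False                # do not swallow the exception

guard = Guard()
try:
    with guard:
        guard.events.append("body")
        raise RuntimeError("simulated failure inside the with-block")
except RuntimeError:
    guard.events.append("handled")

print(guard.events)
```

The "exit" entry is recorded before the exception reaches the except clause, which is the property the shutdown logic relies on.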

For convenience, MPmain has the familiar run API to mirror the current scheduler's top-level behaviour.

This brings us to the individual schedulers that host each cluster: It is very similar to a typical maslite scheduler with some additional features for interprocess communication:

The top-level runner has 3 steps:

    def run(self):
        while not self._quit:
            self.process_inter_proc_mail()  # [1]
            self.process_mail_queue()  # [2]
            self.update_agents()  # [3]

This run method is called automatically by mpmain when the simulation starts, so you need to do nothing.

Next is the method process_inter_proc_mail.

    def process_inter_proc_mail(self):
        while not self._quit:
            try:
                msg = self.mq_to_self.get_nowait()
                if isinstance(msg, ChainMsg):
                    self.mail_queue.append(msg)
                elif isinstance(msg, Stop):
                    print(f"{self.name} received stop signal")
                    self._quit = True
                    self.exit.set()
                    break
                else:
                    raise Exception(f"{msg}")
            except queue.Empty:
                return

This method looks at the message queue to self (self.mq_to_self), tries to retrieve any pending messages and adds them to its main mail_queue.
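The drain-until-empty pattern used here can be tried on its own. In this sketch a thread-level queue.Queue stands in for the multiprocessing queue (an assumption made so that the example runs in one process); both signal an empty queue by raising queue.Empty from get_nowait(). The helper name `drain` is hypothetical.

```python
import queue

def drain(q):
    """Collect everything currently waiting in `q` without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:   # nothing left: hand back what we found
            return items

q = queue.Queue()
for n in (1, 2, 3):
    q.put(n)

pending = drain(q)
print(pending, drain(q))
```

Because get_nowait() never blocks, the scheduler's run loop can poll for inter-process mail on every iteration and carry on immediately when there is none.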

Next, the run-loop processes the messages, just like the maslite scheduler.

    def process_mail_queue(self):
        for msg in self.mail_queue[:]:
            if msg.r in self.agents:  # if receiver is known ...
                agent = self.agents[msg.r]
                agent.inbox.append(msg)
            else:  # give it to main ...
                self.mq_to_main.put(msg)
        self.mail_queue.clear()

This code should appear very familiar. The only new thing is the if-else clause where the scheduler concludes that if msg.r is not in self.agents then it must be a message for someone outside its cluster, and it puts the message onto the queue to main.
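The routing decision itself is small enough to isolate. The following sketch uses a hypothetical function `route` with (receiver, payload) tuples as messages; it returns the outbound messages in a list where the real scheduler would put them on the queue to main.

```python
def route(mail_queue, local_agents):
    """Split messages into local deliveries and messages for other clusters.

    mail_queue:   list of (receiver_id, payload) tuples
    local_agents: set of agent ids hosted by this scheduler
    """
    local, outbound = {}, []
    for receiver, payload in mail_queue:
        if receiver in local_agents:          # receiver is known: deliver locally
            local.setdefault(receiver, []).append(payload)
        else:                                 # unknown: hand it to main
            outbound.append((receiver, payload))
    return local, outbound

local, outbound = route([(1, "a"), (3, "b"), (2, "c")], local_agents={1, 2})
print(local, outbound)
```

The scheduler never needs to know where agent 3 lives; only main keeps the map from agent id to scheduler.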

Finally, as the messages have been sorted to each of the agents, the agents are updated with the familiar method update_agents:

    def update_agents(self):
        for agent in self.agents.values():
            if agent.inbox:
                agent.update()

As you can see, the system is very simple as there is no cognitive overhead for the developer. The only difference from the classical maslite.Scheduler usage is that the developer needs to decide in which cluster to put the agent.

This brings me to the final note: inter-process communication is a bit slower than within-process communication, so if the agents communicate a lot outside their cluster, the system will seem slower (!) than running it conventionally. This is expected, and the developer just needs to know this.
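One rough way to get a feel for the difference is to time a plain in-process delivery against a delivery that first goes through a pickle round trip, which every message sent over a multiprocessing queue has to pay (on top of the pipe transfer, which this sketch does not measure). The dict message and the function names are illustrative assumptions, and the numbers will vary by machine.

```python
import pickle
from timeit import timeit

msg = {"s": 1, "r": 3, "signature": [1, 2]}  # hypothetical stand-in for a ChainMsg
inbox = []

def local_delivery():
    inbox.append(msg)                              # within-process: append a reference

def pickled_delivery():
    inbox.append(pickle.loads(pickle.dumps(msg)))  # cross-process: serialise and rebuild

n = 10_000
t_local = timeit(local_delivery, number=n)
t_pickled = timeit(pickled_delivery, number=n)
print(f"local: {t_local:.4f}s, pickled: {t_pickled:.4f}s for {n} messages")
```

If a profile of a partitioned simulation is dominated by this kind of overhead, the clusters were probably cut in the wrong place.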

Here is the whole script:

#mp_test.py
import queue
import multiprocessing
from multiprocessing.context import BaseContext
from itertools import count
from random import choice
from time import process_time, sleep
import platform
default_context = "spawn" if platform.system() != 'Linux' else "fork"

class Stop:
    """ a simple stop signal."""

class ChainMsg:
    """ A standard message 
    we send this message around for multiple agents to sign.
    Once all the agents have signed it, a contract message
    is created and sent to the main process.
    """
    ids = count(start=1)
    def __init__(self,s,r) -> None:
        self.id = next(ChainMsg.ids)
        self.s = s
        self.r = r
        self.signature = []
    def __str__(self) -> str:
        return f"Msg:({self.id} - {self.s}:{self.r}): {self.signature}"

class Contract:
    def __init__(self, s, signatures) -> None:
        self.s = s
        self.r = "nobody"
        self.signatures = signatures
    def __str__(self) -> str:
        return f"{self.s} received signatures from eveyone."

class Link:
    """ scaffolding to keep queues and subprocess together """
    def __init__(self,name, ctx) -> None:
        self.to_mp = ctx.Queue()
        self.to_proc = ctx.Queue()
        self.scheduler = MPScheduler(ctx=ctx, mq_to_main=self.to_mp, mq_to_self=self.to_proc, name=name)
    def start(self):
        self.scheduler.start()
    def is_alive(self):
        return self.scheduler.is_alive()
    @property
    def exitcode(self):
        return self.scheduler.exitcode

class MPmain:
    """ The main process for inter-process message exchange """
    def __init__(self, context=default_context) -> None:
        self._ctx = multiprocessing.get_context(context)

        self.links = []
        self.schedulers = {}

        self.agents = {}  # agent.id: proc.id
        self.finished = []
        self._quit = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):  # signature requires these, though I don't use them.
        self._stop()

    def new_scheduler(self):
        name = str(len(self.schedulers)+1)
        link = Link(name, ctx=self._ctx)  # communication link between mpmain and a scheduler.
        self.schedulers[name] = link
        return link.scheduler

    def _start(self):
        procs = []
        for name, link in self.schedulers.items():
            for id in link.scheduler.agents.keys():
                self.agents[id] = name 

            link.start()
            print(f"{name} starting")
            procs.append(link)

        while not all(p.is_alive() is True for p in procs):
            sleep(0.01)  # wait for the OS to launch the procs.
        print(f"all {len(self.schedulers)} started")

    def _stop(self):
        procs = []
        for link in self.schedulers.values():
            link.to_proc.put(Stop())  # send stop signal.
            procs.append(link)
            print(f"{link.scheduler.name} stopping")
        while any(p.is_alive() for p in procs):
            sleep(0.01)  # wait until all subprocesses have stopped.

        # multiprocessing can't shut down until all message queues are 
        # empty, so we need to purge the system.
        for link in self.schedulers.values():
            assert isinstance(link, Link)
            while not link.to_mp.empty():  # empty is a method and must be called.
                _ = link.to_mp.get_nowait()
            while not link.to_proc.empty():
                _ = link.to_proc.get_nowait()
        print(f"all {len(self.schedulers)} schedulers stopped")

    def run(self, timeout=3):
        self._start()
        try:
            while not self._quit:
                if process_time() > timeout:
                    return
                self.process_mail_queue()
        except KeyboardInterrupt:
            pass

    def process_mail_queue(self):
        for name, link in self.schedulers.items():
            for _ in range(link.to_mp.qsize()):
                try:
                    msg = link.to_mp.get_nowait()
                    print(msg)
                    if isinstance(msg, Contract):
                        self.finished.append(msg.s)
                        if set(self.finished) == set(self.agents):
                            self._quit = True
                    elif isinstance(msg, ChainMsg):
                        link_name = self.agents[msg.r]
                        target = self.schedulers[link_name]  # don't rebind `link`: it is still being drained.
                        target.to_proc.put(msg)
                    else:
                        raise Exception(f"unhandled message type: {msg}")
                except queue.Empty:
                    if link.is_alive():
                        break  # break and move to next link.
                    elif link.exitcode == -9:
                        raise ChildProcessError(f"{name}:Out of memory")
                    elif link.exitcode != 0:
                        raise ChildProcessError(f"{name}: {link.exitcode}")
                    else:
                        raise Exception

class Agent:
    def __init__(self, id, peers, everyone) -> None:
        self.id = id
        if id in peers:
            raise ValueError("that's silly... ")
        self.peers = peers
        self.everyone = set(everyone)
        self.inbox = []
        self.scheduler = None

    def update(self):
        for msg in self.inbox:
            print(msg)
            self.new_message(msg)
        self.inbox.clear()

    def new_message(self, msg):
        msg.signature.append(self.id)
        if set(msg.signature) == self.everyone:
            # the message has been signed by everyone. 
            # Now make the contract.
            self.send(Contract(s=self.id, signatures=self.everyone))
        else:
            msg.s = self.id
            msg.r = choice(self.peers)
            self.send(msg)

    def send(self, msg):
        self.scheduler.mail_queue.append(msg)

class MPScheduler:
    def __init__(self, ctx:BaseContext, 
                 mq_to_main:multiprocessing.Queue,
                 mq_to_self:multiprocessing.Queue, 
                 name: str) -> None:
        self.ctx = ctx
        self.exit = ctx.Event()
        self.mq_to_main = mq_to_main
        self.mq_to_self = mq_to_self
        self.process = ctx.Process(group=None, target=self.run, name=name, daemon=False)
        self.name = name
        self._quit = False

        self.agents = {}
        self.mail_queue = []

    def start(self):
        print("starting")
        self.process.start()

    def is_alive(self):
        return self.process.is_alive()

    @property
    def exitcode(self):
        return self.process.exitcode

    def add(self, agent):
        print(f"adding agent {agent.id}")
        agent.scheduler = self
        self.agents[agent.id] = agent

    def update_agents(self):
        for agent in self.agents.values():
            if agent.inbox:
                agent.update()

    def process_mail_queue(self):
        for msg in self.mail_queue[:]:
            if msg.r in self.agents:  # if receiver is known ...
                agent = self.agents[msg.r]
                agent.inbox.append(msg)
            else:  # give it to main ...
                self.mq_to_main.put(msg)
        self.mail_queue.clear()

    def process_inter_proc_mail(self):
        while not self._quit:
            try:
                msg = self.mq_to_self.get_nowait()
                if isinstance(msg, ChainMsg):
                    self.mail_queue.append(msg)
                elif isinstance(msg, Stop):
                    print(f"{self.name} received stop signal")
                    self._quit = True
                    self.exit.set()
                    break
                else:
                    raise Exception(f"{msg}")
            except queue.Empty:
                return

    def run(self):
        while not self._quit:
            self.process_inter_proc_mail()
            self.process_mail_queue()
            self.update_agents()

def test_multiprocessing():
    """ Demonstrates the multiprocessing where mpmain has two schedulers
    which each have two agents (4 agents in total).

    Each agent decides at random who to send a message to. 
    The demonstration stops when all messages have been signed by all agents.
    """
    with MPmain() as main:

        a1 = Agent(1,[2], everyone={1,2,3,4})
        a2 = Agent(2,[3], everyone={1,2,3,4})
        a3 = Agent(3,[4], everyone={1,2,3,4})
        a4 = Agent(4,[1], everyone={1,2,3,4})

        a1.inbox.append(ChainMsg(s=1, r=1))

        s1 = main.new_scheduler()
        s1.add(a1)
        s1.add(a2)

        s2 = main.new_scheduler()
        s2.add(a3)
        s2.add(a4)

        main.run()

if __name__ == "__main__":
    test_multiprocessing()
    # output:
    # -------------------------------
    # adding agent 1
    # adding agent 2
    # adding agent 3
    # adding agent 4
    # starting
    # 1 starting
    # starting
    # 2 starting
    # all 2 started
    # Msg:(1 - 1:1): []
    # Msg:(1 - 1:2): [1]
    # Msg:(1 - 2:3): [1, 2]
    # Msg:(1 - 2:3): [1, 2]
    # Msg:(1 - 3:4): [1, 2, 3]
    # 4 received signatures from eveyone.
    # 1 stopping
    # 2 stopping
    # 1 received stop signal
    # 2 received stop signal
    # all 2 schedulers stopped

root-11 commented 6 months ago

In branch mp I've run the latest example of test_multi_processing_advanced.py with pypy310 with pytest installed in the virtual environment (see pypy for details).

The test is here: https://github.com/root-11/maslite/blob/mp/tests/test_multi_processing_advanced.py

This test is written in "old style" so that the clock doesn't progress whilst the agents are negotiating. However, when the agents are idle the clock jumps to the next alarm.

At the bottom of the test, you will find stdout with this at line 513:

# wall time: 0.047859992999292444 / sim time: 4.75 = 99.25X