kquick / Thespian

Python Actor concurrency library
MIT License
189 stars 24 forks source link

ActorSystem.ask() doesn't do anything when called inside an actor in multiprocQueueBase #80

Closed chanind closed 2 years ago

chanind commented 2 years ago

Apologies if I'm missing something obvious, I'm still new to Actors. I have some code that's run inside an actor, where the actor needs to make a request to another actor, then continue processing when the results come back. Running ActorSystem.ask() in the single-threaded mode works fine and does what I want, but when I run this same code in multiprocQueueBase, the actor being asked never receives any messages. How can I use ask() inside of an Actor to wait for a response?

kquick commented 2 years ago

The ask() is an external function on the ActorSystem and is used when non-Actor code wishes to send a request to an Actor and wait for any response (note the word "any" here: the response will be the next message sent by the Actor(s) to that external address and not necessarily the response desired by the external caller). The ask() call is blocking and waits for a message to be received (or the timeout, if specifed).

The Actor itself however should be non-blocking. An Actor is either responding to a message or waiting for a message; while it is responding to a message, it cannot wait for other messages. One of the fundamental aspects of the Actor concurrency paradigm is that when an Actor isn't handling messages it should be available for handling any other messages delivered to that Actor.

If Actor A has received a message and it must send a message to Actor B and receive a response from Actor B before continuing to process that message, Actor A should update its internal state or make other arrangements to continue the processing, send() its message to Actor B, and then return from the receiveHandler; when Actor B sends back the response, Actor A should resume processing the original message using the additional information provided by Actor B. There are typically two ways for Actor A to save the state and resume on receiving the message from Actor B:

  1. save the original message in a list, dictionary, or other container on Actor A's self. For example:

    def receiveMessage(self, message, sender):
    if isinstance(message, InterestingType):
        ... initial processing ...
        if not hasattr(self, 'pending'):
            self.pending = {}
        self.pending[message.id] = (message, sender)
        message_to_b = MessageToB(..., orig_id = message.id)
        self.send(actor_b, message_to_b)
    elif isinstance(message, ResponseFromB):
        (orig_msg, orig_sender) = self.pending[message.orig_id]
        final_response = ... something including orig_msg and the message response from b ...
        self.send(orig_sender, final_response)
  2. Attach the original message to the message sent to Actor B, and ensure that Actor B includes that in the response:

    class ActorA(Actor):
    def receiveMessage(self, message, sender):
        if isinstance(message, InterestingType):
            ... initial processing ...
            message_to_b = MessageToB(..., orig_msg = message, orig_sender = sender)
            self.send(actor_b, message_to_b)
        elif isinstance(message, MessageToB):
            (orig_msg, orig_sender) = (message.orig_msg, message.orig_sender)`
            final_response = ... something including orig_msg and the message response from b...
            self.send(orig_sender, final_response)

In both cases, B needs to preserve either the orig_id or the orig_sender + orig_msg and add those to it's response to Actor A. The above are simple cases; you can use more complex mechanisms for original message identification and other techniques, but hopefully this helps provide some ideas about how to allow Actor A to request work from Actor B and then operate with B's response.

chanind commented 2 years ago

I see, so you can't block inside an actor. I managed to make something resembling an ask() inside actors by using https://github.com/syrusakbary/promise, where I return a Promise whenever I would use ask() before. Thanks for the in-depth explanation!