Closed rkempter closed 6 years ago
Hi @rkempter ,
In the MultiprocTCPBase, each actor is single-threaded, and all self.send()
operations are handled asynchronously by the Actor framework that encapsulates the call to receiveMessage()
. You can issue as many self.send()
calls as you like in the invocation of receiveMessage()
, and these will be queued internally by this framework (and in some cases, the start of the async IO initiated) but until you exit from the receiveMessage()
the asynchronous framework cannot continue to process the send. However, once you do exit, all sends will be processed: sends to different target Actors will be processed in parallel, sends to the same target Actor will follow one another and can be re-ordered.
The messaging capability is 1:n as you are desiring, it's just that the processing of messages needs the cpu context being consumed by the receiveMessage()
itself. As another example of this pattern: https://github.com/kquick/Thespian/blob/master/examples/multi_system/act1/app.py#L25-L32
If this is interfering with your Actor's ability to operate, I'd like to help explore how your Actor's can be adjusted: it's generally an anti-pattern for the Actor Model for one Actor to make any assumptions about the scheduling of delivery to and processing of a message by another Actor.
Hopefully the above information helps clarify the situation, but I'd be happy to expand on any of this if you'd like. Is this creating a particular issue for you or is it simply an observation that the above explanation satisfactorily answers for you?
Regards, Kevin
Thanks for the quick answer, makes sense.
The actor implementation works fine, but due to the queuing of the messages before the exit from receiveMessage
, the degree of parallelism the system is running in is reduced. Some refactoring should resolve the issue.
Thanks again, Renato
Hi @kquick.
I ran into similar issues, I was not aware that actors could not send information from within a receiveMsg
function. In my case I want to have an Actor representing a Motor with a Child Actor being a Status LED (whether this is a useful example or not, is not relevant :P). The Motor actor should notify its status to the LED (toggle).
However, all self.send()
methods are happening after the Motor is finished processing its command. (see Code and Log output below).
Now I found this issue, saying it's by design? How would I work around this? Is there no way to give any feedback to other actors during this? (On a different note: Is there a callback when an actor has been created successfully?)
from thespian.actors import *
import logging
from time import sleep
class LED(ActorTypeDispatcher):
def __init__(self, *args, **kw):
super(LED, self).__init__(*args, **kw)
logging.info("LED created")
def receiveUnrecognizedMessage(self, message, sender):
logging.warning("Handler got unknown message from %s: %s (%s)", sender, message, type(message))
class Motor(ActorTypeDispatcher):
def __init__(self, *args, **kw):
super(Motor, self).__init__(*args, **kw)
logging.info("Motor created")
def receiveUnrecognizedMessage(self, message, sender):
logging.warning("Handler got unknown message from %s: %s (%s)", sender, message, type(message))
class Stepper(Motor):
statusLED = None
def receiveMsg_str(self,message,sender):
if self.statusLED is None:
self.statusLED = self.createActor("loop_test.StatusLED")
if message in "rotate":
for i in range(5):
print("rotate "+str(i))
sleep(1)
self.send(self.statusLED,"on")
self.send(self.statusLED,"off")
self.send(sender,"finished")
class StatusLED(LED):
status = 0
def receiveMsg_str(self,message,sender):
logging.info("Status: "+str(message))
if message in "on":
self.status = 1
else:
self.status = 0
def endprogram(asys):
asys.shutdown()
if __name__ == "__main__":
import sys
asys = ActorSystem('multiprocUDPBase',dict([('Admin Port', 1900),
('Convention Address.IPv4', ('', 1900)),
]))
motor = asys.createActor("loop_test.Stepper")
try:
asys.ask(motor, "rotate")
except KeyboardInterrupt:
print ('keyboard interrupt detected')
endprogram(asys)
endprogram(asys)
sys.exit(0)
Output:
INFO:root:Motor created
rotate 0
INFO:root:LED created
rotate 1
rotate 2
rotate 3
rotate 4
ON
INFO:root:Status: on
ON
INFO:root:Status: on
ON
INFO:root:Status: on
ON
INFO:root:Status: on
ON
WARNING:root:Handler got unknown message from ActorAddr-(UDP|:1900): ActorExitRequest (<class 'thespian.actors.ActorExitRequest'>)
INFO:root:Status: on
OFF
INFO:root:Status: off
Hi @KorbinianK ,
Thank you for providing an example of what you are trying to accomplish: having a specific implementation helps to make the discussion more precise.
As noted in this thread, the send()
operations performed by the Actor are likely to occur asynchronously and will almost certainly be blocked if the Actor's receiveMessage
is blocked performing a syscall (e.g. a sleep or a file read/write, etc.).
In your example, the blocking occurs in the sleep(1)
call, and so all of the LED notifications are queued and do not get fully transmitted to the LED actor until after the inner loop completes. I realize that the sleep()
call is probably a placeholder for some other sort of activity you are performing at that point in your loop, but since sleep()
is a special case with a specific solution, I'll first suggest a solution for an actor that is blocking using sleep()
, and then later provide a suggestion where the blocking is due to some other activity.
Blocking due to Sleep:
To address this, I would change the implementation from performing the entire loop in the context of a single receiveMessage
into a form where Thespian timer messages (https://thespianpy.com/doc/using.html#hH-9cc1acd4-7f35-44ac-94fc-0326dc87a7a5) were used in place of the sleep call:
class Stepper(Motor):
statusLED = None
def receiveMsg_str(self,message,sender):
if self.statusLED is None:
self.statusLED = self.createActor("loop_test.StatusLED")
if message in ["rotate"]:
if self.rotateRequestor:
self.send(sender, 'Stepper currently busy')
# alternatively you could queue pending requests in self.pendingReqs
return
self.rotateRequestor = sender
self.do_rotation(5)
def receiveMsg_WakeupMessage(self, wakemsg, myself):
remcnt = wakemsg.payload
if rem_cnt:
self.do_rotation(rem_cnt)
return
self.send(self.statusLED,"off")
self.send(self.rotateRequestor, "finished")
self.rotateRequestor = None
def do_rotation(self, rem_cnt):
print("rotate "+str(rem_cnt)) # this counts down, not up, but it's irrelevant for this example and you can adjust as needed.
self.wakeupAfter(1, rem_cnt - 1)
self.send(self.statusLED,"on")
Blocking due to other activity:
When you have activity that an Actor must perform in response to a message and that activity will block that Actor for a period of time, the Actor should not be responsible for performing any other operations. In your case, you have a Stepper actor that wants to do a slow and blocking activity (controlling the stepper motor) and also perform LED notifications. By splitting the slow and blocking activity out to a separate Actor that is only responsible for doing that activity you can regain the responsiveness you are looking for. For example:
class StepMotor:
def __init__(self, rem_steps, requestor):
self.rem_steps = rem_steps
self.orig_requestor = requestor
class MotorActuator(ActorTypeDispatcher):
def receiveMsg_StepMotor(self, msg, sender):
print("rotate "+str(i))
sleep(1) # placeholder for the actual work
self.send(sender, msg)
class Stepper(Motor):
statusLED = None
actuator = None
def receiveMsg_str(self,message,sender):
if self.statusLED is None:
self.statusLED = self.createActor("loop_test.StatusLED")
if self.actuator is None:
self.actuator = self.createActor("loop_test.MotorActuator")
if message in ["rotate"]:
self.send(self.actuator, StepMotor(5, sender))
def receiveMsg_StepMotor(self, stepmsg, sender):
stepmsg.rem_steps = stepmsg.rem_steps - 1
if stepmsg.rem_steps:
self.send(self.actuator, stepmsg)
self.send(self.statusLED,"on")
else:
self.send(self.statusLED,"off")
self.send(stepmsg.orig_requestor,"finished")
In this case, the Stepper actor coordinates both the MotorActuator and the StatusLED actors, and the MotorActuator handles the actual slow blocking operation and only that operation.
Also note that there is a third situation: when the blocking is due to reading from a network socket or a file. If your blocking is caused by this, you can use the ThespianWatch extension (https://thespianpy.com/doc/using.html#hH-94edef18-154e-4847-864f-027dff4f6e0a), possibly in conjunction with the WakeupMessage functionality I described above.
Hopefully this is helpful, and please let me know if it is not or you have other questions.
-Kevin
I am currently prototyping an actor system using thespian. Using the TCP Base, I run into issues when generating multiple messages after
receiveMessage
is being called. E.g. Upon reception of a message, an actor should generate a list of items and submit a message for each item (1:N mapping between messages). The receptors of these messages should start immediately, but it seems they do not start running until the receiveMessage has been terminating.Is there a way to support the 1:n message generation?
Thanks