kquick / Thespian

Python Actor concurrency library
MIT License

system hangs with 3 troupe actors #43

Closed: andatt closed this issue 5 years ago

andatt commented 5 years ago

Hi Kevin,

I have another one for you:

from thespian.troupe import troupe
from thespian.actors import ActorTypeDispatcher, Actor
from thespian.actors import ActorSystem
import logging
import time

def logfile_extraction(log_files):
    """
    Gets data from logfile and returns as string
    :param log_files: list of str, paths to logfiles
    :return: string
    """
    consolidated_output = ""
    for logfile in log_files:
        with open(logfile, "r") as file:  # read-only access is enough here
            consolidated_output += file.read()

    return consolidated_output

class ActorLogFilter(logging.Filter):
    def filter(self, logrecord):
        return 'actorAddress' in logrecord.__dict__

class NotActorLogFilter(logging.Filter):
    def filter(self, logrecord):
        return 'actorAddress' not in logrecord.__dict__

def log_config(log_file_path_1, log_file_path_2):
    return {
        'version': 1,
        'formatters': {
            'normal': {'format': '%(levelname)-8s %(message)s'},
            'actor': {'format': '%(levelname)-8s %(actorAddress)s => %(message)s'}},
        'filters': {'isActorLog': {'()': ActorLogFilter},
                    'notActorLog': {'()': NotActorLogFilter}},
        'handlers': {'h1': {'class': 'logging.FileHandler',
                            'filename': log_file_path_1,
                            'formatter': 'normal',
                            'filters': ['notActorLog'],
                            'level': logging.INFO},
                     'h2': {'class': 'logging.FileHandler',
                            'filename': log_file_path_2,
                            'formatter': 'actor',
                            'filters': ['isActorLog'],
                            'level': logging.INFO}, },
        'loggers': {'': {'handlers': ['h1', 'h2'], 'level': logging.DEBUG}}
    }

class PrimaryActor(Actor):

    def receiveMessage(self, msg, sender):

        test_data = [
            [1, 2, 3, 4, 5] * 2,
            [6, 7, 8, 9, 10] * 2,
            [11, 12, 13, 14, 15] * 2,
            [16, 17, 18, 19, 20] * 2,
            [21, 22, 23, 24, 25] * 2,
            [1, 2, 3, 4, 5] * 2,
            [6, 7, 8, 9, 10] * 2,
            [11, 12, 13, 14, 15] * 2,
            [16, 17, 18, 19, 20] * 2,
            [21, 22, 23, 24, 25] * 2,
            [1, 2, 3, 4, 5] * 2
        ]

        if not hasattr(self, "helper"):
            self.helper = self.createActor(
                SecondaryActor
            )

        for data in test_data:

            self.send(
                self.helper,
                data
            )

@troupe(max_count=200, idle_count=1)
class SecondaryActor(ActorTypeDispatcher):
    received_message_count = 0
    def receiveMessage(self, msg, sender):

        if isinstance(msg, list):
            if not hasattr(self, "helper"):
                self.helper = self.createActor(
                    TertiaryActor
                )

            for data in msg:

                self.send(
                    self.helper,
                    data
                )

@troupe(max_count=200, idle_count=1)
class TertiaryActor(ActorTypeDispatcher):
    received_message_count = 0

    def receiveMessage(self, msg, sender):
        qa = self.createActor(
            QuaternaryActor,
            globalName="quaternay"
        )

        self.send(
            qa,
            msg
        )

@troupe(max_count=200, idle_count=1)
class QuaternaryActor(ActorTypeDispatcher):

    def receiveMessage(self, msg, sender):

        if isinstance(msg, int):

            logging.info("Received message number {0}".format(msg))

thespian_system = ActorSystem(
    "multiprocTCPBase",
    {},
    logDefs=log_config("bug_check_1.log", "bug_check_2.log")
)

primary_actor = thespian_system.createActor(PrimaryActor)

quaternary_actor = thespian_system.createActor(
    QuaternaryActor,
    globalName="quaternay"
)

thespian_system.tell(primary_actor, {})

I am expecting to see 110 'Received message' entries in the logfile. Instead, the actor system hangs: the actors stop working but are still running. My logfile output (truncated) is:

INFO     ActorAddr-(T|:39395) => Received message number 1
INFO     ActorAddr-(T|:39395) => Received message number 6
INFO     ActorAddr-(T|:41401) => Received message number 2
INFO     ActorAddr-(T|:40165) => Received message number 9
INFO     ActorAddr-(T|:45173) => Received message number 3
INFO     ActorAddr-(T|:36873) => Received message number 7
INFO     ActorAddr-(T|:45585) => Received message number 21
INFO     ActorAddr-(T|:34161) => Received message number 4
INFO     ActorAddr-(T|:40759) => Received message number 13
INFO     ActorAddr-(T|:46429) => Received message number 8
INFO     ActorAddr-(T|:42633) => Received message number 12
INFO     ActorAddr-(T|:44969) => Received message number 5
INFO     ActorAddr-(T|:42815) => Received message number 10
INFO     ActorAddr-(T|:42815) => Received message number 4
INFO     ActorAddr-(T|:34829) => Received message number 14
INFO     ActorAddr-(T|:34829) => Received message number 9
INFO     ActorAddr-(T|:38961) => Received message number 10
INFO     ActorAddr-(T|:41837) => Received message number 24
INFO     ActorAddr-(T|:41837) => Received message number 4
INFO     ActorAddr-(T|:41837) => Received message number 25
INFO     ActorAddr-(T|:41837) => Received message number 5
INFO     ActorAddr-(T|:41837) => Received message number 5
INFO     ActorAddr-(T|:41837) => Received message number 10
INFO     ActorAddr-(T|:41837) => Received message number 14
INFO     ActorAddr-(T|:41837) => Received message number 15
INFO     ActorAddr-(T|:41837) => Received message number 24
INFO     ActorAddr-(T|:41837) => Received message number 25
INFO     ActorAddr-(T|:38785) => Received message number 15
INFO     ActorAddr-(T|:33405) => Received message number 9
ERROR    ActorAddr-(T|:46227) => Pending Actor create for ActorAddr-(T|:46227) failed (3585): None
ERROR    ActorAddr-(T|:40795) => Pending Actor create for ActorAddr-(T|:40795) failed (3585): None
ERROR    ActorAddr-(T|:42263) => Pending Actor create for ActorAddr-(T|:42263) failed (3585): None
ERROR    ActorAddr-(T|:46227) => Pending Actor create for ActorAddr-(T|:46227) failed (3585): None
ERROR    ActorAddr-(T|:35911) => Pending Actor create for ActorAddr-(T|:35911) failed (3585): None
ERROR    ActorAddr-(T|:45909) => Pending Actor create for ActorAddr-(T|:45909) failed (3585): None
ERROR    ActorAddr-(T|:35911) => Pending Actor create for ActorAddr-(T|:35911) failed (3585): None
ERROR    ActorAddr-(T|:35911) => Pending Actor create for ActorAddr-(T|:35911) failed (3585): None
ERROR    ActorAddr-(T|:46227) => Pending Actor create for ActorAddr-(T|:46227) failed (3585): None
ERROR    ActorAddr-(T|:44393) => Pending Actor create for ActorAddr-(T|:44393) failed (3585): None
ERROR    ActorAddr-(T|:45909) => Pending Actor create for ActorAddr-(T|:45909) failed (3585): None
ERROR    ActorAddr-(T|:38583) => Pending Actor create for ActorAddr-(T|:38583) failed (3585): None
ERROR    ActorAddr-(T|:46227) => Pending Actor create for ActorAddr-(T|:46227) failed (3585): None
ERROR    ActorAddr-(T|:44393) => Pending Actor create for ActorAddr-(T|:44393) failed (3585): None
ERROR    ActorAddr-(T|:45909) => Pending Actor create for ActorAddr-(T|:45909) failed (3585): None
ERROR    ActorAddr-(T|:35119) => Pending Actor create for ActorAddr-(T|:35119) failed (3585): None
ERROR    ActorAddr-(T|:38583) => Pending Actor create for ActorAddr-(T|:38583) failed (3585): None
ERROR    ActorAddr-(T|:45909) => Pending Actor create for ActorAddr-(T|:45909) failed (3585): None
ERROR    ActorAddr-(T|:44535) => Pending Actor create for ActorAddr-(T|:44535) failed (3585): None
ERROR    ActorAddr-(T|:45909) => Pending Actor create for ActorAddr-(T|:45909) failed (3585): None
ERROR    ActorAddr-(T|:35119) => Pending Actor create for ActorAddr-(T|:35119) failed (3585): None
ERROR    ActorAddr-(T|:44393) => Pending Actor create for ActorAddr-(T|:44393) failed (3585): None
ERROR    ActorAddr-(T|:38583) => Pending Actor create for ActorAddr-(T|:38583) failed (3585): None
ERROR    ActorAddr-(T|:44535) => Pending Actor create for ActorAddr-(T|:44535) failed (3585): None
ERROR    ActorAddr-(T|:44393) => Pending Actor create for ActorAddr-(T|:44393) failed (3585): None
ERROR    ActorAddr-(T|:38583) => Pending Actor create for ActorAddr-(T|:38583) failed (3585): None
ERROR    ActorAddr-(T|:35119) => Pending Actor create for ActorAddr-(T|:35119) failed (3585): None
ERROR    ActorAddr-(T|:46227) => Pending Actor create for ActorAddr-(T|:46227) failed (3585): None
ERROR    ActorAddr-(T|:35119) => Pending Actor create for ActorAddr-(T|:35119) failed (3585): None
ERROR    ActorAddr-(T|:44535) => Pending Actor create for ActorAddr-(T|:44535) failed (3585): None
ERROR    ActorAddr-(T|:44393) => Pending Actor create for ActorAddr-(T|:44393) failed (3585): None
ERROR    ActorAddr-(T|:44535) => Pending Actor create for ActorAddr-(T|:44535) failed (3585): None
ERROR    ActorAddr-(T|:38503) => Pending Actor create for ActorAddr-(T|:38503) failed (3585): None
ERROR    ActorAddr-(T|:38503) => Pending Actor create for ActorAddr-(T|:38503) failed (3585): None
ERROR    ActorAddr-(T|:35245) => Pending Actor create for ActorAddr-(T|:35245) failed (3585): None
ERROR    ActorAddr-(T|:37605) => Pending Actor create for ActorAddr-(T|:37605) failed (3585): None
ERROR    ActorAddr-(T|:38503) => Pending Actor create for ActorAddr-(T|:38503) failed (3585): None
ERROR    ActorAddr-(T|:35245) => Pending Actor create for ActorAddr-(T|:35245) failed (3585): None
ERROR    ActorAddr-(T|:37605) => Pending Actor create for ActorAddr-(T|:37605) failed (3585): None
ERROR    ActorAddr-(T|:34611) => Pending Actor create for ActorAddr-(T|:34611) failed (3585): None
ERROR    ActorAddr-(T|:37605) => Pending Actor create for ActorAddr-(T|:37605) failed (3585): None
ERROR    ActorAddr-(T|:34611) => Pending Actor create for ActorAddr-(T|:34611) failed (3585): None

And /tmp/thespian.log contains:

2019-03-28 20:31:44.250386 p10786 ERR  Socket error sending to ActorAddr-(T|:44599) on <socket.socket fd=29, family=AddressFamily.AF_INET, type=2049, proto=6, laddr=('192.168.0.14', 35535)>: [Errno 104] Connection reset by peer / 104: ************* TransportIntent(ActorAddr-(T|:44599)-pending-ExpiresIn_0:04:59.999222-<class 'thespian.actors.ChildActorExited'>-ChildActorExited:ActorAddr-(T|:35535)-quit_0:04:59.999193)
2019-03-28 20:31:53.808630 p10385 ERR  No response to Admin shutdown request; Actor system not completely shutdown
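As a cross-check on the expected count: PrimaryActor sends 11 lists of 10 integers each, and every integer should end up as one 'Received message number N' line logged by QuaternaryActor, so 11 × 10 = 110 lines. The minimal Python 3 sketch below (the helper names expected_total, count_received and missing_messages are hypothetical, not part of Thespian) recomputes that total from the same test data and reports which numbers are under-represented in a log's text. The text could come from the posted logfile_extraction(["bug_check_2.log"]).

```python
import re
from collections import Counter

# Same data PrimaryActor builds: 11 sub-lists of 10 integers each.
BASE_ROWS = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10], [11, 12, 13, 14, 15],
             [16, 17, 18, 19, 20], [21, 22, 23, 24, 25]]
TEST_DATA = [row * 2 for row in BASE_ROWS] * 2 + [[1, 2, 3, 4, 5] * 2]

# Matches the message text QuaternaryActor logs for each integer.
RECEIVED = re.compile(r"Received message number (\d+)")


def expected_total(test_data):
    """One log line is expected per integer sent down the actor chain."""
    return sum(len(row) for row in test_data)


def count_received(log_text):
    """Count 'Received message number N' entries in a block of log text."""
    return len(RECEIVED.findall(log_text))


def missing_messages(log_text, test_data):
    """Return a Counter of numbers logged fewer times than they were sent."""
    expected = Counter(n for row in test_data for n in row)
    seen = Counter(int(num) for num in RECEIVED.findall(log_text))
    # Counter subtraction drops zero and negative counts, so only
    # under-delivered numbers remain (over-delivery is not reported).
    return expected - seen


sample = (
    "INFO     ActorAddr-(T|:39395) => Received message number 1\n"
    "INFO     ActorAddr-(T|:39395) => Received message number 6\n"
)
print(expected_total(TEST_DATA))               # 110
print(count_received(sample))                  # 2
print(missing_messages(sample, TEST_DATA)[1])  # 5
```

Because missing_messages only reports shortfalls, comparing count_received against expected_total is still worthwhile: it also catches the opposite problem of too many messages being delivered.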

Any ideas what's going on?

Thanks

kquick commented 5 years ago

Hi @andatt

It looks like there are a couple of things going on with your code:

The suggestions for causing this to behave as you expect:

Let me know if this helps or if there are still unexpected behaviors.

Kevin

andatt commented 5 years ago

Hi Kevin

That makes sense regarding the actor system shutdown; there was an erroneous uncommented call, which I have now removed.

I modified the secondary and tertiary actors as follows:

@troupe(max_count=200, idle_count=1)
class SecondaryActor(ActorTypeDispatcher):
    child_count = 0
    children_finished = 0

    def receiveMessage(self, msg, sender):
        self.troupe_work_in_progress = True
        if isinstance(msg, list):
            if not hasattr(self, "helper"):
                self.helper = self.createActor(
                    TertiaryActor
                )

            for data in msg:
                self.child_count += 1

                self.send(
                    self.helper,
                    {"from": self.myAddress, "data": data}
                )

    def receiveMsg_str(self, msg, sender):
        self.children_finished +=1
        if self.children_finished == self.child_count:
            self.troupe_work_in_progress = False

@troupe(max_count=200, idle_count=1)
class TertiaryActor(ActorTypeDispatcher):
    def receiveMessage(self, msg, sender):

        if isinstance(msg, dict):
            qa = self.createActor(
                QuaternaryActor,
                globalName="quaternay"
            )
            self.send(
                qa,
                msg["data"]
            )
            self.send(msg["from"], "done!")

Running now produces the output in the logfile:

INFO     ActorAddr-(T|:35549) => Received message number 6
INFO     ActorAddr-(T|:44669) => Received message number 11
INFO     ActorAddr-(T|:33267) => Received message number 1
INFO     ActorAddr-(T|:39331) => Received message number 7
INFO     ActorAddr-(T|:38979) => Received message number 12
INFO     ActorAddr-(T|:46669) => Received message number 5
INFO     ActorAddr-(T|:33061) => Received message number 3
INFO     ActorAddr-(T|:38861) => Received message number 13
INFO     ActorAddr-(T|:38275) => Received message number 9
INFO     ActorAddr-(T|:37443) => Received message number 14
INFO     ActorAddr-(T|:38881) => Received message number 16
INFO     ActorAddr-(T|:36561) => Received message number 8
INFO     ActorAddr-(T|:41383) => Received message number 17
INFO     ActorAddr-(T|:38431) => Received message number 10
INFO     ActorAddr-(T|:39275) => Received message number 2
INFO     ActorAddr-(T|:35219) => Received message number 4
INFO     ActorAddr-(T|:33423) => Received message number 1
INFO     ActorAddr-(T|:45961) => Received message number 11
INFO     ActorAddr-(T|:45961) => Received message number 15
INFO     ActorAddr-(T|:45961) => Received message number 5
INFO     ActorAddr-(T|:45961) => Received message number 10
INFO     ActorAddr-(T|:45961) => Received message number 20
INFO     ActorAddr-(T|:40683) => Received message number 25
INFO     ActorAddr-(T|:40683) => Received message number 5
INFO     ActorAddr-(T|:40683) => Received message number 20
INFO     ActorAddr-(T|:40683) => Received message number 25
INFO     ActorAddr-(T|:40683) => Received message number 15
INFO     ActorAddr-(T|:40683) => Received message number 10
INFO     ActorAddr-(T|:40683) => Received message number 5

So it still does not seem to output all the expected messages. I can confirm that all the messages reach, and are processed by, the tertiary actor, but they don't seem to reach the quaternary actor. Nor do the str messages from the tertiary actor seem to be received by the secondary actor.
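One plausible reason the "done!" strings never reach SecondaryActor.receiveMsg_str: ActorTypeDispatcher performs its receiveMsg_<TypeName> lookup inside its own receiveMessage, so a subclass that overrides receiveMessage (as the posted SecondaryActor does) bypasses that lookup entirely. The sketch below does not use Thespian; TypeDispatcherModel, OverridingSecondary and DispatchingSecondary are hypothetical, simplified stand-ins written only to show the effect and one way around it: handle lists in receiveMsg_list and leave receiveMessage to the base class.

```python
class TypeDispatcherModel:
    """Hypothetical, simplified model of type-based dispatch (not Thespian code)."""

    def receiveMessage(self, msg, sender):
        # Walk the message type's MRO and call the first receiveMsg_<TypeName>
        # method this object defines, e.g. receiveMsg_str for a str message.
        for klass in type(msg).__mro__:
            handler = getattr(self, "receiveMsg_" + klass.__name__, None)
            if handler is not None:
                return handler(msg, sender)
        return None  # no handler: the message is silently ignored in this model


class OverridingSecondary(TypeDispatcherModel):
    """Mirrors the posted SecondaryActor: it overrides receiveMessage."""

    def __init__(self):
        self.children_finished = 0

    def receiveMessage(self, msg, sender):
        if isinstance(msg, list):
            pass  # work would be fanned out to helpers here
        # str replies fall through: the base-class dispatch never runs

    def receiveMsg_str(self, msg, sender):
        self.children_finished += 1  # unreachable via receiveMessage


class DispatchingSecondary(TypeDispatcherModel):
    """Handles lists in receiveMsg_list and keeps the inherited receiveMessage."""

    def __init__(self):
        self.child_count = 0
        self.children_finished = 0
        self.troupe_work_in_progress = False

    def receiveMsg_list(self, msg, sender):
        self.troupe_work_in_progress = True
        self.child_count += len(msg)  # one "done!" reply expected per item

    def receiveMsg_str(self, msg, sender):
        self.children_finished += 1
        if self.children_finished == self.child_count:
            self.troupe_work_in_progress = False


old = OverridingSecondary()
old.receiveMessage("done!", None)
print(old.children_finished)  # 0

new = DispatchingSecondary()
new.receiveMessage([1, 2], None)
new.receiveMessage("done!", None)
new.receiveMessage("done!", None)
print(new.children_finished, new.troupe_work_in_progress)  # 2 False
```

In real Thespian code the equivalent change would be renaming the posted SecondaryActor.receiveMessage to receiveMsg_list (dropping the isinstance check), so that the inherited dispatcher can route both lists and strings.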

What am I missing here?

Thanks

Andrew

kquick commented 5 years ago

Hi @andatt,

The sample code you are posting doesn't match the behavior you describe. I can make fixes to the sample code so that it works the way I think you intended, but the end result of those fixes works fully as expected, so ultimately I can't be sure exactly where your error is.

Actually, the fixed version delivered too many messages, which revealed a sequencing bug in the troupe implementation. I've pushed a fix for that bug, but that wasn't the effect you were seeing.

Some of the changes I made to your posted code to compare to your runs:

I created a gist (https://gist.github.com/kquick/38a23b58e16f1505720a4a18fece012f) with the updated version of your code. When I run this (using the latest master for the too-many-messages troupe fix above) I get the expected 110 messages in bug_check_2.log.
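For context on why the count is checked in bug_check_2.log: in the posted log_config, handler h2 (bug_check_2.log) only accepts records that carry an actorAddress attribute, and h1 (bug_check_1.log) only accepts records without one. The posted filters assume Thespian attaches actorAddress to records logged from inside actors. The standard-library-only sketch below reuses the two filter classes with in-memory handlers instead of files (ListHandler and the logger name "routing_demo" are hypothetical) and uses extra= to imitate an actor-originated record.

```python
import logging


class ActorLogFilter(logging.Filter):
    def filter(self, logrecord):
        return 'actorAddress' in logrecord.__dict__


class NotActorLogFilter(logging.Filter):
    def filter(self, logrecord):
        return 'actorAddress' not in logrecord.__dict__


class ListHandler(logging.Handler):
    """Collects formatted records in memory instead of writing to a file."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(self.format(record))


normal = ListHandler()  # stands in for h1 / bug_check_1.log
normal.addFilter(NotActorLogFilter())
normal.setFormatter(logging.Formatter('%(levelname)-8s %(message)s'))

actor = ListHandler()   # stands in for h2 / bug_check_2.log
actor.addFilter(ActorLogFilter())
actor.setFormatter(
    logging.Formatter('%(levelname)-8s %(actorAddress)s => %(message)s'))

log = logging.getLogger("routing_demo")
log.setLevel(logging.INFO)
log.propagate = False  # keep the demo isolated from the root logger
log.addHandler(normal)
log.addHandler(actor)

log.info("plain message")
# extra= puts actorAddress into the record's __dict__, which is exactly
# what ActorLogFilter checks for.
log.info("Received message number 1",
         extra={"actorAddress": "ActorAddr-(T|:39395)"})

print(normal.lines)  # ['INFO     plain message']
print(actor.lines)   # ['INFO     ActorAddr-(T|:39395) => Received message number 1']
```

Each record lands in exactly one handler, which is why bug_check_1.log stays free of the per-message lines counted in bug_check_2.log.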

andatt commented 5 years ago

Well, this is weird. When I copy and paste your code and run it, I get exactly the same behaviour I was experiencing with my original code, i.e. very few messages in bug_check_2.log.

Just in case, I changed the troupe decorators to plain @troupe() and made the sleep time at the end 20 seconds. Still the same result. I also tried setting the environment variable THESPLOG_THRESHOLD=info, but I am getting no file at /tmp/thespian.log. The Thespian version is 3.9.0.

So I am stumped as to what's going on right now. No doubt it's something very silly I am doing somewhere...

andatt commented 5 years ago

Maybe I will see if I can replicate it inside a Docker container using an Ubuntu 16.04 image or something like that.

andatt commented 5 years ago

OK, so having run:

docker run -w /home -v /path/to/test/script:/home ubuntu:16.04 bash -c "apt-get update && apt-get install -y python-pip && pip install thespian && python possible_thesp_bug.py"

I now get the same output as you from inside the container, so the problem appears to be something on my local machine. The local machine is also Ubuntu 16.04.

andatt commented 5 years ago

Found the issue: the installed version of Thespian was 3.9.0, an old version. I'm not sure how this happened, given that the version in my requirements.txt is 3.9.8. Anyway, the initial problem is now solved by upgrading to 3.9.8, but I now have the too-many-messages issue you noted above. Will there be another release with that fix?
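Since the root cause here turned out to be an older installed version than the one pinned in requirements.txt, a quick check run by the same interpreter as the test script can catch this early. The sketch below assumes Python 3.8+ (for importlib.metadata) and plain numeric version strings; the helper names installed_version, version_tuple and is_at_least are hypothetical. From a shell, pip show thespian reports the same information for whichever interpreter that pip belongs to.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(dist_name):
    """Return the installed version string of dist_name, or None if absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


def version_tuple(v):
    """Turn a plain dotted release like '3.9.8' into (3, 9, 8).

    Assumes purely numeric components; pre-release tags such as '3.10.0rc1'
    would need packaging.version.Version instead.
    """
    return tuple(int(part) for part in v.split("."))


def is_at_least(installed, required):
    # Tuple comparison orders 3.10.0 after 3.9.8, unlike string comparison.
    return version_tuple(installed) >= version_tuple(required)


found = installed_version("thespian")
if found is None:
    print("thespian is not installed for this interpreter")
elif not is_at_least(found, "3.9.8"):
    print("thespian", found, "is older than the pinned 3.9.8")
else:
    print("thespian", found, "OK")
```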

Thanks

kquick commented 5 years ago

Hi @andatt,

I'm glad you found the root issue with the old version! thespian/troupe.py has been updated on master, and that should give the correct number of messages. If that version looks good to you, I'll do a release this weekend.

-Kevin

andatt commented 5 years ago

Hi Kevin

Just tested with those new commits and it works well for me! There is another intermittent issue I am experiencing, but I have not yet established the likely cause. It may not be Thespian related. If it looks like it is, though, I will create a new issue.

Thanks yet again for all your help!

Andrew

kquick commented 5 years ago

Released in version 3.9.9. https://github.com/kquick/Thespian/releases/tag/thespian-3.9.9