kquick / Thespian

Python Actor concurrency library
MIT License

Return future from ask? #60

Closed: jaksiprejak closed this issue 3 years ago.

jaksiprejak commented 4 years ago

Hello,

In other frameworks, I am used to ask being a non-blocking call that returns a future.

I want to ask a pool of actors to do some work and then, once they have all been asked, wait for the results.

Something like:

from thespian.actors import Actor, ActorSystem, ActorExitRequest
import time
import itertools
import sys

class EchoActor(Actor):

    def receiveMessage(self, message, sender):
        # Thespian delivers an ActorExitRequest *instance*, so test with isinstance().
        if not isinstance(message, ActorExitRequest):
            time.sleep(1)  # simulate one second of work per request
        print(f'Received {message} from {sender}')
        sys.stdout.flush()
        self.send(sender, f'{self} got sent {message}')

if __name__ == '__main__':

    acs = ActorSystem('multiprocQueueBase')

    a = [acs.createActor(EchoActor) for _ in range(5)]
    actors = itertools.cycle(a)

    t = time.time()

    # Each ask() blocks until its reply arrives, so the 10 requests run one at a time.
    results = [acs.ask(next(actors), f'foo {i}') for i in range(10)]

    # I'd expect to then be able to do something like:
    # results = [result.get() for result in results]

    print(results)

    print(f'{time.time() - t} seconds')

    # Exit requests must be instances; tell() sends without waiting for a reply.
    for i in a:
        acs.tell(i, ActorExitRequest())
    acs.shutdown()  # stop the actor system itself

However, the ask call blocks until its reply arrives, so each request waits for the previous one to finish and the actors never work in parallel. That is not what I need.

I don't see any mention of futures in the docs. Is this an option I haven't enabled somewhere, or is there another way to achieve what I'm trying to do with this framework?

Thanks for your time.
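For comparison, the submit-then-collect pattern described in the question is what Python's standard concurrent.futures module provides; it is not part of Thespian. In this minimal sketch, slow_echo is a hypothetical stand-in for a blocking request such as the one-second ask above, and ask_all is likewise a hypothetical helper name.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_echo(message):
    """Hypothetical stand-in for a blocking request that takes about 0.1 s."""
    time.sleep(0.1)
    return f'echo {message}'


def ask_all(messages, workers=5):
    """Submit every request first, then wait for the results in request order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # submit() returns a Future immediately; the work runs on pool threads.
        futures = [pool.submit(slow_echo, m) for m in messages]
        # result() blocks until that particular future has finished.
        return [f.result() for f in futures]


if __name__ == '__main__':
    start = time.time()
    print(ask_all([f'foo {i}' for i in range(10)]))
    print(f'{time.time() - start:.2f} seconds')
```

With five worker threads the ten calls overlap instead of running back to back. Note that this only runs blocking calls on threads; it does not make Thespian's ask() itself return a future.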

kquick commented 4 years ago

You didn't miss anything: there are no futures defined in Thespian. Futures would require local scheduling decisions (like coroutines/generators) that I was hesitant to add to the scope of the Thespian API.

The rough equivalent in Thespian is to use ActorSystem.tell() to send the messages so that they are processed in parallel, and then to wait for the responses via ActorSystem.listen().

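...
t = time.time()
# tell() returns immediately, so all 10 requests are sent before any reply is awaited.
for i in range(10):
    acs.tell(next(actors), f'foo {i}')
# listen() blocks until the next reply arrives; replies come back in completion order.
results = [acs.listen() for _ in range(10)]
print(results)
...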
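Because listen() returns whichever reply arrives next, the results list above is in completion order, not request order. If each result should be matched to its request, as the result.get() idea in the question implies, one option is to tag every request with an id and have the actor echo it back. The sketch below assumes that hypothetical (request_id, payload) / (request_id, result) message protocol; scatter_gather, fake_tell and fake_listen are illustrative names, and the fakes stand in for an ActorSystem so the code runs without Thespian.

```python
import itertools
import queue
import random
import threading
import time


def scatter_gather(tell, listen, targets, payloads):
    """Send every payload with tell(), then collect one reply per request.

    Hypothetical protocol: each request is a (request_id, payload) tuple and
    the receiving actor replies with (request_id, result). listen() returns
    replies in arrival order; the ids put the results back in request order.
    """
    sent = 0
    for request_id, (target, payload) in enumerate(zip(targets, payloads)):
        tell(target, (request_id, payload))  # returns without waiting for a reply
        sent += 1
    results = [None] * sent
    for _ in range(sent):
        request_id, result = listen()        # blocks until any reply arrives
        results[request_id] = result
    return results


# --- Stand-in for an ActorSystem, so the sketch runs without Thespian. ---
_replies = queue.Queue()


def fake_tell(target, message):
    """Mimic tell(): start the work in the background and return immediately."""
    request_id, payload = message

    def work():
        time.sleep(random.uniform(0.01, 0.05))  # replies finish out of order
        _replies.put((request_id, f'{target} got sent {payload}'))

    threading.Thread(target=work, daemon=True).start()


def fake_listen():
    """Mimic a blocking listen(): wait for the next reply from any worker."""
    return _replies.get()


if __name__ == '__main__':
    actors = itertools.cycle(['actor-0', 'actor-1'])
    print(scatter_gather(fake_tell, fake_listen, actors,
                         [f'foo {i}' for i in range(4)]))
    # ['actor-0 got sent foo 0', 'actor-1 got sent foo 1',
    #  'actor-0 got sent foo 2', 'actor-1 got sent foo 3']
```

Against a real actor system the call would be scatter_gather(acs.tell, acs.listen, actors, payloads), with EchoActor.receiveMessage unpacking the tuple and replying via self.send(sender, (request_id, result)). Tuples of ints and strings pickle cleanly, which matters for a multiprocess base such as multiprocQueueBase.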

Since you are waiting for all the responses without doing any interim work, it is fine for each listen() call to block: the actors still run in parallel, so blocking does not affect the overall time to completion. If you did need to do interim work, you could pass the timeout argument to listen(), which returns None when no response arrives in time, and aggregate the results yourself until you had collected all of them.
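The timeout approach can be sketched as a polling loop. It assumes listen(timeout=...) returns None when nothing arrives in time, as described above; the sketch passes a datetime.timedelta, so check the Thespian documentation for the timeout types your version accepts. gather_with_interim_work, fake_listen and fake_workers are hypothetical names, and the fakes replace a real ActorSystem so the example runs on its own.

```python
import queue
import threading
import time
from datetime import timedelta


def gather_with_interim_work(listen, expected, do_interim_work,
                             poll=timedelta(seconds=0.5)):
    """Collect `expected` replies, doing other work whenever listen() times out.

    Assumes listen(timeout=...) returns None when no message arrives in time;
    a reply that is itself None would be indistinguishable from a timeout.
    """
    results = []
    while len(results) < expected:
        reply = listen(timeout=poll)
        if reply is None:
            do_interim_work()        # nothing arrived yet: do something useful
        else:
            results.append(reply)    # replies are kept in arrival order
    return results


# --- Stand-in for an ActorSystem, so the sketch runs without Thespian. ---
_replies = queue.Queue()


def fake_listen(timeout=None):
    """Mimic listen(): wait up to `timeout` for a reply, else return None."""
    seconds = None if timeout is None else timeout.total_seconds()
    try:
        return _replies.get(timeout=seconds)
    except queue.Empty:
        return None


def fake_workers(count, delay=0.1):
    """Post `count` replies, one every `delay` seconds, from a background thread."""
    def run():
        for i in range(count):
            time.sleep(delay)
            _replies.put(f'reply {i}')
    threading.Thread(target=run, daemon=True).start()


if __name__ == '__main__':
    ticks = []
    fake_workers(3)
    results = gather_with_interim_work(fake_listen, 3, lambda: ticks.append(1),
                                       poll=timedelta(seconds=0.01))
    print(results)  # ['reply 0', 'reply 1', 'reply 2']
    print(f'{len(ticks)} interim work steps while waiting')
```

A short poll interval keeps the loop responsive at the cost of more wake-ups; a longer one leaves more time between interim steps.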