quantmind / pulsar

Event driven concurrent framework for Python
BSD 3-Clause "New" or "Revised" License

Can't get concurrency #242

Closed robgil closed 8 years ago

robgil commented 8 years ago

Hey there

I'm trying to learn how coroutines and actors work within pulsar. I'm writing a simple site scraper that pulls from a queue. I've been going over all the examples, and am very confused as to how any of the examples are concurrent with a single actor defined in them.

This example has a single actor. https://github.com/quantmind/pulsar/blob/master/examples/snippets/actor1.py

What confuses me here is that there is a single actor that looks like it runs two functions but I don't understand how this can run both at the same time on a single actor.

This example also looks like a single actor. https://github.com/quantmind/pulsar/blob/master/examples/snippets/greeter.py

In my code below, I'm trying to spawn 10 actors, which should be different threads/processes. I'm hoping to distribute messages across actors and run them concurrently. When I run this code, I see the entire thing slow down to just one req every 3s as per the sleep in scrape(). I'm confused as to why this isn't returning a future that I can check the status of later.

def start(arbiter, **kw):
    ensure_future(app(arbiter))

async def app(arbiter):
    # Spawn new actors
    try:
        actors = []
        for i in range(0,9):
            actors.append(await spawn(name='worker'+str(i)))
    except Exception as e:
        logger.exception(e)

    print(actors)

    try:
        while True:
            try:
                worker = actors[random.choice(range(0,9))]
            except Exception as e:
                logger.exception(e)
            # Run scrape job
            result = await send(worker, 'run', scrape, {'sku': 'SKU1'})

    except KeyboardInterrupt as e:
        # Stop the event loop
        arbiter.stop()
        logger.exception(e)

async def scrape(actor, message):
    sku = message['sku']
    actor.logger.info("Scraping SKU: " + sku)
    time.sleep(3)

    # Do the scrape

Any help would be appreciated. Rob

lsbardel commented 8 years ago

The actor model in pulsar refers to the parallel side of the asynchronous framework. In pulsar each actor (think of a specialised thread or process) has its own event loop. In this way any actor can run its own asynchronous server for example.

An actor has its own event loop and therefore it can run several tasks concurrently. Indeed this is how asyncio works, not just pulsar. If these tasks (coroutines) are IO friendly, a pulsar app can be very fast.
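As a rough illustration of several tasks sharing one event loop, here is a minimal sketch in plain asyncio, not pulsar. The names `worker` and `events` are made up for the example, and `asyncio.run` assumes Python 3.7 or later. Two coroutines take turns on a single loop, each handing control back whenever it awaits:

```python
import asyncio

events = []  # records the order in which the coroutines make progress


async def worker(name, steps):
    for i in range(steps):
        events.append('%s:%d' % (name, i))
        # Yield control back to the event loop so other tasks can run.
        await asyncio.sleep(0)


async def main():
    # Both coroutines are scheduled on the same loop, in the same thread.
    await asyncio.gather(worker('a', 2), worker('b', 2))


asyncio.run(main())
print(events)
```

The recorded steps of `a` and `b` are interleaved rather than run back to back, which is the sense in which a single actor can run more than one function "at the same time".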

Throwing lots of actors at your problem is not the correct solution. By default an actor is a process, and there are only so many processes you can use (usually 4 x number of cores). Adding more actors is your last resort, for when you need to scale horizontally on a given machine.

First try to get your async app running on one actor.

Depending on what the scrape function does, you can organise your application in different ways:

In addition:

async def scrape(actor, message):
    sku = message['sku']
    actor.logger.info("Scraping SKU: " + sku)
    time.sleep(3)

is not really a coroutine. The function blocks in time.sleep(3), so the actor's event loop cannot run anything else in the meantime. A better example is to use

await asyncio.sleep(3)
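To make the difference concrete, here is a small sketch in plain asyncio (not pulsar) that times both variants. The function names, the `timed` helper and the shortened `DELAY` are assumptions made for the example, and `asyncio.run` needs Python 3.7 or later:

```python
import asyncio
import time

DELAY = 0.1  # shortened stand-in for the 3 second sleep


async def scrape_blocking(sku):
    time.sleep(DELAY)  # blocks the whole event loop while sleeping
    return sku


async def scrape_async(sku):
    await asyncio.sleep(DELAY)  # hands control back to the loop while waiting
    return sku


async def timed(scrape):
    # Run three scrapes "concurrently" and measure the wall-clock time.
    start = time.monotonic()
    await asyncio.gather(*(scrape('SKU%d' % i) for i in range(3)))
    return time.monotonic() - start


blocking_elapsed = asyncio.run(timed(scrape_blocking))
async_elapsed = asyncio.run(timed(scrape_async))
print('blocking: %.2fs, async: %.2fs' % (blocking_elapsed, async_elapsed))
```

With the blocking version the three sleeps add up, while the awaiting version should finish in roughly the time of a single sleep.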

The line

result = await send(worker, 'run', scrape, {'sku': 'SKU1'})

waits for the result to be ready before the loop moves on to the next job. Your workers are not doing any concurrent work; instead they work one at a time.
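One way around this, sketched here with plain asyncio rather than pulsar, is to schedule the jobs first and await the results later. `fake_send` is a hypothetical stand-in for pulsar's `send` and only simulates a slow remote call; `asyncio.run` assumes Python 3.7 or later:

```python
import asyncio


async def fake_send(worker, sku):
    # Hypothetical stand-in for send(worker, 'run', scrape, ...).
    await asyncio.sleep(0.1)
    return (worker, sku)


async def main():
    # Schedule all jobs up front; each call returns a future immediately.
    tasks = [
        asyncio.ensure_future(fake_send(i % 3, 'SKU%d' % i))
        for i in range(6)
    ]
    # Nothing has been awaited yet, so no job has finished.
    pending_before = sum(1 for t in tasks if not t.done())
    # Collect the results once they are all ready.
    results = await asyncio.gather(*tasks)
    return pending_before, results


pending_before, results = asyncio.run(main())
print(pending_before, results)
```

Each task is a future whose status can be checked with `done()`, and the jobs overlap instead of running one after another.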

lsbardel commented 8 years ago

Closing this ticket as it is neither a bug nor a feature request. For general discussion/help/advice the pulsar mailing list is a more suitable location.