python-trio / trio

Trio – a friendly Python library for async concurrency and I/O
https://trio.readthedocs.io

Limits on Nursery Size #527

Closed · Miserlou closed this 6 years ago

Miserlou commented 6 years ago

I'd like to use Trio for a simple scraping project. Let's say I have a million URLs, but I only have the resources on my box to handle 15 connections at a time. Is there a way to tell the nursery to hold off on executing the rest of the jobs until the current 15 being executed have finished?

njsmith commented 6 years ago

Nurseries don't have this built in, no. Two ways come to mind:

- Put the URLs in a queue, and start 15 worker tasks that each loop, pulling a URL off the queue and processing it.
- Start a task for every URL right away, but have each task acquire a shared CapacityLimiter(15) before doing any actual work.

The second one unfortunately will use a lot more memory, because tasks in trio are pretty cheap but still do take some memory, more than a simple string sitting in a list.
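Here's a minimal sketch of the first approach using trio's memory channels (the modern replacement for trio.Queue); fetch and urls are stand-ins for whatever the real per-URL job is:

import trio

async def worker(receive_channel):
    # Each worker owns a clone of the receive channel, and keeps
    # pulling URLs until the send side is closed and drained.
    async with receive_channel:
        async for url in receive_channel:
            await fetch(url)  # hypothetical: do the actual request here

async def main(urls, max_workers=15):
    send_channel, receive_channel = trio.open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        async with receive_channel:
            for _ in range(max_workers):
                nursery.start_soon(worker, receive_channel.clone())
        async with send_channel:
            for url in urls:
                await send_channel.send(url)

With a buffer size of 0, send() blocks until some worker is ready to receive, so at most 15 jobs are ever in flight and the pending URLs stay as plain strings in the original iterable.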

If you want to get fancier, it would also be possible to implement a reusable capacity-limited nursery class... We should think about adding that to docs as an example, or something. (I guess the semantic subtlety would be what to do with the queued tasks if there's an unhandled error – normally trio guarantees that after you call start_soon the task will run at least until the first await, even if it's cancelled, which is a useful guarantee because it gives with blocks and such a chance to run and clean up resources that might have been passed into the task. But then, maybe if you're using a special object whose whole point is to avoid allocating too many resources at once, though, then you shouldn't be allocating resources until the task actually starts.)
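To make that tradeoff concrete, here's one untested way such a class might look; it blocks the submitter until a slot frees up, so queued jobs never turn into parked tasks (CapacityLimitedNursery is a made-up name for illustration, not a real trio API):

import trio

class CapacityLimitedNursery:
    def __init__(self, nursery, max_tasks):
        self._nursery = nursery
        self._limiter = trio.CapacityLimiter(max_tasks)

    async def start_soon(self, async_fn, *args):
        # Wait here, in the submitting task, until capacity frees up.
        # Each submission needs its own borrower token, since a single
        # borrower can't hold the limiter twice.
        token = object()
        await self._limiter.acquire_on_behalf_of(token)

        async def _run():
            try:
                await async_fn(*args)
            finally:
                self._limiter.release_on_behalf_of(token)

        self._nursery.start_soon(_run)

Note that start_soon becomes async here, which is exactly the semantic shift described above: submission itself can block, and a job that hasn't been admitted yet has no task (and no resources) to clean up.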

ric2b commented 6 years ago

I was also wondering about this.

I'm doing a performance benchmark of a web service, I want to measure the rate at which it accepts messages under sustained load.

I tried the second solution, with a CapacityLimiter and the "asks" library, but I quickly run out of file descriptors (sockets, most likely) and the script crashes with errno 24 ("too many open files") unless I set a really low limit like 4. (ulimit -n returns 1024, but even a limit of 8 is enough to trigger the issue frequently.)

Maybe I'm doing something wrong, my code is more or less this:

import asks
import trio

# URL and DATA are defined elsewhere
asks.init('trio')

async def send_message(limit):
    async with limit:
        response = await asks.post(URL, data=DATA)
    print(f'data: {DATA}, response status: {response.status_code}')

async def send_n_messages(n):
    limit = trio.CapacityLimiter(4)

    async with trio.open_nursery() as nursery:
        for i in range(n):
            nursery.start_soon(send_message, limit)

trio.run(send_n_messages, 35000)

Because I want to make sure that the script itself isn't the bottleneck, I'd like to have a much larger number of concurrent connections, like a few hundred.

Using the CapacityLimiter seems like the cleaner solution, so I'd prefer to keep it rather than adding a task queue. Any ideas?

njsmith commented 6 years ago

Running out of sockets with a limit of 4 seems weird... Maybe you're keeping the socket open past the end of the CapacityLimiter block? Can you close the response object or something?

Another thing to watch out for is: https://asks.readthedocs.io/en/latest/a-look-at-sessions.html#important-connection-un-limiting But I think that only matters if you're using an explicit Session object, which this code isn't.

ric2b commented 6 years ago

I think you're correct that the sockets are being kept open longer than they should be, but there seems to be no way to explicitly close them. They are probably closed/cleaned up automatically after each send_message terminates, but not quickly enough to avoid the problem.

I was able to solve it by using an asks Session with a few hundred connections; that way I don't even need a CapacityLimiter, and I don't run out of file descriptors/sockets. Thanks for the link! :) (I just started experimenting with trio and asks yesterday, but so far I'm absolutely loving it!)

For anyone that ends up here with the same problem, this is what I did:

import asks
import trio

# URL and DATA are defined elsewhere
asks.init('trio')

async def send_message(session):
    response = await session.post(URL, data=DATA)
    print(f'data: {DATA}, response status: {response.status_code}')

async def send_n_messages(n):
    # One shared Session that pools and reuses up to 200 connections
    session = asks.Session(connections=200)

    async with trio.open_nursery() as nursery:
        for i in range(n):
            nursery.start_soon(send_message, session)

trio.run(send_n_messages, 35000)

theelous3 commented 6 years ago

This is on asks' end, and a fix will be pushed this evening :)

@njsmith was correct. The base methods each create their own Session. Currently the code relies on Python's garbage collector to clean up the sockets, but evidently that may not be fast enough, so I'll start force-closing them. Using a Session explicitly is the correct way to go anyway, and doesn't have this issue :D Thanks for the feedback, guys.

ric2b commented 6 years ago

That's great! I'm really loving the trio + asks combination; it makes large numbers of concurrent HTTP requests so effortless and readable!

Thanks to everyone contributing to them :) (hopefully I will too, eventually)

bronger commented 10 months ago

The following seems to work, but I would appreciate confirmation: I use a CapacityLimiter in the task function, and start the task in the nursery with nursery.start instead of nursery.start_soon. Moreover, the task_status.started() method is called within the async with my_capacity_limiter: block in the task function.

This way, I want the capacity limiter to bound both the number of concurrently running tasks and the size of the nursery. Is this effective?
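Concretely, this is the pattern I mean (handle and urls stand in for my real code):

import trio

limiter = trio.CapacityLimiter(15)

async def worker(url, task_status=trio.TASK_STATUS_IGNORED):
    async with limiter:
        # nursery.start() in the parent returns only once this task
        # actually holds a limiter slot.
        task_status.started()
        await handle(url)  # placeholder for the real work

async def main(urls):
    async with trio.open_nursery() as nursery:
        for url in urls:
            # Blocks until the new task has acquired the limiter, so the
            # nursery never holds more than 15 running tasks plus the one
            # currently being admitted.
            await nursery.start(worker, url)

Since nursery.start doesn't return until the child calls task_status.started(), the submitting loop pauses whenever all slots are taken.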