python / cpython

The Python programming language
https://www.python.org

RAM consumption too high using concurrent.futures (Python 3.7 / 3.6 ) #78349

Open c6b5a1d2-cc47-4d7b-b899-10b9a195d976 opened 6 years ago

c6b5a1d2-cc47-4d7b-b899-10b9a195d976 commented 6 years ago
BPO 34168
Nosy @tim-one, @MojoVampire, @DemGiran

Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.

GitHub fields:

```python
assignee = None
closed_at = None
created_at =
labels = ['3.7', 'library']
title = 'RAM consumption too high using concurrent.futures (Python 3.7 / 3.6 )'
updated_at =
user = 'https://github.com/DemGiran'
```

bugs.python.org fields:

```python
activity =
actor = 'josh.r'
assignee = 'none'
closed = False
closed_date = None
closer = None
components = ['Library (Lib)']
creation =
creator = 'DemGiran'
dependencies = []
files = []
hgrepos = []
issue_num = 34168
keywords = []
message_count = 5.0
messages = ['322004', '322005', '322031', '322075', '322373']
nosy_count = 3.0
nosy_names = ['tim.peters', 'josh.r', 'DemGiran']
pr_nums = []
priority = 'normal'
resolution = None
stage = None
status = 'open'
superseder = None
type = None
url = 'https://bugs.python.org/issue34168'
versions = ['Python 3.6', 'Python 3.7']
```

c6b5a1d2-cc47-4d7b-b899-10b9a195d976 commented 6 years ago

I have a list of 30 million strings, and I want to run a DNS query against each of them. I do not understand how this operation can become so memory intensive. I would assume that the threads exit once their job is done, and each request also has a timeout set ({'dns_request_timeout': 1}).

Here is a sneak peek of the machine's resources while running the script:

[Image: screenshot of the machine's resource usage]

My code is as follows:

    # -*- coding: utf-8 -*-
    import dns.resolver
    import concurrent.futures
    from pprint import pprint
    import json

    bucket = json.load(open('30_million_strings.json','r'))

    def _dns_query(target, **kwargs):
        global bucket
        resolv = dns.resolver.Resolver()
        resolv.timeout = kwargs['function']['dns_request_timeout']
        try:
            resolv.query(target + '.com', kwargs['function']['query_type'])
            with open('out.txt', 'a') as f:
                f.write(target + '\n')
        except Exception:
            pass

    def run(**kwargs):
        global bucket
        temp_locals = locals()
        pprint({k: v for k, v in temp_locals.items()})

        with concurrent.futures.ThreadPoolExecutor(max_workers=kwargs['concurrency']['threads']) as executor:
            future_to_element = dict()

            for element in bucket:
                future = executor.submit(kwargs['function']['name'], element, **kwargs)
                future_to_element[future] = element

            for future in concurrent.futures.as_completed(future_to_element):
                result = future_to_element[future]

    run(function={'name': _dns_query, 'dns_request_timeout': 1, 'query_type': 'MX'},
        concurrency={'threads': 15})
c6b5a1d2-cc47-4d7b-b899-10b9a195d976 commented 6 years ago

It seems that even without the as_completed call it has the same problem.

    # -*- coding: utf-8 -*-
    import dns.resolver
    import concurrent.futures
    from pprint import pprint
    import json

    bucket = json.load(open('30_million_strings.json','r'))

    def _dns_query(target, **kwargs):
        global bucket
        resolv = dns.resolver.Resolver()
        resolv.timeout = kwargs['function']['dns_request_timeout']
        try:
            resolv.query(target + '.com', kwargs['function']['query_type'])
            with open('out.txt', 'a') as f:
                f.write(target + '\n')
        except Exception:
            pass

    def run(**kwargs):
        global bucket
        temp_locals = locals()
        pprint({k: v for k, v in temp_locals.items()})

        with concurrent.futures.ThreadPoolExecutor(max_workers=kwargs['concurrency']['threads']) as executor:
            for element in bucket:
                executor.submit(kwargs['function']['name'], element, **kwargs)

    run(function={'name': _dns_query, 'dns_request_timeout': 1, 'query_type': 'MX'},
        concurrency={'threads': 15})

tim-one commented 6 years ago

If your bucket has 30 million items, then

    for element in bucket:
        executor.submit(kwargs['function']['name'], element, **kwargs)

is going to create 30 million Future objects (and all the under-the-covers objects needed to manage their concurrency) just as fast as the main thread can create them. Nothing in your code waits for anything to finish until after they've _all_ been created and queued up under the covers.

So your producer is running vastly faster than your consumers can keep up with. It's the huge backlog of pending work items that consumes the RAM. To slash RAM, you need to craft a way to interleave creating new work items with giving consumers time to deal with them.
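
One possible way to get that interleaving, sketched here as an illustration rather than as anything proposed in this thread, is to keep a fixed-size window of pending futures and submit a new item only when an earlier one finishes. The names `run_windowed` and `max_pending` are hypothetical; only `ThreadPoolExecutor`, `wait` and `FIRST_COMPLETED` come from `concurrent.futures`.

```python
import concurrent.futures as cf
from itertools import islice


def run_windowed(func, items, max_workers=15, max_pending=1000):
    """Yield results of func(item), keeping at most max_pending futures alive.

    Hypothetical helper: results arrive in completion order, not input order.
    """
    it = iter(items)
    with cf.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Prime the window with the first max_pending items.
        pending = {executor.submit(func, item) for item in islice(it, max_pending)}
        while pending:
            # Block until at least one pending future has finished.
            done, pending = cf.wait(pending, return_when=cf.FIRST_COMPLETED)
            for future in done:
                # result() re-raises any exception raised inside func.
                yield future.result()
            # Refill the window with as many new items as just finished.
            for item in islice(it, len(done)):
                pending.add(executor.submit(func, item))
```

With this shape the main thread never holds more than `max_pending` Future objects at once, so memory use should depend on the window size rather than on the length of `bucket`.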

tim-one commented 6 years ago

Note that you can consume multiple gigabytes of RAM with this simpler program too, and for the same reasons:

    import concurrent.futures as cf

    bucket = range(30_000_000)

    def _dns_query(target):
        from time import sleep
        sleep(0.1)

    def run():
        with cf.ThreadPoolExecutor(3) as executor:
            future_to_element = dict()

            for element in bucket:
                future = executor.submit(_dns_query, element)
                future_to_element[future] = element

            for future in cf.as_completed(future_to_element):
                elt = future_to_element[future]
                print(elt)

    run()

The usual way to mediate between producers and consumers that run at very different speeds is to use a bounded queue, so that producers block when putting new work items on the queue until consumers make progress taking work items off. That's easy enough to do with multiprocessing instead, but concurrent.futures doesn't support that directly.
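
As a rough illustration of the bounded-queue idea, here is a minimal sketch that bypasses `concurrent.futures` and uses plain `threading` workers fed by a `queue.Queue` with a `maxsize`. This swaps in threads rather than the multiprocessing approach mentioned above, and the names `run_bounded`, `num_workers` and `max_queued` are hypothetical.

```python
import queue
import threading


def run_bounded(func, items, num_workers=3, max_queued=1000):
    """Call func(item) for every item, using worker threads and a bounded queue."""
    work = queue.Queue(maxsize=max_queued)
    stop = object()  # sentinel telling a worker to exit

    def worker():
        while True:
            item = work.get()
            if item is stop:
                break
            try:
                func(item)
            except Exception:
                pass  # assumption: errors are swallowed, as in the reporter's code

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for item in items:
        work.put(item)  # blocks while the queue already holds max_queued items
    for _ in threads:
        work.put(stop)  # one sentinel per worker
    for t in threads:
        t.join()
```

Because `put` blocks when the queue is full, the producer can never get more than `max_queued` items ahead of the consumers.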

If you don't mind doing the work in chunks, this straightforward modification allows slashing RAM - the smaller CHUNK is, the less RAM is needed, but also the more often the code waits for the most recent chunk to finish:

    CHUNK = 10_000

    ...

    def chunkify(iterable, chunk=CHUNK):
        from itertools import islice
        it = iter(iterable)
        while True:
            piece = list(islice(it, chunk))
            if piece:
                yield piece
            else:
                return

    def run():
        with cf.ThreadPoolExecutor(3) as executor:
            for b in chunkify(bucket):
                # essentially the original code just indented
                future_to_element = dict()
                for element in b:
                    future = executor.submit(_dns_query, element)
                    future_to_element[future] = element

                for future in cf.as_completed(future_to_element):
                    elt = future_to_element[future]
                    print(elt)

99ffcaa5-b43b-4e8e-a35e-9c890007b9cd commented 6 years ago

Note: While this particular use case wouldn't be fixed (map returns in order, not as completed), applying the fix from bpo-29842 would make many similar use cases both simpler to implement and more efficient/possible.

That said, no action has been taken on bpo-29842 (no objections, but no action either), so I'm not sure what to do to push it to completion.

gsmethells commented 1 year ago

We've run into this issue as well. ThreadPoolExecutor should have a kwarg to limit the number of outstanding futures (as those futures that are done and have results seem to be what is holding the memory). We've been forced to leverage a variant of this counting semaphore suggestion (also seen here and here) until the stdlib resolves this major issue.
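
For readers who cannot follow the links, a counting-semaphore workaround might look roughly like the sketch below. It is not the linked code and not a stdlib API: `BoundedExecutor` and `max_pending` are hypothetical names, and the wrapper simply makes `submit` block until a slot is free.

```python
import concurrent.futures as cf
import threading


class BoundedExecutor:
    """Hypothetical wrapper that caps the number of unfinished futures."""

    def __init__(self, max_workers, max_pending):
        self._executor = cf.ThreadPoolExecutor(max_workers=max_workers)
        self._slots = threading.BoundedSemaphore(max_pending)

    def submit(self, fn, *args, **kwargs):
        self._slots.acquire()  # blocks while max_pending futures are unfinished
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except BaseException:
            self._slots.release()  # submission failed, so give the slot back
            raise
        # Free the slot once the future finishes, fails, or is cancelled.
        future.add_done_callback(lambda _: self._slots.release())
        return future

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.shutdown(wait=True)
        return False
```

The wrapper only limits futures that have not finished yet. If the caller stores every returned future in a dict or list, as the original report does, the finished futures and their results still accumulate, so they need to be dropped as they are consumed.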