lmmx / range-streams

Streaming range requests in Python
https://range-streams.readthedocs.io/en/latest/
MIT License
7 stars 0 forks source link

Multiprocessing on top of async #38

Open lmmx opened 3 years ago

lmmx commented 3 years ago

The speed of the AsyncFetcher appears to be limited by the speed of the GIL (i.e. a single CPU core is being pinned at 100% when the fetcher makes calls), which could be resolved by this solution (with slight updates for current asyncio usage)

import time
import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
   time.sleep(x) # Expensive computation

@asyncio.coroutine
def main():
    #pool = multiprocessing.Pool()
    #out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop.
    executor = ProcessPoolExecutor()
    out = yield from loop.run_in_executor(executor, blocking_func, 10)  # This does not
    print(out)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run(main())

I don't really understand this example, and I've just rewritten my code to be non-blocking, so not sure why it's being assumed I will be multiprocessing a synchronous function rather than a top-level sync function...

However ignoring that, I think the principle may still apply (of passing the ProcessPoolExecutor)?

The other matter is that of splitting the work up into separate processes in the pool (which I'll need to do regardless of how the multiprocessing is to be achieved).

I think the "data-parallelism" should be at the file level, i.e. when processing one TSV at a time (not doing multiple), implemented by splitting up the URL list for the one file into n_splits splits (corresponding to the number of processes in the pool, mp.cpu_count I think), and then creating an AsyncFetcher for each of these (100/n_splits)% URL lists.

Resuming would still be fairly trivial (instead of the last seen PNG as in serial, you'd want to use the minimum non-completed row, i.e. the end value of the minimum range in the RangeSet formed from conjoining all of the AsyncFetcher.completed RangeSets for each of the fetchers)

lmmx commented 3 years ago

Hard to tell apart, but I recognise Thomas Grainger among the contributors to aiomultiprocess, also a contributor to httpx and in the Python IRC / general community / stdlib contrib I think

lmmx commented 3 years ago

I've now looked at the source code for aiostream, and getting a better understanding of what the following does under the hood:

https://github.com/lmmx/range-streams/blob/4dad2c991150588927795f71e4591ed2599d2fc5/src/range_streams/async_utils.py#L160-L167

Sketch: 2ED2B499-5C83-4887-975B-1723ED3C9263

lmmx commented 3 years ago

The example code for aiomultiprocess is:

import asyncio
from aiohttp import request
from aiomultiprocess import Pool

async def get(url):
    async with request("GET", url) as response:
        return await response.text("utf-8")

async def main(urls: list[str]):
    async with Pool() as pool:
        async for result in pool.map(get, urls):
            ...  # process result

asyncio.run(main())

The map here is aiomultiprocess.Pool.map

    def map(
        self,
        func: Callable[[T], Awaitable[R]],
        iterable: Sequence[T],
        # chunksize: int = None,  # todo: implement chunking maybe
    ) -> PoolResult[R]:
        """Run a coroutine once for each item in the iterable."""
        if not self.running:
            raise RuntimeError("pool is closed")

        tids = [self.queue_work(func, (item,), {}) for item in iterable]
        return PoolResult(self, tids)

Note that there is nothing special about the aiostream.map operator: it is simply a combinatorial helper function. This means that it should be able to be swapped out for a multiprocessing-enhanced map function like aiomultiprocess.Pool.map.

I think then it would work to change the assignment of zs from stream.map to aiomultiprocess.Pool.map but note that the function and the input to that function swap call order in these two functions! To keep it consistent, it'd be possible to specify keyword args in that case

However note that the example given above is for the fetch (they make an async funcdef get to fetch URLs with aiohttp.request). In my code, I use starmap rather than map, because I want to reuse a client that I've zipped alongside the URLs

...anyway, note that the example is for the fetch, and for me the fetch is for starmap but I've started considering replacing the map (which does postprocessing not fetching), i.e. this suggestion would only run the postprocessing on all cores, and the fetching would still be limited to one core. It is obviously desirable (as per the example) to run this on all cores, so I want to replace the starmap (not the map) with aiomultiprocess.starmap:

    def starmap(
        self,
        func: Callable[..., Awaitable[R]],
        iterable: Sequence[Sequence[T]],
        # chunksize: int = None,  # todo: implement chunking maybe
    ) -> PoolResult[R]:
        """Run a coroutine once for each sequence of items in the iterable."""
        if not self.running:
            raise RuntimeError("pool is closed")

        tids = [self.queue_work(func, args, {}) for args in iterable]
        return PoolResult(self, tids)

I think that this means the task_limit=20 moves from the map call to the stream.map call(?) Otherwise it would only be limited by the pool (but the task_limit is for asynchronous-ness not multicore-ness!)

async def fetch_and_process(self, urls: Iterator[str], client): 
    assert isinstance(client, httpx.AsyncClient)  # Not type checked due to Sphinx 
    client.timeout = self.timeout 
    ws = stream.repeat(client)
    xs = stream.zip(ws, stream.iterate(urls))
    async with Pool() as pool:
        ys = pool.starmap(iterable=xs, func=self.fetch)
        zs = stream.map(ys, self.process_stream, ordered=False, task_limit=20)
        return await zs

Note that configuring the Pool is important too: the number of cores should be multiprocessing.cpu_count usually. This is the default when the processes: int | None = None argument to Pool is left as is (docs)

The processes value controls the number of worker processes the pool will create and maintain. With the default value of None, the pool will create enough workers for each CPU core available on the host machine. Any other positive integer value will instruct the pool to create that number of workers instead.

This fails with:

RuntimeError('An attempt has been made to start a new process before the current process has finished its bootstrapping phase. This probably means that you are not using fork to start your child processes and you have forgotten to use the proper idiom in the main module: if __name__ == '__main__': freeze_support() ... The freeze_support() line can be omitted if the program is not going to be frozen to produce an executable.')

lmmx commented 3 years ago

Given how these don't seem to work well together, perhaps I should just replace the aiostream library for aiomultiprocess entirely...

It doesn't seem to work well with httpx.AsyncClient, since it tries to pickle the client. You see the same error when trying to deepcopy the client, so perhaps it's from that

An alternative approach then would be to clone the client (there is nothing preventing from doing this other than it takes time, but can do it in parallel).

lmmx commented 3 years ago

You can share memory in Python 3.8+, perhaps a client could reside in shared memory for access on all cores... (It has a bug though so maybe not)

lmmx commented 3 years ago

I got it to work as follows:

# Multiple processes

from multiprocessing import cpu_count, freeze_support
from aiomultiprocess import Pool
from aiostream import stream
from itertools import repeat
from more_itertools import divide
import httpx
import time
import asyncio

CPU_COUNT = cpu_count()

l_alphabet = "abcdefghijklmnopqrstuvwxyz"
u_alphabet = l_alphabet.upper()

# Pairs from Aa, Ab, ... to ... Zy, Zz
all_combos = [
    f"{u}{l}{ll}" for u in u_alphabet for l in l_alphabet for ll in l_alphabet
]

assert len(all_combos) == 26 ** 3

# If you divide N items into L parts where N < L, the final (L-N) will be empty
# which would mean creating AsyncFetcher with empty URL lists, causing errors
# Avoid that problem by always using the minimum between CPU count and iterable size:
n_parts = min(CPU_COUNT, len(all_combos))
split_combos_it = divide(n_parts, all_combos)  # split `all_combos` into `n_parts` lists
split_combos_lists = map(list, split_combos_it)

async def sleep(items):
    # async with httpx.AsyncClient() as client:
    client = httpx.AsyncClient()
    await asyncio.sleep(2)
    print(items)
    await client.aclose()

async def multi_sleep():
    async with Pool() as pool:
        t0 = time.time()
        async for result in pool.map(sleep, split_combos_lists):
            await asyncio.sleep(0)  # this executes in the process pool context
    t1 = time.time()
    print(f"{t1-t0}s")

if __name__ == "__main__":
    freeze_support()
    asyncio.run(multi_sleep())
Click to show output

``` ['Aaa', 'Aab', 'Aac', 'Aad', 'Aae', 'Aaf', 'Aag', 'Aah', 'Aai', 'Aaj', 'Aak', 'Aal', 'Aam', 'Aan', 'Aao', 'Aap', 'Aaq', 'Aar', 'Aas', 'Aat', 'Aau', 'Aav', 'Aaw', 'Aax', 'Aay', 'Aaz', 'Aba', 'Abb', 'Abc', 'Abd', 'Abe', 'Abf', 'Abg', 'Abh', 'Abi', 'Abj', 'Abk', 'Abl', 'Abm', 'Abn', 'Abo', 'Abp', 'Abq', 'Abr', 'Abs', 'Abt', 'Abu', 'Abv', 'Abw', 'Abx', 'Aby', 'Abz', 'Aca', 'Acb', 'Acc', 'Acd', 'Ace', 'Acf', 'Acg', 'Ach', 'Aci', 'Acj', 'Ack', 'Acl', 'Acm', 'Acn', 'Aco', 'Acp', 'Acq', 'Acr', 'Acs', 'Act', 'Acu', 'Acv', 'Acw', 'Acx', 'Acy', 'Acz', 'Ada', 'Adb', 'Adc', 'Add', 'Ade', 'Adf', 'Adg', 'Adh', 'Adi', 'Adj', 'Adk', 'Adl', 'Adm', 'Adn', 'Ado', 'Adp', 'Adq', 'Adr', 'Ads', 'Adt', 'Adu', 'Adv', 'Adw', 'Adx', 'Ady', 'Adz', 'Aea', 'Aeb', 'Aec', 'Aed', 'Aee', 'Aef', 'Aeg', 'Aeh', 'Aei', 'Aej', 'Aek', 'Ael', 'Aem', 'Aen', 'Aeo', 'Aep', 'Aeq', 'Aer', 'Aes', 'Aet', 'Aeu', 'Aev', 'Aew', 'Aex', 'Aey', 'Aez', 'Afa', 'Afb', 'Afc', 'Afd', 'Afe', 'Aff', 'Afg', 'Afh', 'Afi', 'Afj', 'Afk', 'Afl', 'Afm', 'Afn', 'Afo', 'Afp', 'Afq', 'Afr', 'Afs', 'Aft', 'Afu', 'Afv', 'Afw', 'Afx', 'Afy', 'Afz', 'Aga', 'Agb', 'Agc', 'Agd', 'Age', 'Agf', 'Agg', 'Agh', 'Agi', 'Agj', 'Agk', 'Agl', 'Agm', 'Agn', 'Ago', 'Agp', 'Agq', 'Agr', 'Ags', 'Agt', 'Agu', 'Agv', 'Agw', 'Agx', 'Agy', 'Agz', 'Aha', 'Ahb', 'Ahc', 'Ahd', 'Ahe', 'Ahf', 'Ahg', 'Ahh', 'Ahi', 'Ahj', 'Ahk', 'Ahl', 'Ahm', 'Ahn', 'Aho', 'Ahp', 'Ahq', 'Ahr', 'Ahs', 'Aht', 'Ahu', 'Ahv', 'Ahw', 'Ahx', 'Ahy', 'Ahz', 'Aia', 'Aib', 'Aic', 'Aid', 'Aie', 'Aif', 'Aig', 'Aih', 'Aii', 'Aij', 'Aik', 'Ail', 'Aim', 'Ain', 'Aio', 'Aip', 'Aiq', 'Air', 'Ais', 'Ait', 'Aiu', 'Aiv', 'Aiw', 'Aix', 'Aiy', 'Aiz', 'Aja', 'Ajb', 'Ajc', 'Ajd', 'Aje', 'Ajf', 'Ajg', 'Ajh', 'Aji', 'Ajj', 'Ajk', 'Ajl', 'Ajm', 'Ajn', 'Ajo', 'Ajp', 'Ajq', 'Ajr', 'Ajs', 'Ajt', 'Aju', 'Ajv', 'Ajw', 'Ajx', 'Ajy', 'Ajz', 'Aka', 'Akb', 'Akc', 'Akd', 'Ake', 'Akf', 'Akg', 'Akh', 'Aki', 'Akj', 'Akk', 'Akl', 'Akm', 'Akn', 'Ako', 'Akp', 'Akq', 'Akr', 'Aks', 'Akt', 'Aku', 'Akv', 'Akw', 'Akx', 'Aky', 'Akz', 'Ala', 'Alb', 'Alc', 'Ald', 'Ale', 'Alf', 'Alg', 'Alh', 'Ali', 'Alj', 'Alk', 'All', 'Alm', 'Aln', 'Alo', 'Alp', 'Alq', 'Alr', 'Als', 'Alt', 'Alu', 'Alv', 'Alw', 'Alx', 'Aly', 'Alz', 'Ama', 'Amb', 'Amc', 'Amd', 'Ame', 'Amf', 'Amg', 'Amh', 'Ami', 'Amj', 'Amk', 'Aml', 'Amm', 'Amn', 'Amo', 'Amp', 'Amq', 'Amr', 'Ams', 'Amt', 'Amu', 'Amv', 'Amw', 'Amx', 'Amy', 'Amz', 'Ana', 'Anb', 'Anc', 'And', 'Ane', 'Anf', 'Ang', 'Anh', 'Ani', 'Anj', 'Ank', 'Anl', 'Anm', 'Ann', 'Ano', 'Anp', 'Anq', 'Anr', 'Ans', 'Ant', 'Anu', 'Anv', 'Anw', 'Anx', 'Any', 'Anz', 'Aoa', 'Aob', 'Aoc', 'Aod', 'Aoe', 'Aof', 'Aog', 'Aoh', 'Aoi', 'Aoj', 'Aok', 'Aol', 'Aom', 'Aon', 'Aoo', 'Aop', 'Aoq', 'Aor', 'Aos', 'Aot', 'Aou', 'Aov', 'Aow', 'Aox', 'Aoy', 'Aoz', 'Apa', 'Apb', 'Apc', 'Apd', 'Ape', 'Apf', 'Apg', 'Aph', 'Api', 'Apj', 'Apk', 'Apl', 'Apm', 'Apn', 'Apo', 'App', 'Apq', 'Apr', 'Aps', 'Apt', 'Apu', 'Apv', 'Apw', 'Apx', 'Apy', 'Apz', 'Aqa', 'Aqb', 'Aqc', 'Aqd', 'Aqe', 'Aqf', 'Aqg', 'Aqh', 'Aqi', 'Aqj', 'Aqk', 'Aql', 'Aqm', 'Aqn', 'Aqo', 'Aqp', 'Aqq', 'Aqr', 'Aqs', 'Aqt', 'Aqu', 'Aqv', 'Aqw', 'Aqx', 'Aqy', 'Aqz', 'Ara', 'Arb', 'Arc', 'Ard', 'Are', 'Arf', 'Arg', 'Arh', 'Ari', 'Arj', 'Ark', 'Arl', 'Arm', 'Arn', 'Aro', 'Arp', 'Arq', 'Arr', 'Ars', 'Art', 'Aru', 'Arv', 'Arw', 'Arx', 'Ary', 'Arz', 'Asa', 'Asb', 'Asc', 'Asd', 'Ase', 'Asf', 'Asg', 'Ash', 'Asi', 'Asj', 'Ask', 'Asl', 'Asm', 'Asn', 'Aso', 'Asp', 'Asq', 'Asr', 'Ass', 'Ast', 'Asu', 'Asv', 'Asw', 'Asx', 'Asy', 'Asz', 'Ata', 'Atb', 'Atc', 'Atd', 'Ate', 'Atf', 'Atg', 'Ath', 'Ati', 'Atj', 'Atk', 'Atl', 'Atm', 'Atn', 'Ato', 'Atp', 'Atq', 'Atr', 'Ats', 'Att', 'Atu', 'Atv', 'Atw', 'Atx', 'Aty', 'Atz', 'Aua', 'Aub', 'Auc', 'Aud', 'Aue', 'Auf', 'Aug', 'Auh', 'Aui', 'Auj', 'Auk', 'Aul', 'Aum', 'Aun', 'Auo', 'Aup', 'Auq', 'Aur', 'Aus', 'Aut', 'Auu', 'Auv', 'Auw', 'Aux', 'Auy', 'Auz', 'Ava', 'Avb', 'Avc', 'Avd', 'Ave', 'Avf', 'Avg', 'Avh', 'Avi', 'Avj', 'Avk', 'Avl', 'Avm', 'Avn', 'Avo', 'Avp', 'Avq', 'Avr', 'Avs', 'Avt', 'Avu', 'Avv', 'Avw', 'Avx', 'Avy', 'Avz', 'Awa', 'Awb', 'Awc', 'Awd', 'Awe', 'Awf', 'Awg', 'Awh', 'Awi', 'Awj', 'Awk', 'Awl', 'Awm', 'Awn', 'Awo', 'Awp', 'Awq', 'Awr', 'Aws', 'Awt', 'Awu', 'Awv', 'Aww', 'Awx', 'Awy', 'Awz', 'Axa', 'Axb', 'Axc', 'Axd', 'Axe', 'Axf', 'Axg', 'Axh', 'Axi', 'Axj', 'Axk', 'Axl', 'Axm', 'Axn', 'Axo', 'Axp', 'Axq', 'Axr', 'Axs', 'Axt', 'Axu', 'Axv', 'Axw', 'Axx', 'Axy', 'Axz', 'Aya', 'Ayb', 'Ayc', 'Ayd', 'Aye', 'Ayf', 'Ayg', 'Ayh', 'Ayi', 'Ayj', 'Ayk', 'Ayl', 'Aym', 'Ayn', 'Ayo', 'Ayp', 'Ayq', 'Ayr', 'Ays', 'Ayt', 'Ayu', 'Ayv', 'Ayw', 'Ayx', 'Ayy', 'Ayz', 'Aza', 'Azb', 'Azc', 'Azd', 'Aze', 'Azf', 'Azg', 'Azh', 'Azi', 'Azj', 'Azk', 'Azl', 'Azm', 'Azn', 'Azo', 'Azp', 'Azq', 'Azr', 'Azs', 'Azt', 'Azu', 'Azv', 'Azw', 'Azx', 'Azy', 'Azz', 'Baa', 'Bab', 'Bac', 'Bad', 'Bae', 'Baf', 'Bag', 'Bah', 'Bai', 'Baj', 'Bak', 'Bal', 'Bam', 'Ban', 'Bao', 'Bap', 'Baq', 'Bar', 'Bas', 'Bat', 'Bau', 'Bav', 'Baw', 'Bax', 'Bay', 'Baz', 'Bba', 'Bbb', 'Bbc', 'Bbd', 'Bbe', 'Bbf', 'Bbg', 'Bbh', 'Bbi', 'Bbj', 'Bbk', 'Bbl', 'Bbm', 'Bbn', 'Bbo', 'Bbp', 'Bbq', 'Bbr', 'Bbs', 'Bbt', 'Bbu', 'Bbv', 'Bbw', 'Bbx', 'Bby', 'Bbz', 'Bca', 'Bcb', 'Bcc', 'Bcd', 'Bce', 'Bcf', 'Bcg', 'Bch', 'Bci', 'Bcj', 'Bck', 'Bcl', 'Bcm', 'Bcn', 'Bco', 'Bcp', 'Bcq', 'Bcr', 'Bcs', 'Bct', 'Bcu', 'Bcv', 'Bcw', 'Bcx', 'Bcy', 'Bcz', 'Bda', 'Bdb', 'Bdc', 'Bdd', 'Bde', 'Bdf', 'Bdg', 'Bdh', 'Bdi', 'Bdj', 'Bdk', 'Bdl', 'Bdm', 'Bdn', 'Bdo', 'Bdp', 'Bdq', 'Bdr', 'Bds', 'Bdt', 'Bdu', 'Bdv', 'Bdw', 'Bdx', 'Bdy', 'Bdz', 'Bea', 'Beb', 'Bec', 'Bed', 'Bee', 'Bef', 'Beg', 'Beh', 'Bei', 'Bej', 'Bek', 'Bel', 'Bem', 'Ben', 'Beo', 'Bep', 'Beq', 'Ber', 'Bes', 'Bet', 'Beu', 'Bev', 'Bew', 'Bex', 'Bey', 'Bez', 'Bfa', 'Bfb', 'Bfc', 'Bfd', 'Bfe', 'Bff', 'Bfg', 'Bfh', 'Bfi', 'Bfj', 'Bfk', 'Bfl', 'Bfm', 'Bfn', 'Bfo', 'Bfp', 'Bfq', 'Bfr', 'Bfs', 'Bft', 'Bfu', 'Bfv', 'Bfw', 'Bfx', 'Bfy', 'Bfz', 'Bga', 'Bgb', 'Bgc', 'Bgd', 'Bge', 'Bgf', 'Bgg', 'Bgh', 'Bgi', 'Bgj', 'Bgk', 'Bgl', 'Bgm', 'Bgn', 'Bgo', 'Bgp', 'Bgq', 'Bgr', 'Bgs', 'Bgt', 'Bgu', 'Bgv', 'Bgw', 'Bgx', 'Bgy', 'Bgz', 'Bha', 'Bhb', 'Bhc', 'Bhd', 'Bhe', 'Bhf', 'Bhg', 'Bhh', 'Bhi', 'Bhj', 'Bhk', 'Bhl', 'Bhm', 'Bhn', 'Bho', 'Bhp', 'Bhq', 'Bhr', 'Bhs', 'Bht', 'Bhu'] ... ['Tnd', 'Tne', 'Tnf', 'Tng', 'Tnh', 'Tni', 'Tnj', 'Tnk', 'Tnl', 'Tnm', 'Tnn', 'Tno', 'Tnp', 'Tnq', 'Tnr', 'Tns', 'Tnt', 'Tnu', 'Tnv', 'Tnw', 'Tnx', 'Tny', 'Tnz', 'Toa', 'Tob', 'Toc', 'Tod', 'Toe', 'Tof', 'Tog', 'Toh', 'Toi', 'Toj', 'Tok', 'Tol', 'Tom', 'Ton', 'Too', 'Top', 'Toq', 'Tor', 'Tos', 'Tot', 'Tou', 'Tov', 'Tow', 'Tox', 'Toy', 'Toz', 'Tpa', 'Tpb', 'Tpc', 'Tpd', 'Tpe', 'Tpf', 'Tpg', 'Tph', 'Tpi', 'Tpj', 'Tpk', 'Tpl', 'Tpm', 'Tpn', 'Tpo', 'Tpp', 'Tpq', 'Tpr', 'Tps', 'Tpt', 'Tpu', 'Tpv', 'Tpw', 'Tpx', 'Tpy', 'Tpz', 'Tqa', 'Tqb', 'Tqc', 'Tqd', 'Tqe', 'Tqf', 'Tqg', 'Tqh', 'Tqi', 'Tqj', 'Tqk', 'Tql', 'Tqm', 'Tqn', 'Tqo', 'Tqp', 'Tqq', 'Tqr', 'Tqs', 'Tqt', 'Tqu', 'Tqv', 'Tqw', 'Tqx', 'Tqy', 'Tqz', 'Tra', 'Trb', 'Trc', 'Trd', 'Tre', 'Trf', 'Trg', 'Trh', 'Tri', 'Trj', 'Trk', 'Trl', 'Trm', 'Trn', 'Tro', 'Trp', 'Trq', 'Trr', 'Trs', 'Trt', 'Tru', 'Trv', 'Trw', 'Trx', 'Try', 'Trz', 'Tsa', 'Tsb', 'Tsc', 'Tsd', 'Tse', 'Tsf', 'Tsg', 'Tsh', 'Tsi', 'Tsj', 'Tsk', 'Tsl', 'Tsm', 'Tsn', 'Tso', 'Tsp', 'Tsq', 'Tsr', 'Tss', 'Tst', 'Tsu', 'Tsv', 'Tsw', 'Tsx', 'Tsy', 'Tsz', 'Tta', 'Ttb', 'Ttc', 'Ttd', 'Tte', 'Ttf', 'Ttg', 'Tth', 'Tti', 'Ttj', 'Ttk', 'Ttl', 'Ttm', 'Ttn', 'Tto', 'Ttp', 'Ttq', 'Ttr', 'Tts', 'Ttt', 'Ttu', 'Ttv', 'Ttw', 'Ttx', 'Tty', 'Ttz', 'Tua', 'Tub', 'Tuc', 'Tud', 'Tue', 'Tuf', 'Tug', 'Tuh', 'Tui', 'Tuj', 'Tuk', 'Tul', 'Tum', 'Tun', 'Tuo', 'Tup', 'Tuq', 'Tur', 'Tus', 'Tut', 'Tuu', 'Tuv', 'Tuw', 'Tux', 'Tuy', 'Tuz', 'Tva', 'Tvb', 'Tvc', 'Tvd', 'Tve', 'Tvf', 'Tvg', 'Tvh', 'Tvi', 'Tvj', 'Tvk', 'Tvl', 'Tvm', 'Tvn', 'Tvo', 'Tvp', 'Tvq', 'Tvr', 'Tvs', 'Tvt', 'Tvu', 'Tvv', 'Tvw', 'Tvx', 'Tvy', 'Tvz', 'Twa', 'Twb', 'Twc', 'Twd', 'Twe', 'Twf', 'Twg', 'Twh', 'Twi', 'Twj', 'Twk', 'Twl', 'Twm', 'Twn', 'Two', 'Twp', 'Twq', 'Twr', 'Tws', 'Twt', 'Twu', 'Twv', 'Tww', 'Twx', 'Twy', 'Twz', 'Txa', 'Txb', 'Txc', 'Txd', 'Txe', 'Txf', 'Txg', 'Txh', 'Txi', 'Txj', 'Txk', 'Txl', 'Txm', 'Txn', 'Txo', 'Txp', 'Txq', 'Txr', 'Txs', 'Txt', 'Txu', 'Txv', 'Txw', 'Txx', 'Txy', 'Txz', 'Tya', 'Tyb', 'Tyc', 'Tyd', 'Tye', 'Tyf', 'Tyg', 'Tyh', 'Tyi', 'Tyj', 'Tyk', 'Tyl', 'Tym', 'Tyn', 'Tyo', 'Typ', 'Tyq', 'Tyr', 'Tys', 'Tyt', 'Tyu', 'Tyv', 'Tyw', 'Tyx', 'Tyy', 'Tyz', 'Tza', 'Tzb', 'Tzc', 'Tzd', 'Tze', 'Tzf', 'Tzg', 'Tzh', 'Tzi', 'Tzj', 'Tzk', 'Tzl', 'Tzm', 'Tzn', 'Tzo', 'Tzp', 'Tzq', 'Tzr', 'Tzs', 'Tzt', 'Tzu', 'Tzv', 'Tzw', 'Tzx', 'Tzy', 'Tzz', 'Uaa', 'Uab', 'Uac', 'Uad', 'Uae', 'Uaf', 'Uag', 'Uah', 'Uai', 'Uaj', 'Uak', 'Ual', 'Uam', 'Uan', 'Uao', 'Uap', 'Uaq', 'Uar', 'Uas', 'Uat', 'Uau', 'Uav', 'Uaw', 'Uax', 'Uay', 'Uaz', 'Uba', 'Ubb', 'Ubc', 'Ubd', 'Ube', 'Ubf', 'Ubg', 'Ubh', 'Ubi', 'Ubj', 'Ubk', 'Ubl', 'Ubm', 'Ubn', 'Ubo', 'Ubp', 'Ubq', 'Ubr', 'Ubs', 'Ubt', 'Ubu', 'Ubv', 'Ubw', 'Ubx', 'Uby', 'Ubz', 'Uca', 'Ucb', 'Ucc', 'Ucd', 'Uce', 'Ucf', 'Ucg', 'Uch', 'Uci', 'Ucj', 'Uck', 'Ucl', 'Ucm', 'Ucn', 'Uco', 'Ucp', 'Ucq', 'Ucr', 'Ucs', 'Uct', 'Ucu', 'Ucv', 'Ucw', 'Ucx', 'Ucy', 'Ucz', 'Uda', 'Udb', 'Udc', 'Udd', 'Ude', 'Udf', 'Udg', 'Udh', 'Udi', 'Udj', 'Udk', 'Udl', 'Udm', 'Udn', 'Udo', 'Udp', 'Udq', 'Udr', 'Uds', 'Udt', 'Udu', 'Udv', 'Udw', 'Udx', 'Udy', 'Udz', 'Uea', 'Ueb', 'Uec', 'Ued', 'Uee', 'Uef', 'Ueg', 'Ueh', 'Uei', 'Uej', 'Uek', 'Uel', 'Uem', 'Uen', 'Ueo', 'Uep', 'Ueq', 'Uer', 'Ues', 'Uet', 'Ueu', 'Uev', 'Uew', 'Uex', 'Uey', 'Uez', 'Ufa', 'Ufb', 'Ufc', 'Ufd', 'Ufe', 'Uff', 'Ufg', 'Ufh', 'Ufi', 'Ufj', 'Ufk', 'Ufl', 'Ufm', 'Ufn', 'Ufo', 'Ufp', 'Ufq', 'Ufr', 'Ufs', 'Uft', 'Ufu', 'Ufv', 'Ufw', 'Ufx', 'Ufy', 'Ufz', 'Uga', 'Ugb', 'Ugc', 'Ugd', 'Uge', 'Ugf', 'Ugg', 'Ugh', 'Ugi', 'Ugj', 'Ugk', 'Ugl', 'Ugm', 'Ugn', 'Ugo', 'Ugp', 'Ugq', 'Ugr', 'Ugs', 'Ugt', 'Ugu', 'Ugv', 'Ugw', 'Ugx', 'Ugy', 'Ugz', 'Uha', 'Uhb', 'Uhc', 'Uhd', 'Uhe', 'Uhf', 'Uhg', 'Uhh', 'Uhi', 'Uhj', 'Uhk', 'Uhl', 'Uhm', 'Uhn', 'Uho', 'Uhp', 'Uhq', 'Uhr', 'Uhs', 'Uht', 'Uhu', 'Uhv', 'Uhw', 'Uhx', 'Uhy', 'Uhz', 'Uia', 'Uib', 'Uic', 'Uid', 'Uie', 'Uif', 'Uig', 'Uih', 'Uii', 'Uij', 'Uik', 'Uil', 'Uim', 'Uin', 'Uio', 'Uip', 'Uiq', 'Uir', 'Uis', 'Uit', 'Uiu', 'Uiv', 'Uiw', 'Uix', 'Uiy', 'Uiz', 'Uja', 'Ujb', 'Ujc', 'Ujd', 'Uje', 'Ujf', 'Ujg', 'Ujh', 'Uji', 'Ujj', 'Ujk', 'Ujl', 'Ujm', 'Ujn', 'Ujo', 'Ujp', 'Ujq', 'Ujr', 'Ujs', 'Ujt', 'Uju', 'Ujv', 'Ujw', 'Ujx', 'Ujy', 'Ujz', 'Uka', 'Ukb', 'Ukc', 'Ukd', 'Uke', 'Ukf', 'Ukg', 'Ukh', 'Uki', 'Ukj', 'Ukk', 'Ukl', 'Ukm', 'Ukn', 'Uko', 'Ukp', 'Ukq', 'Ukr', 'Uks', 'Ukt', 'Uku', 'Ukv', 'Ukw', 'Ukx', 'Uky', 'Ukz', 'Ula', 'Ulb', 'Ulc', 'Uld', 'Ule', 'Ulf', 'Ulg', 'Ulh', 'Uli', 'Ulj', 'Ulk', 'Ull', 'Ulm', 'Uln', 'Ulo', 'Ulp', 'Ulq', 'Ulr', 'Uls', 'Ult', 'Ulu', 'Ulv', 'Ulw', 'Ulx', 'Uly', 'Ulz', 'Uma', 'Umb', 'Umc', 'Umd', 'Ume', 'Umf', 'Umg', 'Umh', 'Umi', 'Umj', 'Umk', 'Uml', 'Umm', 'Umn', 'Umo', 'Ump', 'Umq', 'Umr', 'Ums', 'Umt', 'Umu', 'Umv', 'Umw', 'Umx', 'Umy', 'Umz', 'Una', 'Unb', 'Unc', 'Und', 'Une', 'Unf', 'Ung', 'Unh', 'Uni', 'Unj', 'Unk', 'Unl', 'Unm', 'Unn', 'Uno', 'Unp', 'Unq', 'Unr', 'Uns', 'Unt', 'Unu', 'Unv', 'Unw', 'Unx', 'Uny', 'Unz', 'Uoa', 'Uob', 'Uoc', 'Uod', 'Uoe', 'Uof', 'Uog', 'Uoh', 'Uoi', 'Uoj', 'Uok', 'Uol', 'Uom', 'Uon', 'Uoo', 'Uop', 'Uoq', 'Uor', 'Uos', 'Uot', 'Uou', 'Uov', 'Uow', 'Uox', 'Uoy', 'Uoz', 'Upa', 'Upb', 'Upc', 'Upd', 'Upe', 'Upf', 'Upg', 'Uph', 'Upi', 'Upj', 'Upk', 'Upl', 'Upm', 'Upn', 'Upo', 'Upp', 'Upq', 'Upr', 'Ups', 'Upt', 'Upu', 'Upv', 'Upw', 'Upx', 'Upy', 'Upz', 'Uqa', 'Uqb', 'Uqc', 'Uqd', 'Uqe', 'Uqf', 'Uqg', 'Uqh', 'Uqi', 'Uqj', 'Uqk', 'Uql', 'Uqm', 'Uqn', 'Uqo', 'Uqp', 'Uqq', 'Uqr', 'Uqs', 'Uqt', 'Uqu', 'Uqv', 'Uqw', 'Uqx', 'Uqy', 'Uqz', 'Ura', 'Urb', 'Urc', 'Urd', 'Ure', 'Urf', 'Urg', 'Urh', 'Uri', 'Urj', 'Urk', 'Url', 'Urm', 'Urn', 'Uro', 'Urp', 'Urq', 'Urr', 'Urs', 'Urt', 'Uru', 'Urv', 'Urw', 'Urx', 'Ury', 'Urz', 'Usa', 'Usb', 'Usc', 'Usd', 'Use', 'Usf', 'Usg', 'Ush', 'Usi', 'Usj', 'Usk', 'Usl', 'Usm', 'Usn', 'Uso', 'Usp', 'Usq', 'Usr', 'Uss', 'Ust', 'Usu', 'Usv', 'Usw', 'Usx', 'Usy', 'Usz', 'Uta', 'Utb', 'Utc', 'Utd', 'Ute', 'Utf', 'Utg', 'Uth', 'Uti', 'Utj', 'Utk', 'Utl', 'Utm', 'Utn', 'Uto', 'Utp', 'Utq', 'Utr', 'Uts', 'Utt', 'Utu', 'Utv', 'Utw', 'Utx', 'Uty', 'Utz', 'Uua', 'Uub', 'Uuc', 'Uud', 'Uue', 'Uuf', 'Uug', 'Uuh', 'Uui', 'Uuj', 'Uuk', 'Uul', 'Uum', 'Uun', 'Uuo', 'Uup', 'Uuq', 'Uur', 'Uus', 'Uut', 'Uuu', 'Uuv', 'Uuw', 'Uux'] 2.252999782562256s ```

This approach requires multiple clients where the previous required only one. I also suspect this will mean they cannot be reused across files (unless provisioned above the level at which the files were looped over!) as the client cannot be passed out of the process pool context (as annotated in the code comment above).

An alternative approach that would make such a problem negligible would be:

lmmx commented 3 years ago

Now with tqdm and logging:

# Multiple processes

from multiprocessing import cpu_count, freeze_support
from aiomultiprocess import Pool
from aiostream import stream
from itertools import repeat
from more_itertools import divide
import httpx
import time
import asyncio
from tqdm.asyncio import tqdm
import logging

def write_log(msg):
    log = logging.getLogger()
    log.setLevel(logging.DEBUG)
    log_format = logging.Formatter('[%(asctime)s] [%(levelname)s] - %(message)s')
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    console.setFormatter(log_format)
    log.addHandler(console)
    log.debug(msg)

CPU_COUNT = cpu_count()

l_alphabet = "abcdefghijklmnopqrstuvwxyz"
u_alphabet = l_alphabet.upper()

# Pairs from Aa, Ab, ... to ... Zy, Zz
all_combos = [
    f"{u}{l}{ll}" for u in u_alphabet for l in l_alphabet for ll in l_alphabet
]

assert len(all_combos) == 26 ** 3

# If you divide N items into L parts where N < L, the final (L-N) will be empty
# which would mean creating AsyncFetcher with empty URL lists, causing errors
# Avoid that problem by always using the minimum between CPU count and iterable size:
n_parts = min(CPU_COUNT, len(all_combos))
split_combos_it = divide(n_parts, all_combos)  # split `all_combos` into `n_parts` lists
split_combos_lists = map(list, split_combos_it)

async def sleep(items):
    # async with httpx.AsyncClient() as client:
    client = httpx.AsyncClient()
    await asyncio.sleep(2)
    #print(items)
    await client.aclose()

async def multi_sleep():
    pbar = tqdm(total=n_parts)
    async with Pool() as pool:
        async for result in pool.map(sleep, split_combos_lists):
            pbar.update()
            await asyncio.sleep(0)  # this executes in the process pool context
    pbar.close()

if __name__ == "__main__":
    freeze_support()
    t0 = time.time()
    asyncio.run(multi_sleep())
    t1 = time.time()
    write_log(f"{t1-t0}s")

100%|█████████████████████████████████████████████████████████| 20/20 [00:02<00:00,  8.33it/s]
[2021-08-14 19:32:10,564] [DEBUG] - 2.403613567352295s