omnilib / aiomultiprocess

Take a modern Python codebase to the next level of performance.
https://aiomultiprocess.omnilib.dev
MIT License
1.78k stars 101 forks source link

[Error] queue.Full on macOS with >32767 items. #20

Open HyWell opened 5 years ago

HyWell commented 5 years ago

Description

its my code

import aiomultiprocess
import aiohttp
import multiprocessing
import logging

async def run(url):
    async with asyncio.Semaphore(500):
        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) as session:
            async with session.get(url, timeout=20) as resp:
                result = await resp.text()
                if 'test' in result:
                    logging.warning(url)
                else:
                    logging.error(url)

async def main():
    url = 'http://www.baidu.com/index.shtml?id='
    urls = [url + str(i) for i in range(1000000)]
    async with aiomultiprocess.Pool(10)as pool:
        await pool.map(run, urls)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Details

error log

Traceback (most recent call last):
  File "/Users/hywell/Desktop/getr/get-res.py", line 23, in main
    await pool.map(run, urls)
  File "/Users/hywell/.local/share/virtualenvs/getr-AgwWyiyi/lib/python3.7/site-packages/aiomultiprocess/core.py", line 412, in map
    tids = [self.queue_work(func, (item,), {}) for item in iterable]
  File "/Users/hywell/.local/share/virtualenvs/getr-AgwWyiyi/lib/python3.7/site-packages/aiomultiprocess/core.py", line 412, in <listcomp>
    tids = [self.queue_work(func, (item,), {}) for item in iterable]
  File "/Users/hywell/.local/share/virtualenvs/getr-AgwWyiyi/lib/python3.7/site-packages/aiomultiprocess/core.py", line 367, in queue_work
    self.tx_queue.put_nowait((task_id, func, args, kwargs))
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 129, in put_nowait
    return self.put(obj, False)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 83, in put
    raise Full
queue.Full

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Applications/PyCharm.app/Contents/helpers/pydev/pydevd.py", line 1741, in <module>
    main()
  File "/Applications/PyCharm.app/Contents/helpers/pydev/pydevd.py", line 1735, in main
    globals = debugger.run(setup['file'], None, None, is_module)
  File "/Applications/PyCharm.app/Contents/helpers/pydev/pydevd.py", line 1135, in run
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "/Applications/PyCharm.app/Contents/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/Users/hywell/Desktop/getr/get-res.py", line 30, in <module>
    loop.run_until_complete(main())
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "/Users/hywell/Desktop/getr/get-res.py", line 23, in main
    await pool.map(run, urls)
  File "/Users/hywell/.local/share/virtualenvs/getr-AgwWyiyi/lib/python3.7/site-packages/aiomultiprocess/core.py", line 323, in __aexit__
    self.terminate()
  File "/Users/hywell/.local/share/virtualenvs/getr-AgwWyiyi/lib/python3.7/site-packages/aiomultiprocess/core.py", line 437, in terminate
    self.close()
  File "/Users/hywell/.local/share/virtualenvs/getr-AgwWyiyi/lib/python3.7/site-packages/aiomultiprocess/core.py", line 432, in close
    self.tx_queue.put_nowait(None)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 129, in put_nowait
    return self.put(obj, False)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 83, in put
    raise Full
queue.Full
amyreese commented 5 years ago

This is a limitation of mulitprocessing on macOS. On Linux, the queue is unbounded, but macOS has a limit of 2**15 - 1 items. You can work around this by limiting your own batches to less than 32,767 items, or by running this on Linux instead.

Alternately, if anyone is interested in working around this in aiomultiprocess, this will require using a second (process local) queue, and having another asyncio task to add tasks to the multiprocessing queue from the local queue as space becomes available.

GodsDusk commented 2 years ago

same error on M1