omnilib / aiomultiprocess

Take a modern Python codebase to the next level of performance.
https://aiomultiprocess.omnilib.dev
MIT License
1.75k stars · 99 forks

App is getting stuck, pool never closed with no exception in docker container #185

Open itayB opened 1 year ago

itayB commented 1 year ago

Description

I have a simple module that downloads avro data files (~50-200 files, each between 1 and 50 MB) from S3 and indexes the data into Elasticsearch. This module runs in a docker container (within Kubernetes). I tried to use aiomultiprocess to speed up the process by running it in parallel with more resources (4 cores). I noticed that the module gets stuck too often (it keeps running but does nothing), and after a long investigation I found that it's a memory issue (although I didn't get an Out Of Memory kill event from Kubernetes). Is there a way to raise an exception in such a case? I want to be alerted when my app gets stuck so I can tune the memory and rerun the tasks.
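A minimal way to turn the silent hang into an exception, using nothing beyond the standard library, is to bound the whole run with `asyncio.wait_for`, so a stall surfaces as a `TimeoutError` instead of endless heartbeats. This is only a sketch; `stuck_map` below is a hypothetical stand-in for the `pool.map(...)` call that never completes:

```python
import asyncio

async def stuck_map():
    # hypothetical stand-in for `pool.map(...)` that never completes,
    # mimicking the hang described above
    await asyncio.sleep(3600)

async def run_with_deadline(timeout):
    # bound the whole map with a deadline so a hang surfaces as an exception
    try:
        await asyncio.wait_for(stuck_map(), timeout=timeout)
        return "done"
    except asyncio.TimeoutError:
        return "stuck"

print(asyncio.run(run_with_deadline(0.2)))  # prints "stuck"
```

The drawback is that you need to pick one deadline for the whole batch, which is hard when the number and size of files vary.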

Below is my attempt to reproduce this behavior with a simple task (just a loop that fills memory) instead of downloading files and indexing them into a database. Running the code below (and also here) with a memory limit of 2g never ends, while changing it to 3g finishes successfully.

docker build -t aiomultiprocess_mem .
docker run --rm --memory=2g --cpus=5 --name=mem aiomultiprocess_mem
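Since the hang tracks the `--memory` limit, one option is to have the parent process watch how close the container is to its cgroup memory limit and warn (or fail fast) before the workers stall. The helper below is only a sketch, not part of aiomultiprocess; it tries the cgroup v2 file paths first, then v1, and returns `None` when no limit is readable:

```python
def cgroup_memory_usage_ratio():
    """Return used/limit memory for this container, or None if unavailable.

    Hypothetical helper: reads cgroup v2 files first, then cgroup v1.
    """
    paths = [
        # cgroup v2
        ("/sys/fs/cgroup/memory.current", "/sys/fs/cgroup/memory.max"),
        # cgroup v1
        ("/sys/fs/cgroup/memory/memory.usage_in_bytes",
         "/sys/fs/cgroup/memory/memory.limit_in_bytes"),
    ]
    for used_path, limit_path in paths:
        try:
            with open(used_path) as f:
                used = int(f.read())
            with open(limit_path) as f:
                raw = f.read().strip()
            if raw == "max":
                return None  # v2: no limit configured
            return used / int(raw)
        except (OSError, ValueError):
            continue
    return None
```

Polling this from the heartbeat loop and logging a warning above, say, 0.9 would at least make the failure mode visible in the logs.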

Output (at some point, only the heartbeat logs keep repeating indefinitely):

2023-03-19 15:31:20,457 1 INFO               creating pool [app.py:38]
2023-03-19 15:31:20,524 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:31:20,584 14 INFO               T000 started! [app.py:27]
2023-03-19 15:31:20,589 18 INFO               T003 started! [app.py:27]
2023-03-19 15:31:20,596 22 INFO               T006 started! [app.py:27]
2023-03-19 15:31:20,600 26 INFO               T009 started! [app.py:27]
2023-03-19 15:31:21,525 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:31:22,525 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:31:23,527 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:31:24,387 18 INFO               T003 100000 [app.py:31]
2023-03-19 15:31:24,386 26 INFO               T009 100000 [app.py:31]
2023-03-19 15:31:24,391 22 INFO               T006 100000 [app.py:31]
2023-03-19 15:31:24,527 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:31:24,610 31 INFO               T012 started! [app.py:27]
2023-03-19 15:31:25,528 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:31:25,695 31 INFO               T012 100000 [app.py:31]
2023-03-19 15:31:26,261 31 INFO               T013 started! [app.py:27]
...
...
2023-03-19 15:33:34,095 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:35,096 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:36,097 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:37,097 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:38,098 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:39,099 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:40,100 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:41,101 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:42,101 1 INFO               heartbeat alive 4 processes [app.py:22]
...

Code:

├── Dockerfile
└── multi
    ├── __init__.py
    ├── __main__.py
    └── app.py

Dockerfile:

FROM python:3.11.2-slim-bullseye
WORKDIR /
RUN pip install --no-cache-dir --upgrade pip==23.0.1 \
 && pip install --no-cache-dir aiomultiprocess==0.9.0
COPY multi /multi
CMD ["python", "-m", "multi"]

__main__.py:

import asyncio

from multi.app import main

if __name__ == "__main__":
    asyncio.run(main())

app.py:

import asyncio
import logging
import os
import sys

from aiomultiprocess import Pool

logger = logging.getLogger(__name__)

def init_logger():
    logging.basicConfig(
        format="%(asctime)-15s %(process)d %(levelname)-18.18s %(message)s [%(filename)s:%(lineno)d]",
        stream=sys.stdout
    )
    logging.root.setLevel(logging.INFO)

async def is_alive(pool):
    while True:
        if pool is not None:
            logger.info(f"heartbeat alive {len(pool.processes.keys())} processes")
        await asyncio.sleep(1)

async def my_mem_task(task_id):
    logger.info(f"T{task_id:03} started!")
    data = []
    for i in range(100_000):
        data.append([i] * 1_000)
    logger.info(f"T{task_id:03} {len(data)}")

async def main():
    init_logger()
    number_of_processes = int(os.getenv("NUMBER_OF_PROCESSES", "4"))
    number_of_async_tasks = int(os.getenv("NUMBER_OF_ASYNC_TASKS", "3"))
    logger.info("creating pool")
    async with Pool(
            processes=number_of_processes,
            childconcurrency=number_of_async_tasks,
            initializer=init_logger,
    ) as pool:
        asyncio.create_task(is_alive(pool))
        task_ids = [task_id for task_id in range(150)]
        await pool.map(my_mem_task, task_ids)
        pool.close()
        logger.info("processes pool closed")
        await pool.join()
        logger.info("all processes are done")
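
When a single overall timeout is hard to choose, a progress-based watchdog can raise as soon as no task completes for a while. This is again just a sketch with hypothetical names (`StallError`, the `progress` dict), not an aiomultiprocess feature: each task bumps a shared counter, and the watchdog fails when the counter stops moving.

```python
import asyncio
import time

class StallError(RuntimeError):
    """Raised when no task completes within the allowed window (hypothetical)."""

async def watchdog(progress, stall_seconds):
    # `progress` is a plain dict updated by the tasks; raise StallError
    # when progress["done"] stops advancing for stall_seconds
    last = progress["done"]
    last_change = time.monotonic()
    while not progress["finished"]:
        await asyncio.sleep(0.05)
        if progress["done"] != last:
            last, last_change = progress["done"], time.monotonic()
        elif time.monotonic() - last_change > stall_seconds:
            raise StallError(f"no task completed in {stall_seconds}s")

async def demo():
    progress = {"done": 0, "finished": False}

    async def tasks():
        for _ in range(3):
            await asyncio.sleep(0.05)
            progress["done"] += 1      # real code would do this per finished task
        await asyncio.sleep(10)        # simulate the hang from the issue

    guard = asyncio.create_task(watchdog(progress, stall_seconds=0.5))
    work = asyncio.create_task(tasks())
    done, pending = await asyncio.wait({guard, work},
                                       return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    for task in done:
        if task.exception() is not None:
            return type(task.exception()).__name__
    return "finished"

print(asyncio.run(demo()))  # prints "StallError"
```

In the app above, the counter bump would go next to the `pool.map` results (e.g. by iterating results as they arrive) so the watchdog fires when workers stop producing output.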

UPDATE: The same happens with the non-aio multiprocessing version :( (I added a relevant question on SO)
