agronholm / anyio

High level asynchronous concurrency and networking framework that works on top of either trio or asyncio
MIT License

to_process.run_sync() causing high memory usage and exceeding defined process limit #603

Closed: AnaelGorfinkel closed this issue 1 year ago

AnaelGorfinkel commented 1 year ago

Things to check first

AnyIO version: 3.7.0

Python version: 3.10.11

What happened?

I encountered an issue while using to_process.run_sync() in conjunction with a process pool. The problem arises when the pool loses track of its worker processes, leading to excessive memory consumption.

In my scenario, I needed to execute a CPU-intensive task asynchronously, multiple times, at unspecified intervals. Recognizing the overhead involved in process creation, I opted to maintain a process pool. I built on your provided code with the following parameters:

  1. The capacity limiter was set to the number of available CPUs (16 in my case).
  2. The CPU-intensive task was run many times (over 100).
  3. The task could be invoked at unpredictable times.
  4. The task uses 1 GB of memory by default.

Although I built my code upon your process pool maintenance solution, I encountered a noticeable surge in memory consumption. I also observed that the count of active processes surpassed the limit I had set, exceeding 30 processes.

Assumption regarding the root cause: upon investigating your code, I observed that the issue may stem from AnyIO losing its reference to the worker process it selected while pruning idle processes that are meant to be terminated.

Following this investigation, I patched your code locally and the problem was resolved; the code now functions as intended. A simplified sketch of the suspected failure mode follows.
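
For illustration only, here is a minimal, self-contained sketch of the kind of bug I suspect (this is a simplified stand-in, not the actual AnyIO source; the deque of (worker, idle_since) pairs and all names are illustrative). If the pruning loop reuses the very variable that holds the worker chosen for the task, the reference to that worker is lost, which would explain both the extra processes and the memory growth:

```python
from collections import deque

# Illustrative idle-worker pool: (worker, idle_since) pairs, oldest on the left
idle_workers = deque([("worker-1", 0.0), ("worker-2", 10.0), ("worker-3", 400.0)])

# Pick the most recently idle worker to run the task
process, idle_since = idle_workers.pop()
assert process == "worker-3"

# Prune workers that have been idle too long, reusing the same variable...
while idle_workers:
    process, idle_since = idle_workers.popleft()  # BUG: shadows the chosen worker
    # ... kill this idle worker ...

# The reference to the worker selected for the task is now lost
print(process)  # prints "worker-2", not "worker-3"
```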

How can we reproduce the bug?

from time import sleep

import anyio
import psutil
from anyio import CapacityLimiter, to_process
from anyio.to_process import WORKER_MAX_IDLE_TIME

def cpu_intensive_callable() -> None:
    # Sleep past the worker idle timeout (5 minutes) plus a 5-second margin
    sleep(WORKER_MAX_IDLE_TIME + 5)

class CpuIntensiveClass:
    async def cpu_intensive_method(self, capacity_limiter: CapacityLimiter) -> None:
        print("Running cpu intensive method")
        await to_process.run_sync(cpu_intensive_callable, cancellable=True, limiter=capacity_limiter)

async def main() -> None:
    capacity_limiter = CapacityLimiter(total_tokens=2)  # limit the number of concurrent processes to 2
    cpu_intensive_class = CpuIntensiveClass()
    for _ in range(10):
        await cpu_intensive_class.cpu_intensive_method(capacity_limiter)
        python_count = sum(
            1
            for p in psutil.process_iter(["pid", "name"])
            if "python" in (p.info["name"] or "").lower()
        )
        print(f"Running python processes number is {python_count}")

if __name__ == "__main__":
    anyio.run(main)

Note that I used the psutil package to track the current number of Python processes.

agronholm commented 1 year ago

I'm unable to reproduce the problem. What am I supposed to be seeing in the output? The reported number of processes remains the same at every iteration. This is what I got:

My output:

```
Running cpu intensive method
Running python processes number is 4
Running cpu intensive method
Running python processes number is 4
Running cpu intensive method
Running python processes number is 4
Running cpu intensive method
Running python processes number is 4
Running cpu intensive method
Running python processes number is 4
Running cpu intensive method
Running python processes number is 4
Running cpu intensive method
Running python processes number is 4
Running cpu intensive method
Running python processes number is 4
Running cpu intensive method
Running python processes number is 4
Running cpu intensive method
Running python processes number is 4
```

Note that your code counts all Python processes, not just the ones spawned by the script.
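
To count only the workers spawned by the script itself, something along these lines would be more accurate (a sketch assuming psutil; children(recursive=True) walks the current process's descendants):

```python
import psutil

# Count only the descendants of the current process, not every
# Python process on the machine
current = psutil.Process()
worker_count = len(current.children(recursive=True))
print(f"Worker processes spawned by this script: {worker_count}")
```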