django / asgiref

ASGI specification and utilities
https://asgi.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Issue in sync.py's SyncToAsync class: new ThreadPoolExecutor instances with daemon threads are created for requests. #458

Open sunilnitdgp opened 6 months ago

sunilnitdgp commented 6 months ago

Hi,

I'm attempting to run an async view using the ASGI protocol with Daphne as the server. However, I've noticed that it's creating new ThreadPoolExecutor instances for some requests, with daemon threads still running in the background post-benchmarking. My understanding is that since it's based on an event loop, it should use a single ThreadPoolExecutor. Could someone clarify this for me?

Middlewares:

MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

And my view is:

import threading

import asyncio
import aiohttp
import logging

import psutil
from rest_framework.response import Response
from adrf.decorators import api_view
import json

logger = logging.getLogger(__name__)

@api_view(['GET'])
async def index(request):
    res = await make_api_request("http://{{host}}/v1/completions")
    return Response(res, status=200)

async def make_api_request(url, method="POST", headers=None, params=None, json_data=None, timeout=None):
    try:
        json_data = {
                'prompt': 'Hi, How are you?',
                'max_new_tokens': 700, 'temperature': 0, 'top_p': 1, 'max_tokens': 700,
                'model': 'meta-llama/Llama-2-7b-chat-hf'}
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, headers=headers, params=params, json=json_data,
                                       timeout=timeout, ssl=False) as response:
                content = await response.read()
                if 'json' in response.headers.get('Content-Type', ''):
                    content = json.loads(content)
                return content
    except asyncio.TimeoutError:
        raise TimeoutError("Request timed out. The server did not respond within the specified timeout period.")
    except aiohttp.ClientError as e:
        raise ConnectionError(f"Request error: {str(e)}")
    except Exception as e:
        raise Exception(f"Exception error: {str(e)}")

The code I'm referring to in sync.py:

    async def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
        __traceback_hide__ = True  # noqa: F841
        loop = asyncio.get_running_loop()

        # Work out what thread to run the code in
        if self._thread_sensitive:
            if hasattr(AsyncToSync.executors, "current"):
                # If we have a parent sync thread above somewhere, use that
                executor = AsyncToSync.executors.current
            elif self.thread_sensitive_context.get(None):
                # If we have a way of retrieving the current context, attempt
                # to use a per-context thread pool executor
                thread_sensitive_context = self.thread_sensitive_context.get()

                if thread_sensitive_context in self.context_to_thread_executor:
                    # Re-use thread executor in current context
                    executor = self.context_to_thread_executor[thread_sensitive_context]
                else:
                    # Create new thread executor in current context
                    executor = ThreadPoolExecutor(max_workers=1)
                    # print("================== created new thread ================")
                    self.context_to_thread_executor[thread_sensitive_context] = executor
            elif loop in AsyncToSync.loop_thread_executors:
                # Re-use thread executor for running loop
                executor = AsyncToSync.loop_thread_executors[loop]
            elif self.deadlock_context.get(False):
                raise RuntimeError(
                    "Single thread executor already being used, would deadlock"
                )
            else:
                # Otherwise, we run it in a fixed single thread
                executor = self.single_thread_executor
                self.deadlock_context.set(True)
        else:
            # Use the passed in executor, or the loop's default if it is None
            executor = self._executor

        context = contextvars.copy_context()
        child = functools.partial(self.func, *args, **kwargs)
        func = context.run

        try:
            # Run the code in the right thread
            ret: _R = await loop.run_in_executor(
                executor,
                functools.partial(
                    self.thread_handler,
                    loop,
                    self.get_current_task(),
                    sys.exc_info(),
                    func,
                    child,
                ),
            )

        finally:
            _restore_context(context)
            self.deadlock_context.set(False)

        return ret

andrewgodwin commented 6 months ago

Actually, it's normal for multiple executors to be created. The specific behaviour should be:

sunilnitdgp commented 6 months ago

@andrewgodwin Thanks for your comment. I'm using Django as a framework, and each call is explicitly marked as thread_sensitive=True using the default ASGI handler class. For instance:

await sync_to_async(signals.request_started.send, thread_sensitive=True)(
    sender=self.__class__, scope=scope
)

Considering this, ideally, a single shared global executor should be used. However, despite this setup, the code seems to be entering the else block, where a new thread executor is created in the current context instead of reusing the existing one:

if thread_sensitive_context in self.context_to_thread_executor:
    # Re-use thread executor in current context
    executor = self.context_to_thread_executor[thread_sensitive_context]
else:
    # Create new thread executor in current context
    executor = ThreadPoolExecutor(max_workers=1)
    self.context_to_thread_executor[thread_sensitive_context] = executor
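
The branch quoted above can be illustrated with a simplified, stdlib-only model. This is a sketch, not asgiref's code: `current_context`, `context_to_executor`, `RequestContext`, `pick_executor`, and `handle_request` are hypothetical names standing in for the attributes in the snippet. It assumes that each request runs with its own context object, in which case the lookup finds the executor again within one request but creates a new one for the next request.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for the context variable and mapping in the snippet.
current_context = contextvars.ContextVar("current_context", default=None)
context_to_executor = {}


class RequestContext:
    """Hypothetical marker object; assumed to be created once per request."""


def pick_executor():
    """Mirror the quoted if/else: reuse the executor for this context or create one."""
    ctx = current_context.get()
    if ctx in context_to_executor:
        # Re-use the executor already registered for the current context
        return context_to_executor[ctx]
    # First call in this context: create a single-worker executor
    executor = ThreadPoolExecutor(max_workers=1)
    context_to_executor[ctx] = executor
    return executor


def handle_request():
    """Simulate one request that picks an executor twice in the same context."""
    token = current_context.set(RequestContext())
    try:
        return pick_executor(), pick_executor()
    finally:
        current_context.reset(token)


a1, a2 = handle_request()
b1, b2 = handle_request()
print(a1 is a2)  # same request, same executor
print(a1 is b1)  # different requests, different executors
print(len(context_to_executor))

# Clean up the executors created by this sketch.
for executor in context_to_executor.values():
    executor.shutdown()
```

Under that assumption, seeing the else branch taken repeatedly would be consistent with one executor per request context rather than one per event loop. Whether that is what happens in this app depends on how the handler sets up the context.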

andrewgodwin commented 6 months ago

Since all sorts of things that you can do in your app can affect that (different middleware, server configuration, loading in something that uses gevent, etc.), we can only really debug it if you can produce a reliable way to reproduce it in a tiny sample app, unfortunately.

sunilnitdgp commented 6 months ago

I understand. I'm actually working with a simple app containing just a view. Here are the steps I've followed:

  1. Created a virtual environment and installed necessary packages (django, djangorestframework, adrf, aiohttp, daphne, asyncio).
  2. Added daphne and adrf to INSTALLED_APPS in settings.py.
  3. Created an async view in views.py and added a corresponding path in urls.py.
  4. Ran the server with python manage.py runserver (the log shows Starting ASGI/Daphne version 4.1.0 development server at http://127.0.0.1:8000/).
  5. Utilized a benchmarking tool (in this case, ab) to test the API endpoint (ab -c 10 -n 600 -s 800007 -T application/json "http://127.0.0.1:8000/test").
  6. Monitored the total number of threads created during benchmarking using top -o threads -pid <pid>.

I'm working on an M1 MacBook Air.
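
As a complement to watching the thread count with top, the threads can also be counted from inside the process. The following is a minimal sketch with a hypothetical helper, `executor_thread_counts`, which groups live threads by the executor that owns them. It relies on CPython's default worker naming (`ThreadPoolExecutor-<n>_<m>`), which applies only when the executor is created without a custom `thread_name_prefix`.

```python
import threading
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def executor_thread_counts():
    """Return {executor name: live worker count} for default-named pools.

    Assumes CPython's default thread names, e.g. "ThreadPoolExecutor-3_0".
    """
    counts = Counter()
    for thread in threading.enumerate():
        if thread.name.startswith("ThreadPoolExecutor-"):
            # Drop the "_<worker index>" suffix to get the executor's name
            counts[thread.name.rsplit("_", 1)[0]] += 1
    return dict(counts)


# Demonstration: two single-worker executors, each with one idle worker.
with ThreadPoolExecutor(max_workers=1) as first, ThreadPoolExecutor(max_workers=1) as second:
    first.submit(lambda: None).result()
    second.submit(lambda: None).result()
    print(executor_thread_counts())
```

Calling such a helper from a view or a periodic log line during the ab run would show whether the number of distinct executors keeps growing or levels off once requests finish.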