elastic / elasticsearch-py

Official Python client for Elasticsearch
https://ela.st/es-python
Apache License 2.0

Memory leak when using AsyncElasticsearch #2478

Open teuneboon opened 5 months ago

teuneboon commented 5 months ago

Elasticsearch version (bin/elasticsearch --version): 8.2.0

elasticsearch-py version (elasticsearch.__versionstr__): 8.12.0

Python version: 3.9.2

Description of the problem including expected versus actual behavior: We run an API with an endpoint that performs a call to Elasticsearch. In this endpoint we initialize AsyncElasticsearch, run a search query (might be multiple in the future, but just one for now), and close the connection to Elasticsearch. We noticed that if this endpoint is called a lot, the memory used by the API process keeps increasing until the process is killed because it goes OOM.

Steps to reproduce: I isolated the issue in a relatively simple script:

import asyncio

from elasticsearch import AsyncElasticsearch

SERVERS = [
    'https://elk001:9200',
    'https://elk002:9200',
    'https://elk003:9200',
]
INDEX = 'logs'
API_KEY = 'xxx'

async def leaky():
    while True:
        es = AsyncElasticsearch(SERVERS, api_key=API_KEY)
        async with es as client:
            await client.search(
                index=INDEX,
                body={
                    'from': 0,
                    'size': 0,
                    'query': {
                        'bool': {
                            'must': [],
                            'filter': [],
                            'should': [],
                            'must_not': [],
                        },
                    },
                },
            )
        print('completed a query')

if __name__ == '__main__':
    asyncio.run(leaky())

If you run this, memory usage will quickly (< 1 minute in our setup) increase to about 1 GiB and beyond. If you pull the es = AsyncElasticsearch initialization out of the while True loop, memory still increases, but much more slowly (although, unless I'm missing something, while it might not be best practice, it still shouldn't leak that fast when it's inside the loop).

What I didn't test: I didn't have time to fully analyze this with memory profilers. I'm also not sure whether only search queries are affected, or whether simply initializing AsyncElasticsearch without running any query already causes the leak (or whether any other request leaks). I didn't test whether the API key or SSL has an effect either; I just wanted an isolated test case to confirm I was still sane. We solved this in the end by switching back to the sync Elasticsearch client, since we won't be executing queries in parallel any time soon, but I still thought I'd report it in case others run into this issue.
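For others hitting this: the usual workaround is to create one AsyncElasticsearch client at application startup, reuse it for every request, and close it only at shutdown, so the connection pool persists instead of being rebuilt per request. A minimal sketch of that lifecycle (FakeClient is a hypothetical stand-in for AsyncElasticsearch so the sketch runs without a cluster):

```python
import asyncio

class FakeClient:
    """Hypothetical stand-in for AsyncElasticsearch (no network needed)."""
    instances = 0

    def __init__(self):
        FakeClient.instances += 1  # track how many clients get built

    async def search(self, index):
        return {"hits": {"total": {"value": 0}}}

    async def close(self):
        pass

async def handle_request(client):
    # Each "API request" reuses the shared client (and, in the real
    # AsyncElasticsearch, its pooled connections).
    return await client.search(index="logs")

async def main():
    client = FakeClient()           # create once, at startup
    try:
        for _ in range(1000):       # many requests, one client
            await handle_request(client)
    finally:
        await client.close()        # close once, at shutdown
    return FakeClient.instances

print(asyncio.run(main()))  # 1 -> a single client served all requests
```

This sidesteps the per-request setup/teardown entirely, which is also what the slower growth with the client outside the loop suggests.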

pquentin commented 5 months ago

Thanks @teuneboon, I can reproduce this! :tada: My observations:

[graph: memory usage of the reproduction script ("leak")]

The next steps are using memray to understand the peak usage in more detail and trying to reproduce with aiohttp.
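In the meantime, tracemalloc from the standard library can already attribute growth to allocation sites via snapshot diffs. A rough sketch of that approach (the leaky_allocations list is a synthetic stand-in for whatever the client retains per iteration):

```python
import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()

# Synthetic "leak": keep allocating without releasing, the way a
# per-request client might retain buffers across iterations.
leaky_allocations = []
for _ in range(100):
    leaky_allocations.append(bytearray(10_000))

after = tracemalloc.take_snapshot()
stats = after.compare_to(before, "lineno")
for stat in stats[:3]:
    print(stat)  # top allocation sites by memory growth

total_growth = sum(stat.size_diff for stat in stats)
print(total_growth > 900_000)  # roughly 100 * 10 kB retained -> True
```

Pointing the same snapshot diff at the reproduction script should name the file and line responsible for the bulk of the growth.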

pquentin commented 5 months ago

Here's my current attempt with aiohttp:

import asyncio
import aiohttp

async def leaky():
    i = 0
    while i <= 1500:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                "https://localhost:9200/",
                auth=aiohttp.BasicAuth("elastic", "changeme"),
                ssl=False,
            ) as response:
                assert response.status == 200
                await response.text()
        i += 1
        if i % 100 == 0:
            print(i)

if __name__ == "__main__":
    asyncio.run(leaky())

It inexplicably fails after 1000 connections with:

Traceback (most recent call last):
  File "/.../.virtualenvs/elasticsearch-py/lib64/python3.12/site-packages/aiohttp/connector.py", line 1173, in _create_direct_connection
    hosts = await asyncio.shield(host_resolved)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.../.virtualenvs/elasticsearch-py/lib64/python3.12/site-packages/aiohttp/connector.py", line 884, in _resolve_host
    addrs = await self._resolver.resolve(host, port, family=self._family)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.../.virtualenvs/elasticsearch-py/lib64/python3.12/site-packages/aiohttp/resolver.py", line 33, in resolve
    infos = await self._loop.getaddrinfo(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/asyncio/base_events.py", line 899, in getaddrinfo
    return await self.run_in_executor(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/socket.py", line 963, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
OSError: [Errno 16] Device or resource busy

And only partly reproduces the leak:

[graph: memory usage of the aiohttp reproduction]
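Another cheap check is whether session objects are actually collectable after close: take a weak reference, drop the strong one, force a GC pass, and see if the referent survives. A pure-stdlib sketch (Session here is a hypothetical stand-in, not aiohttp.ClientSession):

```python
import gc
import weakref

class Session:
    """Hypothetical stand-in for an HTTP client session."""
    def close(self):
        pass

session = Session()
probe = weakref.ref(session)

session.close()
del session      # drop the last strong reference
gc.collect()     # force a full collection

# If something (a callback, a cache, an event-loop structure) still
# held the session, probe() would return it instead of None.
print(probe() is None)  # True -> the session was really released
```

Running the same probe against the real client after `async with` exits would distinguish "objects kept alive by a lingering reference" from growth inside C-level buffers that tracemalloc and weakrefs can't see.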

pquentin commented 5 months ago

I just remembered that the upcoming release later this month will include HTTPX support, so I tried it too.

import asyncio
from elasticsearch import AsyncElasticsearch

async def leaky():
    i = 0
    while i <= 1500:
        async with AsyncElasticsearch(
            "https://localhost:9200",
            basic_auth=("elastic", "changeme"),
            verify_certs=False,
            node_class="httpxasync",
        ) as es:
            await es.info()
        i += 1
        if i % 100 == 0:
            print(i)

if __name__ == "__main__":
    asyncio.run(leaky())

[graph: memory usage of the HTTPX reproduction]

There's still a leak, maybe? But it's smaller in magnitude and seems to hit the same ceiling at some point.