aio-libs / aiobotocore

asyncio support for botocore library using aiohttp
https://aiobotocore.aio-libs.org
Apache License 2.0

Aiohttp connection leak #1061

Closed grigoriev-semyon closed 9 months ago

grigoriev-semyon commented 9 months ago

Describe the bug

When I call get_object, my code becomes very slow. Many aiohttp connections are opened and never released: the aiohttp.BaseConnector._acquired set grows to the size of the connection pool and never shrinks.
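To illustrate the symptom, here is a toy model of a bounded connection pool. It is not aiohttp's implementation: `ToyPool`, `demo`, and the limit of 2 are hypothetical and only show how acquisitions that are never released fill an "acquired" set and stall the next caller, assuming the pool has a finite limit.

```python
import asyncio


class ToyPool:
    """Hypothetical bounded pool; only mimics the idea of an acquired set."""

    def __init__(self, limit: int) -> None:
        self._slots = asyncio.Semaphore(limit)
        self.acquired = set()

    async def acquire(self) -> object:
        # Waits until a slot is free, then hands out a new "connection".
        await self._slots.acquire()
        conn = object()
        self.acquired.add(conn)
        return conn

    def release(self, conn: object) -> None:
        # Returns the slot so that a waiting caller can proceed.
        self.acquired.discard(conn)
        self._slots.release()


async def demo(limit: int = 2):
    pool = ToyPool(limit)
    for _ in range(limit):
        await pool.acquire()  # never released: this is the leak
    try:
        # With every slot taken, the next acquire can only time out.
        await asyncio.wait_for(pool.acquire(), timeout=0.05)
        stalled = False
    except asyncio.TimeoutError:
        stalled = True
    return len(pool.acquired), stalled
```

Running `asyncio.run(demo())` should show the acquired set stuck at the limit and the extra request stalling, which resembles the slowdown and timeouts described here.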

Checklist

pip freeze results


aiosignal==1.3.1
async-timeout==4.0.2
attrs==22.2.0
charset-normalizer==2.1.1
frozenlist==1.3.3
idna==3.4
multidict==6.0.4
yarl==1.8.2
aiohttp==3.8.4

certifi==2023.5.7

async-timeout==4.0.2
redis==5.0.1

PyYAML==6.0.1

flit_core==3.8.0
packaging==23.0
marshmallow==3.19.0
webargs==8.2.0

psycopg
asyncpg
sqlalchemy[asyncio]

aiobotocore==2.8.0

aiofiles

Environment:

Code(connection):

from contextlib import AsyncExitStack

from aiobotocore import session
from aiobotocore.config import AioConfig

class S3:
    def __init__(
        self,
        access_key_id: str,
        secret_access_key: str,
        endpoint_url: str,
        bucket: str,
    ) -> None:
        self._access_key_id = access_key_id
        self._secret_access_key = secret_access_key
        self._endpoint_url = endpoint_url
        self._bucket = bucket
        self._context_stack = AsyncExitStack()
        self._session = session.get_session()
        self.config = AioConfig(max_pool_connections=0)
        # self.config.connector_args = {"keepalive_timeout": None, "force_close": True}  

    @property
    def bucket(self) -> str:
        return self._bucket

    @property
    def client(self):
        return self._client

    async def action_connect(self, *args, **kwargs) -> None:
        self._client = await self._context_stack.enter_async_context(
            self._session.create_client(
                's3',
                aws_access_key_id=self._access_key_id,
                aws_secret_access_key=self._secret_access_key,
                endpoint_url=self._endpoint_url,
                config=self.config,
            )
        )

    async def action_disconnect(self, *args, **kwargs) -> None:
        await self._context_stack.aclose()

Code(use case)


import asyncio
from typing import Any

# Both functions are methods of a class that holds the S3 wrapper as self._s3.
async def _get_objects_keys(self) -> list[dict[str, Any]]:
    objects = await self._s3.client.list_objects_v2(
        Bucket=self._s3.bucket, Prefix="smth/"
    )
    return objects.get("Contents", [])

async def _get_objects(self) -> list[Object]:
    result = []
    async with asyncio.timeout(self._settings.config["timeout"]):
        keys = await self._get_objects_keys()
        for key in keys:
            url = await self._s3.client.generate_presigned_url(
                'get_object',
                Params={"Key": key["Key"], "Bucket": self._s3.bucket},
                ExpiresIn=self._settings.config["download_url_expires_sec"],
            )
            get_object = await self._s3.client.get_object(
                Bucket=self._s3.bucket, Key=key["Key"]
            )
            obj = ...
            result.append(obj)
    return result
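
A possible explanation, offered as an assumption rather than a confirmed diagnosis: the loop never reads or closes the body returned by get_object, so the underlying connection may stay checked out. The aiobotocore README reads the body inside `async with response["Body"] as stream`, which closes it on exit. The sketch below shows that pattern; `read_object`, `FakeBody`, and `FakeClient` are hypothetical names, and the fake classes only stand in for a real client so the snippet runs without network access.

```python
import asyncio


async def read_object(client, bucket: str, key: str) -> bytes:
    """Fetch one object and close its body before returning."""
    response = await client.get_object(Bucket=bucket, Key=key)
    # Entering the body as an async context manager closes it on exit,
    # even if read() raises.
    async with response["Body"] as stream:
        return await stream.read()


class FakeBody:
    """Hypothetical stand-in for the streaming body (not aiobotocore code)."""

    def __init__(self, data: bytes) -> None:
        self._data = data
        self.closed = False

    async def __aenter__(self) -> "FakeBody":
        return self

    async def __aexit__(self, *exc_info) -> None:
        self.closed = True

    async def read(self) -> bytes:
        return self._data


class FakeClient:
    """Hypothetical stand-in for an S3 client, used only for this demo."""

    def __init__(self) -> None:
        self.bodies = []

    async def get_object(self, Bucket: str, Key: str) -> dict:
        body = FakeBody(f"{Bucket}/{Key}".encode())
        self.bodies.append(body)
        return {"Body": body}


async def demo():
    client = FakeClient()
    data = await read_object(client, "my-bucket", "smth/a.txt")
    return data, all(body.closed for body in client.bodies)
```

With a real client, the same `read_object` helper would replace the bare `get_object` call in the loop, so each body is closed before the next key is processed.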

When I use only generate_presigned_url and list_objects_v2, everything is OK: the code runs fast, with no exceptions or timeouts. After 10 calls of _get_objects without the get_object call:

bash-4.2# ss -s
Total: 12 (kernel 1802)
TCP:   116 (estab 3, closed 110, orphaned 61, synrecv 0, timewait 11/0), ports 0

Transport Total     IP        IPv6
*         1802      -         -        
RAW       0         0         0        
UDP       1         1         0        
TCP       6         5         1        
INET      7         6         1        
FRAG      0         0         0    

When I start using get_object, the problems begin. After 10 calls of _get_objects with the get_object call:

bash-4.2# ss -s
Total: 27 (kernel 1809)
TCP:   137 (estab 18, closed 116, orphaned 66, synrecv 0, timewait 12/0), ports 0

Transport Total     IP        IPv6
*         1809      -         -        
RAW       0         0         0        
UDP       1         1         0        
TCP       21        20        1        
INET      22        21        1        
FRAG      0         0         0    
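
To compare the two snapshots without reading them by eye, the established-connection count can be pulled from the TCP line of each `ss -s` output. This is a small sketch; `established_count` is a hypothetical helper, and the regular expression assumes the line layout shown in the outputs here.

```python
import re


def established_count(ss_summary: str) -> int:
    """Return the 'estab' figure from the TCP line of `ss -s` output."""
    match = re.search(
        r"^TCP:\s+\d+\s+\(estab\s+(\d+),", ss_summary, re.MULTILINE
    )
    if match is None:
        raise ValueError("no 'TCP: ... (estab N, ...' line found")
    return int(match.group(1))


# The first two lines of each snapshot, copied from the outputs in this report.
WITHOUT_GET_OBJECT = """\
Total: 12 (kernel 1802)
TCP:   116 (estab 3, closed 110, orphaned 61, synrecv 0, timewait 11/0), ports 0
"""

WITH_GET_OBJECT = """\
Total: 27 (kernel 1809)
TCP:   137 (estab 18, closed 116, orphaned 66, synrecv 0, timewait 12/0), ports 0
"""

before = established_count(WITHOUT_GET_OBJECT)
after = established_count(WITH_GET_OBJECT)
print(f"established: {before} -> {after} (+{after - before})")
# prints "established: 3 -> 18 (+15)"
```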

I found that if I place response.release() here, https://github.com/aio-libs/aiobotocore/blob/master/aiobotocore/httpsession.py#L227, everything appears to work. But the aiobotocore tests start failing :)

Also, if I take the ClientSession from aiobotocore's AIOHTTPSession class and try to send a request to https://... with that session, it doesn't work either: the request just hangs forever.