terricain / aioboto3

Wrapper to use boto3 resources with the aiobotocore async backend
Apache License 2.0

How to reuse session and client in a more complex app? #233

Closed abivolmv closed 1 year ago

abivolmv commented 3 years ago

Description

I would like to reuse a session or client in multiple concurrent calls to S3 and SQS, in order to make the code run faster; building the session and client on each call takes too much time. I tried creating the client globally in the file where all the AWS functions live, and I also tried creating it in the main file and passing it as a parameter to the function that uses it.

What I Did

Here is a minimal version of my app's code. The complete code spans multiple files and functions; all the AWS functions live in a separate file from the logic that builds the loops and lists of tasks to run, so I cannot put all my logic under a single async with aioboto3.Session().client('s3') as s3: block.

import asyncio
from io import BytesIO
import aioboto3

s3_client = aioboto3.Session().client('s3')

def lambda_handler(a,b):
    asyncio.run(main())

async def main():
    filelike1 = BytesIO()
    filelike2 = BytesIO()
    await asyncio.wait([s3_get(filelike1), s3_get(filelike2)])
    # later, or maybe in some other function
    filelike3 = BytesIO()
    filelike4 = BytesIO()
    await asyncio.wait([s3_get(filelike3), s3_get(filelike4)])

async def s3_get(filelike):
    async with s3_client as s3:  # fails; to make it work I have to create a new session and client here: async with aioboto3.Session().client('s3') as s3:
        return await s3.download_fileobj('s3-files-003', '.versions.txt', filelike)
This fails with:

future: <Task finished coro=<s3_get() done, defined at /var/task/lambda_function.py:19> exception=RuntimeError('cannot reuse already awaited coroutine')>
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 20, in s3_get
    async with s3_client as s3:
  File "/opt/python/lib/python3.7/site-packages/aiobotocore/session.py", line 37, in __aenter__
    self._client = await self._coro
RuntimeError: cannot reuse already awaited coroutine
terricain commented 3 years ago

Ok so you have 2 options:

- Reuse the session and create a client inside each function that needs one.
- Pass in a valid s3 client object into the functions.

For your example, something like this might work:

import asyncio
from io import BytesIO
import aioboto3

aioboto3_session = aioboto3.Session()

def lambda_handler(a,b):
    asyncio.run(main())

async def main():
    filelike1 = BytesIO()
    filelike2 = BytesIO()
    await asyncio.wait([s3_get(filelike1), s3_get(filelike2)])
    # later, or maybe in some other function
    filelike3 = BytesIO()
    filelike4 = BytesIO()
    await asyncio.wait([s3_get(filelike3), s3_get(filelike4)])

async def s3_get(filelike):
    async with aioboto3_session.client('s3') as s3:  # a new client per call, but built from the shared session
        return await s3.download_fileobj('s3-files-003', '.versions.txt', filelike)

The problem was that aioboto3.Session().client('s3') returns an unawaited coroutine: one of the s3_get calls would await that coroutine via its async with, and the next s3_get would then try to await an already awaited coroutine.
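
The failure is easy to reproduce without aioboto3 at all, since awaiting any coroutine object twice raises the same error. A minimal illustration:

import asyncio

async def make_client():
    return "client"

async def main():
    coro = make_client()  # one coroutine object, analogous to Session().client('s3')
    await coro            # first await succeeds
    try:
        await coro        # second await fails
    except RuntimeError as e:
        print(e)          # cannot reuse already awaited coroutine

asyncio.run(main())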

abivolmv commented 3 years ago

@terrycain thank you for the answer. In some projects I use Chalice, which has an app object; I will try to store the client in its context.

abivolmv commented 3 years ago

> Pass in a valid s3 client object into the functions.

I tried storing the client in the app's context, but the same issue happens:

app = Chalice(app_name=APP_NAME)
app.aio_session = aioboto3.Session(region_name=REGION)
app.aio_s3_client = app.aio_session.client('s3', region_name=REGION)

# later, in a handler:
async with app.aio_s3_client as s3:
    return await s3.download_fileobj(bucket, key, filelike)

This still fails with:

RuntimeError: cannot reuse already awaited coroutine

If I use the session instead, it works, but then I have to create a client for each call:

async with app.aio_session.client('s3', region_name=REGION) as s3:
    return await s3.download_fileobj(bucket, key, filelike)
terricain commented 3 years ago

Yeah, because you've functionally not done anything different. Try:

app.aio_s3_client = None

async def get_s3_client():
    # Lazily create the client once and cache it on the app object.
    if app.aio_s3_client is None:
        app.aio_s3_client = await app.aio_session.client('s3', region_name=REGION)
    return app.aio_s3_client

...
async def some_function():
    s3 = await get_s3_client()
    return await s3.download_fileobj(bucket, key, filelike)

I've not tested that but it should work.

So normally web frameworks have hooks like on_startup and on_shutdown where you'd set up stuff like this, removing the need for a get_s3_client function, but Chalice doesn't have anything like that.
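
For comparison, here is a rough sketch of that startup/shutdown pattern in aiohttp, picked purely as an example of a framework with lifecycle hooks (illustrative only, not a Chalice solution):

import aioboto3
from aiohttp import web

async def s3_ctx(app):
    # Everything before the yield runs at startup, everything after at cleanup.
    session = aioboto3.Session()
    async with session.client("s3") as s3:
        app["s3"] = s3
        yield

async def list_buckets(request):
    # Reuses the single client created at startup.
    resp = await request.app["s3"].list_buckets()
    return web.json_response([b["Name"] for b in resp["Buckets"]])

app = web.Application()
app.cleanup_ctx.append(s3_ctx)
app.router.add_get("/list_buckets", list_buckets)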

If you can get me a super basic Chalice example, ideally one I can run locally, I could look into how difficult it would be to make a simple aioboto3 Chalice integration.

abivolmv commented 3 years ago

Thank you for the effort. I tried your method, but unfortunately I get the error "object ClientCreatorContext can't be used in 'await' expression" (in get_s3_client).
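
(For reference, session.client() returns a ClientCreatorContext, which is an async context manager rather than an awaitable, hence this error. One possible workaround, sketched here and untested, is to enter the context manager manually once and cache the client; the matching __aexit__ call then becomes your responsibility at shutdown. The aio_s3_client_ctx attribute name is made up for illustration:)

app.aio_s3_client = None
app.aio_s3_client_ctx = None

async def get_s3_client():
    # Enter the client's context manager once and cache the resulting client.
    if app.aio_s3_client is None:
        app.aio_s3_client_ctx = app.aio_session.client('s3', region_name=REGION)
        app.aio_s3_client = await app.aio_s3_client_ctx.__aenter__()
    return app.aio_s3_client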

Here is a basic example you can use to replicate the issue and to try making an integration. Clone it, pip install the requirements, and then run chalice local. You would still need a bucket and an SQS queue for the tests.

terricain commented 3 years ago

Ok, I've looked into it. Reusing the session can be done, but reusing the output of session.client('s3') etc. doesn't make sense due to how Chalice, and by extension Lambda, works.

I've released a beta version, aioboto3==9.2.0b0, which has an example integration that might prove useful. Example use here: https://github.com/terrycain/aioboto3/blob/master/tests/chalice_app/__init__.py and a bit of info here: https://aioboto3.readthedocs.io/en/latest/chalice.html

Currently it only works for HTTP endpoints. I've not tested deploying it to an actual Lambda, but it seems to work fine locally. Give it a go; if it works for HTTP and you find it useful, I can look at expanding it to the rest of the event types.

abivolmv commented 1 year ago

Sorry for the delay; I finally got back to this and tested it, and it works. I even tried it with gather/wait:

import asyncio
from aioboto3.experimental.async_chalice import AsyncChalice

app = AsyncChalice(app_name='basic-example')

@app.route('/')
def s3():
    return asyncio.run(main())

async def main():
    results = await asyncio.gather(
        get_list_buckets(),
        get_list_buckets(),
        get_list_buckets(),
        get_list_buckets()
    )
    return [*results]

@app.route('/list_buckets')
async def get_list_buckets():
    async with app.aioboto3.client("s3") as s3:
        resp = await s3.list_buckets()

    return [bucket['Name'] for bucket in resp['Buckets']]

I looked at what you did in aioboto3.experimental.async_chalice. Do you think it is OK in general to keep such Chalice-specific middleware in aioboto3, or should we contact the Chalice devs to make some changes? I am not proposing, just asking.

Thanks for your effort, it's really appreciated.

terricain commented 1 year ago

So normally I'd say contact the Chalice devs, but you might not get any traction. I have no issue leaving it in aioboto3; if they want it, they can have it.

When the next major version is out, I'll move it from .experimental to something else that I've not thought of yet.

vt-rc commented 8 months ago

I am really confused reading this thread... aren't you still recreating an S3 client each time get_list_buckets is called?

@app.route('/list_buckets')
async def get_list_buckets():
    async with app.aioboto3.client("s3") as s3:
        resp = await s3.list_buckets()

    return [bucket['Name'] for bucket in resp['Buckets']]

Nothing has been answered about reusing the client. Isn't the whole point of this thread reusing the client? This is what I am stuck on right now.

I am not sure why the library requires calling everything in this design-pattern style:

async with ....... as client:

It's difficult to build an OOP class with this style; it requires me to recreate a client in every class method that makes a call to AWS.
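
One way to keep a long-lived client inside a class (a sketch using contextlib.AsyncExitStack, not an official aioboto3 pattern; the S3Store name is made up for illustration) is to enter the client's context manager once and hold it open for the object's lifetime:

import contextlib
import aioboto3

class S3Store:
    def __init__(self):
        self._session = aioboto3.Session()
        self._stack = contextlib.AsyncExitStack()
        self._s3 = None

    async def __aenter__(self):
        # Enter the client's context manager once; every method reuses the client.
        self._s3 = await self._stack.enter_async_context(self._session.client("s3"))
        return self

    async def __aexit__(self, *exc):
        # Closes the client exactly once when the store is exited.
        await self._stack.aclose()

    async def download(self, bucket, key, fileobj):
        await self._s3.download_fileobj(bucket, key, fileobj)

Create one S3Store per application lifetime (async with S3Store() as store: ...) and pass it to whatever needs it; every call then reuses the same underlying client and its connection pool.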