terricain / aioboto3

Wrapper to use boto3 resources with the aiobotocore async backend
Apache License 2.0
732 stars 75 forks source link

coroutine `AioSession._create_client` was never awaited #203

Closed attie closed 4 years ago

attie commented 4 years ago

Description

When upgrading from 7.1.0 to 8.0.2, I started seeing the following error, and the application quits.

Traceback (most recent call last):
  File "x.py", line 21, in <module>
    loop.run_until_complete(crash())
  File "/usr/lib/python3.6/asyncio/base_events.py", line 488, in run_until_complete
    return future.result()
  File "x.py", line 16, in crash
    bucket = await s3.meta.client.head_bucket(Bucket='test_bucket')
AttributeError: 'ResourceCreaterContext' object has no attribute 'meta'
sys:1: RuntimeWarning: coroutine 'AioSession._create_client' was never awaited

Rolling back to v7.1.0 resolves this issue (pip install aioboto3==7.1.0).

I've not had a chance to dig into this properly yet.

What I Did

import asyncio
import botocore
import aioboto3

async def crash():
    # Connection parameters for a local MinIO endpoint (S3-compatible).
    args = {
        'config': botocore.config.Config(
            signature_version='s3v4',
        ),
        'aws_access_key_id': 'minioadmin',
        'aws_secret_access_key': 'minioadmin',
        'endpoint_url': 'http://patch.attie.co.uk:9000/',
        'region_name': 'local',
    }
    # NOTE(review): under aioboto3 v8 this returns an async context manager
    # (not a resource), so the `.meta` access below raises the reported
    # AttributeError and the un-entered coroutine triggers the
    # "never awaited" RuntimeWarning.  Worked as-is on v7.
    s3 = aioboto3.resource('s3', **args)
    bucket = await s3.meta.client.head_bucket(Bucket='test_bucket')

    assert(False)

loop = asyncio.get_event_loop()
loop.run_until_complete(crash())
loop.close()
terricain commented 4 years ago

Breaking changes in v8. Read readme for more info

attie commented 4 years ago

Ah yes, sorry / quite right... (I had a quick look at some changes, but completely missed the text above...)

Could you offer advice on how I might rework / handle the following model for use with v8.0.0+?

import asyncio
import botocore
import aioboto3

class Bucket():
    """v7-style wrapper: keeps a long-lived S3 resource on the instance.

    NOTE(review): this pattern relies on aioboto3 < 8, where
    aioboto3.resource() returned a usable resource directly; under v8 it
    returns an async context manager instead, which is the subject of
    this issue.
    """

    def __init__(self, bucket_name):
        # Connection parameters for a local MinIO endpoint (S3-compatible).
        self.args = {
            'config': botocore.config.Config(
                signature_version='s3v4',
            ),
            'aws_access_key_id': 'minioadmin',
            'aws_secret_access_key': 'minioadmin',
            'endpoint_url': 'http://patch.attie.co.uk:9000/',
            'region_name': 'local',
        }
        self.bucket_name = bucket_name

    async def startup(self):
        # v7: resource is usable immediately; v8 would hand back a context
        # manager here, breaking the `.meta` access in get_bucket().
        self.s3 = aioboto3.resource('s3', **self.args)
        self.bucket = await self.get_bucket()

    async def cleanup(self):
        # Release the underlying client connections.
        await self.s3.meta.client.close()

    async def get_bucket(self):
        # check that the bucket exists... add code to create it if not
        await self.s3.meta.client.head_bucket(Bucket=self.bucket_name)
        return self.s3.Bucket(self.bucket_name)

    async def list(self):
        # Async generator yielding object keys, paging 1000 at a time.
        async for item in self.bucket.objects.page_size(count=1000):
            yield item.key

async def demo():
    # Exercise the wrapper end to end: start, list all keys, clean up.
    b = Bucket('test_bucket')
    await b.startup()

    async for obj in b.list():
        print(obj)

    await b.cleanup()

loop = asyncio.get_event_loop()
loop.run_until_complete(demo())
loop.close()

Would this be sensible?

import asyncio
import botocore
import aioboto3

class Bucket():
    """Proposed v8-style wrapper: opens a fresh resource per operation.

    NOTE(review): each method enters its own ``async with`` block, so no
    resource outlives a single call -- correct for v8, but reconnects on
    every operation.
    """

    def __init__(self, bucket_name):
        # Connection parameters for a local MinIO endpoint (S3-compatible).
        self.args = {
            'config': botocore.config.Config(
                signature_version='s3v4',
            ),
            'aws_access_key_id': 'minioadmin',
            'aws_secret_access_key': 'minioadmin',
            'endpoint_url': 'http://patch.attie.co.uk:9000/',
            'region_name': 'local',
        }
        self.bucket_name = bucket_name

    def get_s3(self):
        # v8: returns an async context manager, entered by the callers below.
        return aioboto3.resource('s3', **self.args)

    def get_bucket(self, s3):
        # v8 service-resource creation is a coroutine; callers await it.
        return s3.Bucket(self.bucket_name)

    async def startup(self):
        async with self.get_s3() as s3:
            # check that the bucket exists... add code to create it if not
            await s3.meta.client.head_bucket(Bucket=self.bucket_name)

    async def list(self):
        # Async generator yielding object keys, paging 1000 at a time.
        async with self.get_s3() as s3:
            b = await self.get_bucket(s3)

            async for item in b.objects.page_size(count=1000):
                yield item.key

async def demo():
    # Exercise the wrapper end to end: start, then list all keys.
    b = Bucket('test_bucket')
    await b.startup()

    async for obj in b.list():
        print(obj)

loop = asyncio.get_event_loop()
loop.run_until_complete(demo())
loop.close()
terricain commented 4 years ago

I'd do something like this - https://aioboto3.readthedocs.io/en/latest/usage.html#aiohttp-server-example (sorry, I didn't link to that in the readme)

import asyncio
import contextlib

import botocore
import aioboto3

class Bucket():
    """Long-lived aioboto3-v8 S3 wrapper with explicit startup()/close().

    The async context manager returned by ``aioboto3.resource()`` is
    entered on a ``contextlib.AsyncExitStack``, so the resource can be
    stored on the instance and released later, outside any single
    ``async with`` block.  Requires Python 3.7+ for AsyncExitStack.
    """

    def __init__(self, bucket_name: str):
        # Connection parameters for a local MinIO endpoint (S3-compatible).
        self.args = {
            'config': botocore.config.Config(
                signature_version='s3v4',
            ),
            'aws_access_key_id': 'minioadmin',
            'aws_secret_access_key': 'minioadmin',
            'endpoint_url': 'http://patch.attie.co.uk:9000/',
            'region_name': 'local',
        }
        # Holds the entered resource context so close() can unwind it.
        self.context_stack = contextlib.AsyncExitStack()
        self.bucket_name = bucket_name
        self._resource = None  # S3 service resource, set by startup()
        self._bucket = None    # cached Bucket service resource

    async def startup(self):
        """Enter the resource context and keep the resource for later calls."""
        # BUGFIX: was a bare `get_s3()` (NameError) -- the factory is a method.
        self._resource = await self.context_stack.enter_async_context(self.get_s3())

    async def close(self):
        """Unwind the resource context; safe to call when never started."""
        if self._resource:
            await self.context_stack.aclose()
            self._resource = None
            # Drop the cached bucket too: it is bound to the closed resource.
            self._bucket = None

    def get_s3(self):
        # v8: returns an async context manager, entered in startup().
        return aioboto3.resource('s3', **self.args)

    async def get_bucket(self):
        """Lazily create and cache the Bucket service resource."""
        if not self._bucket:
            # BUGFIX: was `self.resource` (AttributeError) -- the attribute
            # is `self._resource`.  v8 service resources are awaited.
            self._bucket = await self._resource.Bucket(self.bucket_name)
        return self._bucket

    async def list(self):
        # Async generator yielding object keys, paging 1000 at a time.
        b = await self.get_bucket()

        async for item in b.objects.page_size(count=1000):
            yield item.key

async def demo():
    # Exercise the wrapper end to end: start, list all keys, close.
    b = Bucket('test_bucket')
    await b.startup()

    async for obj in b.list():
        print(obj)

    await b.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(demo())
loop.close()

This stores a resource() within the class and deals with the context management properly. Note also that service resources (e.g. s3.Bucket, dynamodb.Table) must now be created with await. It needs Python 3.7 for contextlib.AsyncExitStack, though.

attie commented 4 years ago

Brilliant tip - thanks very much for pointing me at contextlib.AsyncExitStack :+1:

For any others: @terricain's example was originally missing an await on the context_stack.aclose()

terricain commented 4 years ago

Fixed example :D