whale2 / async-kinesis-client

Python Kinesis Client library utilising asyncio
MIT License
13 stars 4 forks source link

Make async-kinesis-client DynamoDB checkpoint table compatible with aioboto3 context manager requirement #11

Open iztaylor opened 4 years ago

iztaylor commented 4 years ago

As mentioned in https://github.com/whale2/async-kinesis-client/issues/10#issue-625379437, aiobotocore==1.0.4 requires context managers when instantiating clients/resources. Consequently, we run into issues when passing a DynamoDB table name to the checkpoint_table kwarg of the AsyncKinesisConsumer class and attempt to instantiate the table in src/async_kinesis_client/dynamodb.py file using table = aioboto3.resource('dynamodb').Table(self.table_name) syntax.

Instead of passing in a table name to the checkpoint_table kwarg of the AsyncKinesisConsumer class, this change allows one instead to pass a table object to that kwarg like so:

    async with session.create_client(
        "kinesis",
        region_name="us-west-2",
        endpoint_url="https://kinesis.us-west-2.amazonaws.com",
    ) as client:
        async with aioboto3.resource(
            "dynamodb", region_name="us-west-2"
        ) as dynamo_resource:
            table = await dynamo_resource.Table("kinesis-state-test")
            consumer = AsyncKinesisConsumer(
                stream_name="kinesis-stream-test",
                checkpoint_table=table,
                custom_kinesis_client=client,
            )

            # consumer will yield existing shards and will continue yielding
            # new shards if re-sharding happens
            async for shard_reader in consumer.get_shard_readers():
                asyncio.ensure_future(read_records(shard_reader))