aws-powertools / powertools-lambda-python

A developer toolkit to implement Serverless best practices and increase developer velocity.
https://docs.powertools.aws.dev/lambda/python/latest/
MIT No Attribution

Batch helper methods - AWS SDK helpers #1187

Open jplock opened 3 years ago

jplock commented 3 years ago

Is your feature request related to a problem? Please describe.

Batch operation helper methods for SQS and DynamoDB

Describe the solution you'd like

Helper methods that automatically chunk a list into sizes compatible with the SQS SendMessageBatch and DynamoDB BatchWriteItem limits

Describe alternatives you've considered

I end up rewriting this same functionality in multiple projects.

Additional context

heitorlessa commented 3 years ago

Hey @jplock, thanks for the feature request! Could you share an example of what you have at the moment?

That will help clarify whether this is best placed in the boto3 SDK, or as a helper method in the SQS/DynamoDB data classes

Thanks!

jplock commented 3 years ago
from typing import Any, Generator, List

import boto3

# https://stackoverflow.com/a/1751478
def chunks(lst: List[Any], n: int) -> Generator[List[Any], None, None]:
    # Yield successive n-sized slices of lst (the last slice may be shorter)
    n = max(1, n)
    return (lst[i : i + n] for i in range(0, len(lst), n))

DYNAMODB_BATCH_WRITE_ITEM_LIMIT = 25
SQS_SEND_MESSAGE_BATCH_LIMIT = 10

def dynamodb_batch_write_item(table_name: str, items: List[Any]) -> None:
    dynamodb = boto3.client("dynamodb")

    for chunk in chunks(items, DYNAMODB_BATCH_WRITE_ITEM_LIMIT):
        if not chunk:
            continue

        # BatchWriteItem expects a RequestItems map of table name -> write requests
        dynamodb.batch_write_item(RequestItems={table_name: chunk})

def sqs_send_message_batch(queue_url: str, messages: List[Any]) -> None:
    sqs = boto3.client("sqs")

    for chunk in chunks(messages, SQS_SEND_MESSAGE_BATCH_LIMIT):
        if not chunk:
            continue

        params = {
            "QueueUrl": queue_url,
            "Entries": chunk
        }

        sqs.send_message_batch(**params)

The DynamoDB BatchWriteItem call gets more complicated since it can operate across multiple tables, so I'm not sure of the best interface for that, but hopefully this makes sense. Retry logic would also need to be added to the DynamoDB use case to handle retrying any items returned in UnprocessedItems.
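For illustration, a minimal multi-table sketch with UnprocessedItems retries might look like the following (the helper name, flat retry loop, and backoff values are assumptions, not an agreed interface; the caller is assumed to keep each call within the 25-request BatchWriteItem limit):

import time
from typing import Any, Dict, List

import boto3

def dynamodb_batch_write(request_items: Dict[str, List[Dict[str, Any]]], max_retries: int = 5) -> None:
    dynamodb = boto3.client("dynamodb")

    for attempt in range(max_retries + 1):
        response = dynamodb.batch_write_item(RequestItems=request_items)
        # UnprocessedItems maps table name -> write requests that were throttled
        request_items = response.get("UnprocessedItems", {})
        if not request_items:
            return
        # Exponential backoff before retrying only the leftover requests
        time.sleep(0.01 * 2**attempt)

    raise RuntimeError(f"Gave up with unprocessed items in {len(request_items)} table(s)")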

heitorlessa commented 3 years ago

I've had a think about this. I'd love it if anyone could create an RFC for a new SDK Helpers utility - there are surely many cross-cutting pieces like this that don't make it into the SDK; some might land there in time while others won't (like this one).

An additional idea for an SDK Helpers utility is automatically adjusting timeouts (TCP/HTTP) based on the remaining Lambda function duration (taken from the context object).
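As a rough illustration of that idea, a minimal sketch could derive botocore timeouts from the remaining invocation time (the helper name and buffer values here are assumptions, not a proposed API):

import boto3
from botocore.config import Config

def client_with_deadline(service_name: str, context):
    # Leave a one-second buffer so the SDK call gives up before the
    # Lambda invocation itself times out
    remaining_seconds = context.get_remaining_time_in_millis() / 1000
    read_timeout = max(1.0, remaining_seconds - 1.0)
    config = Config(connect_timeout=1.0, read_timeout=read_timeout)
    return boto3.client(service_name, config=config)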

There could be more needed to make this feasible, so an RFC would give everyone an opportunity to chime in and make this more widely useful - tagged as Pending/RFC until we consider it.

jplock commented 2 years ago

Here's a more fleshed-out example of a DynamoDB batch_write_item helper:

import random
import time
from typing import Any, Dict

import boto3
import botocore.exceptions
from aws_lambda_powertools import Logger, Metrics, Tracer
from aws_lambda_powertools.metrics import MetricUnit

tracer = Tracer()
logger = Logger()
metrics = Metrics()

dynamodb = boto3.client("dynamodb")
max_retries = 15
min_sleep_time = 1e-2

@tracer.capture_method(capture_response=False)
def batch_write(items: Dict[str, Any], retry_count: int = 0) -> None:
    try:
        response = dynamodb.batch_write_item(RequestItems=items)
        metrics.add_metric(name="BatchWriteSuccess", unit=MetricUnit.Count, value=1)
    except botocore.exceptions.ClientError:
        logger.exception("Failed to write batch items to DynamoDB")
        metrics.add_metric(name="BatchWriteFailed", unit=MetricUnit.Count, value=1)
        raise

    # UnprocessedItems maps table name -> write requests DynamoDB throttled
    unprocessed_items = response.get("UnprocessedItems", {})
    if unprocessed_items:
        if retry_count > max_retries:
            # Re-randomize the exponent so the backoff window stays bounded
            retry_count = random.randint(1, max_retries)
        # Jittered exponential backoff before retrying the leftovers
        sleep_time = min_sleep_time * random.randint(1, 2**retry_count)
        logger.warning(f"{len(unprocessed_items)} table(s) with unprocessed items, sleeping for {sleep_time} seconds (retry count = {retry_count})")
        time.sleep(sleep_time)
        batch_write(unprocessed_items, retry_count + 1)
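
For completeness, a hypothetical caller could combine this with a chunking helper like the chunks function from the earlier comment so each call stays under the 25-request BatchWriteItem limit (write_all is an illustrative name, not a proposed API; List and Dict come from typing as above):

DYNAMODB_BATCH_WRITE_ITEM_LIMIT = 25

def write_all(table_name: str, write_requests: List[Dict[str, Any]]) -> None:
    # Pre-chunk the requests for a single table; batch_write then handles
    # UnprocessedItems retries within each chunk
    for chunk in chunks(write_requests, DYNAMODB_BATCH_WRITE_ITEM_LIMIT):
        batch_write({table_name: chunk})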