aws-powertools / powertools-lambda-python

A developer toolkit to implement Serverless best practices and increase developer velocity.
https://docs.powertools.aws.dev/lambda/python/latest/
MIT No Attribution

RFC: SQS partial batch failure middleware #92

Closed gmcrocetti closed 4 years ago

gmcrocetti commented 4 years ago

First things first: congratulations on the amazing repo.

Key information

Summary

Processing a batch of SQS messages in a Lambda function is a very common approach, and it works smoothly for most use cases. Now suppose, for the sake of example, that we're processing a batch and one of the messages fails: Lambda returns the whole batch to the queue, including the messages that succeeded! Re-running successful messages is not acceptable for every use case.

Motivation

A very common execution pattern is a Lambda function connected to SQS, in most cases with a batch size greater than one. In such cases, an error in one of the processed messages causes the whole batch to return to the queue. Some use cases, such as non-idempotent actions, cannot tolerate that behavior. A solution to this problem would improve the experience of using Lambda with SQS.

Proposal

I'm going to propose some very simple code that is not complete.

import boto3

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator

@lambda_handler_decorator
def sqs_partial_batch_failure(handler, event, context):
    sent_records = event['Records']
    sqs_client = boto3.client('sqs')

    response = handler(event, context)

    # get_successful is a placeholder: it should return the records processed without error
    successful_messages = get_successful(response)
    # Pseudocode: the real call needs QueueUrl=... and Entries=[...]
    sqs_client.delete_message_batch(successful_messages)  # deletes every message except the 3rd and 7th

# batchsize of 10, fails for 3rd and 7th message
@sqs_partial_batch_failure
def handler(event, context):
    for record in event['Records']:
        do_sth(record)

Drawbacks

Rationale and alternatives

Unresolved questions

Optional, stash area for topics that need further development e.g. TBD

nmoutschen commented 4 years ago

Hi Guilherme! Thanks a lot for creating an issue! 😄

We're currently working through what tiny utilities should be included as part of the powertools. There's a fine balance between providing a good set of standard tools and preventing bloat.

Could you flesh out this issue as an RFC (see this ticket template) in terms of what the ideal developer experience would look like? If you already have similar implementations on GitHub, that would also help a lot in understanding the impact of such a middleware. It's up to you whether to create a new ticket using the template or include the RFC in this one. 😄

gmcrocetti commented 4 years ago

Thanks for your suggestion, Nicolas!

nmoutschen commented 4 years ago

Thanks a lot, it's much clearer this way! 👍

Quick question on what the implementation would look like. If I understood your proposal correctly, the handler should return something that tells the middleware which messages failed and which succeeded; then, if there are any failed messages, the middleware should throw an error (similar to how middy does it)?

E.g. would it look like this?

import boto3

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator

sqs_client = boto3.client("sqs")

def get_queue_url_from_arn(arn: str) -> str:
    # ARN layout: arn:aws:sqs:<region>:<account_id>:<queue_name>
    _, account_id, name = arn.rsplit(":", 2)
    return sqs_client.get_queue_url(
        QueueName=name,
        QueueOwnerAWSAccountId=account_id
    )["QueueUrl"]

@lambda_handler_decorator
def sqs_partial_batch_failure(handler, event, context):
    response = handler(event, context)

    # This could be optional.
    # We only need the list of successful messages, but this could be used for logging.
    failed_messages = response.get("failed", [])

    successful_messages = response.get("success", [])

    # Process successful messages, if any.
    if successful_messages:
        queue_url = get_queue_url_from_arn(event["Records"][0]["eventSourceARN"])

        # Delete the successful messages
        sqs_client.delete_message_batch(
            QueueUrl=queue_url,
            Entries=[
                {
                    "Id": e["messageId"],
                    "ReceiptHandle": e["receiptHandle"]
                } for e in successful_messages
            ]
        )

    # Raise an exception for failed messages
    if failed_messages:
        raise Exception(f"Failed to process {len(failed_messages)} messages.")

    # Alternative if we don't capture failed messages:
    # if len(successful_messages) < len(event["Records"]):
    #     raise Exception("Failed to process {} messages.".format(len(event["Records"]) - len(successful_messages)))

    return response
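
One practical detail for the deletion step: SQS accepts at most 10 entries per `DeleteMessageBatch` request, and per-entry failures are reported in the response body rather than raised. A minimal sketch of how that might be handled, assuming the hypothetical helper names `chunked` and `delete_in_chunks` and a client passed in as an argument so it can be stubbed in tests:

```python
from typing import Any, Dict, Iterator, List

MAX_BATCH_ENTRIES = 10  # SQS limit for a single DeleteMessageBatch request


def chunked(items: List[Any], size: int) -> Iterator[List[Any]]:
    """Yield consecutive slices of `items` with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def delete_in_chunks(client: Any, queue_url: str, records: List[Dict[str, Any]]) -> List[str]:
    """Delete `records` from the queue and return the IDs SQS reported as failed.

    `client` is any object exposing boto3's `delete_message_batch` keyword signature.
    """
    not_deleted: List[str] = []
    for chunk in chunked(records, MAX_BATCH_ENTRIES):
        response = client.delete_message_batch(
            QueueUrl=queue_url,
            Entries=[
                {"Id": r["messageId"], "ReceiptHandle": r["receiptHandle"]}
                for r in chunk
            ],
        )
        # Per-entry failures come back in the response, not as exceptions.
        not_deleted.extend(entry["Id"] for entry in response.get("Failed", []))
    return not_deleted
```

Anything returned by `delete_in_chunks` was processed successfully but not deleted, so it will be delivered again; whether to log it or fail the invocation is a design decision left open here.
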

gmcrocetti commented 4 years ago

Exactly! After removing the successful messages, raise an error for the failed ones. One technical detail of middy is that it enforces an API (allSettled). I'm not a huge fan of this approach, but I'm also not sure we'll be able to create an abstraction that tracks errors for each record in both the sync and async cases. Thoughts?
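
For the synchronous case, the per-record tracking does not have to be imposed on the handler author; the middleware could do it. A rough sketch of that idea, where `Outcome` and `settle_all` are hypothetical names and not part of Powertools or middy:

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional

Record = Dict[str, Any]


@dataclass
class Outcome:
    """Result of handling one record: either a return value or the error raised."""
    record: Record
    result: Any = None
    error: Optional[Exception] = None

    @property
    def ok(self) -> bool:
        return self.error is None


def settle_all(records: List[Record], record_handler: Callable[[Record], Any]) -> List[Outcome]:
    """Run `record_handler` on every record and never stop at the first failure."""
    outcomes: List[Outcome] = []
    for record in records:
        try:
            outcomes.append(Outcome(record, result=record_handler(record)))
        except Exception as exc:  # deliberately broad: any failure marks the record as failed
            outcomes.append(Outcome(record, error=exc))
    return outcomes
```

For the async case, `asyncio.gather(..., return_exceptions=True)` gives a comparable "settle everything" behavior, returning raised exceptions in place of results.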

Nr18 commented 4 years ago

We have built this ourselves, so I can share the how and why as input. Sample code:

sqs = SQS(queue=os.getenv("QUEUE_URL"))

def handler(event: dict, context: LambdaContext) -> None:
    # Run the given callback method for each message
    sqs.process_messages(event["Records"], sqs_callback)

def sqs_callback(message: str, attributes: dict) -> bool:
    # Process your message here
    return True # False or an Exception would not delete the specific message

The SQS class lives in a lambda layer:

import logging
from typing import Callable, List

import boto3

class SQS:
    def __init__(self, queue: str) -> None:
        self.log = logging.getLogger()
        self.client = boto3.client("sqs")
        self.queue = queue

    def process_messages(self, messages: List[dict], callback: Callable) -> None:
        """
        Call the callback with the body for each SQS message in the given event, the callback needs to return True if
        the message has been handled as expected. False or an exception may be raised in case of unexpected output.

        When the callback function returns True the message will be deleted from the SQS Queue.

        When the callback function returns False the message will not be deleted from the SQS Queue and fallback on the
        retry policy and dead-letter queue configuration.
        """
        self.log.info(f"Found {len(messages)} records.")

        failed_records = []

        for message in messages:
            try:
                # Recover the trace context from the trace header (java sample code, see https://docs.aws.amazon.com/xray/latest/devguide/xray-services-sqs.html)
                # Segment segment = AWSXRay.getCurrentSegment();
                # segment.setTraceId(traceHeader.getRootTraceId());
                # segment.setParentId(traceHeader.getParentId());
                # segment.setSampled(traceHeader.getSampled().equals(TraceHeader.SampleDecision.SAMPLED));
                processed = callback(message["body"], message.get("attributes"))
            except Exception as e:
                processed = False
                self.log.warning(str(e))

            if processed:
                self.log.info(f"Record {message['messageId']} has been processed.")
                self.__delete_message(message)
            else:
                self.log.warning(
                    f"Unable to process the message with the id: {message['messageId']}"
                )
                failed_records.append(message["messageId"])

        if failed_records:
            raise ValueError(
                "The following records failed to be processed:\n"
                + "\n".join(failed_records)
            )

    def __delete_message(self, record: dict):
        self.log.info(f"Delete message {record['messageId']} from the SQS Queue.")
        self.client.delete_message(
            QueueUrl=self.queue, ReceiptHandle=record["receiptHandle"]
        )
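
The commented-out Java lines in `process_messages` refer to recovering the X-Ray trace context. In Python, the first step (parsing the header) might look like the sketch below. It assumes the header is available in the record's `attributes` under `AWSTraceHeader` and uses the `Root=...;Parent=...;Sampled=...` form; `parse_trace_header` is a hypothetical helper, not an X-Ray SDK function.

```python
from typing import Dict, Optional


def parse_trace_header(header: Optional[str]) -> Dict[str, str]:
    """Split an X-Ray trace header into its key/value fields.

    Malformed or empty segments are skipped rather than raising.
    """
    fields: Dict[str, str] = {}
    for part in (header or "").split(";"):
        key, sep, value = part.strip().partition("=")
        if sep and key:
            fields[key] = value
    return fields
```

Inside the loop this could be called as `parse_trace_header(message.get("attributes", {}).get("AWSTraceHeader"))`; what to do with the parsed fields depends on the tracing library in use.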

So here are some considerations:

gmcrocetti commented 4 years ago

Hi @nmoutschen and @Nr18, very insightful opinions. I've been thinking over this problem and something came to mind: "partial batch failure" is not specific to SQS; other services (Kinesis and DynamoDB) face the same technical limitation. I know this is out of scope for the current issue, but what if we extended it a little by offering support for all these services? Or at least thought about an interface for all of them? Here's a prototype:

from typing import Callable, Coroutine, List, Dict
from contextlib import contextmanager

import boto3

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator

def get_queue_url() -> str:
    pass

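def service_factory(service: str) -> Callable:
    # Lambda sets eventSource to "aws:sqs" on SQS records
    if service == 'aws:sqs':
        return sqs_partial_batch
    raise ValueError(f"Unsupported event source: {service}")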

def get_service_from_record(record: Dict):
    return record.get('eventSource')

class BatchProcessor:

    def __init__(self, records: List):
        self.records = records
        self.failed_messages = []

    def process(self, record_handler: Callable):
        for record in self.records:
            try:
                record_handler(record)
            except Exception:
                self.failed_messages.append(record)

    async def async_process(self, record_handler: Coroutine):
        pass

@contextmanager
def ddb_partial_batch(records: List[Dict]):
    pass

@contextmanager
def kinesis_partial_batch(records: List[Dict]):
    pass

@contextmanager
def sqs_partial_batch(records: List[Dict]):
    queue_url = get_queue_url()
    sqs_client = boto3.client('sqs')
    processor: BatchProcessor = BatchProcessor(records)

    try:
        yield processor
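    finally:
        # Delete only the records that were processed without error
        failed_ids = {record['messageId'] for record in processor.failed_messages}
        entries = [
            {'Id': record['messageId'], 'ReceiptHandle': record['receiptHandle']}
            for record in records
            if record['messageId'] not in failed_ids
        ]
        if entries:
            sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=entries)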

def process_record(record: Dict):
    if record.get('fail'):
        raise Exception

    return record

def handler(event, context):
    with sqs_partial_batch(event['Records']) as ctx:
        ctx.process(process_record)

It's also possible to create a middleware to work for all services:

@lambda_handler_decorator
def partial_batch(handler, event, context, record_handler: Callable = None):
    service = get_service_from_record(event['Records'][0])
    partial_batch_processor = service_factory(service)

    records: List[Dict] = event['Records']

    with partial_batch_processor(records) as processor:
        processor.process(record_handler)

@partial_batch(record_handler=process_record)
def handler(event, context):
    pass

We can provide both of them :). Hope to hear from you guys! P.S. The implementation is incomplete.
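
One detail for the dispatch step: in Lambda event records the `eventSource` field holds values such as `aws:sqs`, `aws:kinesis` and `aws:dynamodb`, not the bare service name. A small sketch of a registry-based factory under that assumption; the three processor functions are placeholders that only return a label, and `PROCESSORS` and `processor_for` are hypothetical names:

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]
Processor = Callable[[List[Record]], str]


# Placeholder processors: real ones would acknowledge or checkpoint records.
def sqs_processor(records: List[Record]) -> str:
    return f"sqs:{len(records)}"


def kinesis_processor(records: List[Record]) -> str:
    return f"kinesis:{len(records)}"


def dynamodb_processor(records: List[Record]) -> str:
    return f"dynamodb:{len(records)}"


PROCESSORS: Dict[str, Processor] = {
    "aws:sqs": sqs_processor,
    "aws:kinesis": kinesis_processor,
    "aws:dynamodb": dynamodb_processor,
}


def processor_for(records: List[Record]) -> Processor:
    """Pick a processor from the eventSource of the first record in the batch."""
    if not records:
        raise ValueError("Cannot infer the event source from an empty batch")
    source = records[0].get("eventSource")
    try:
        return PROCESSORS[source]
    except KeyError:
        raise ValueError(f"Unsupported event source: {source!r}") from None
```

A dictionary keeps the factory open for extension: supporting another service means registering one more entry rather than adding a branch.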

Nr18 commented 4 years ago

@gmcrocetti that was something I was thinking about as well. In a Lambda function, you want to focus on the business logic so having only this in your lambda does that:

from ........ import partial_batch

@partial_batch(record_handler=process_record)
def handler(event, context) -> None:
    pass

def process_record(record: dict) -> None:
    # Business logic
    pass

Just thinking out loud here: if you look at the Powertools, it would look like this (a lot of decoration going on):

@tracer.capture_lambda_handler
@logger.inject_lambda_context
@metrics.log_metrics(capture_cold_start_metric=True)
@partial_batch(record_handler=process_record)
def handler(event: dict, context: LambdaContext) -> None:
    pass

@tracer.capture_method
def process_record(record: dict) -> None:
    # Business logic
    pass

heitorlessa commented 4 years ago

Haven't read this in depth nor put much thought into it yet, but it sounds like this would benefit from a BatchProcessor Provider so you could swap them, add custom logic if necessary, and have its own config.

On the other hand, Kinesis and DDB async controls should remove poison pills/ingest failed records into a DLQ. Stream processors in general should take idempotency into account, though I see your point about wanting to avoid another 3-step cycle (put something into a DLQ, alert on DLQ depth, re-drive messages from the DLQ to the stream/queue) by deleting only the successful messages instead.
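
A swappable provider could be expressed as a small abstract base class. The sketch below is only one possible shape: `BasePartialBatchProvider`, `finish` and `InMemoryProvider` are hypothetical names chosen for illustration and do not describe a released Powertools API.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]


class BasePartialBatchProvider(ABC):
    """Runs a handler over each record and leaves the cleanup to subclasses."""

    def __init__(self) -> None:
        self.succeeded: List[Record] = []
        self.failed: List[Record] = []

    def process(self, records: List[Record], record_handler: Callable[[Record], Any]) -> None:
        for record in records:
            try:
                record_handler(record)
            except Exception:
                self.failed.append(record)
            else:
                self.succeeded.append(record)
        self.finish()

    @abstractmethod
    def finish(self) -> None:
        """Service-specific cleanup, e.g. deleting the succeeded SQS messages."""


class InMemoryProvider(BasePartialBatchProvider):
    """Stand-in provider that only records what it would acknowledge."""

    def __init__(self) -> None:
        super().__init__()
        self.acknowledged: List[str] = []

    def finish(self) -> None:
        self.acknowledged = [r["messageId"] for r in self.succeeded]
        if self.failed:
            # Raising makes the invocation fail so the remaining records are retried.
            raise RuntimeError(f"{len(self.failed)} record(s) failed")
```

An SQS provider would replace the body of `finish` with a batch delete of the succeeded records, while a custom provider could add its own logging or configuration.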

As regards the decorator piece, we created the Middleware factory to account for that too. Here's an example I did last week that combines Logger, Tracer, and some custom logic:

@metrics.log_metrics(capture_cold_start_metric=True)
@process_booking_handler(logger=logger)
def lambda_handler(event, context):
    ...

https://github.com/aws-samples/aws-serverless-airline-booking/blob/develop/src/backend/booking/src/cancel-booking/cancel.py
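
The general idea of folding a stack of decorators into one can be shown in plain Python, independent of the Middleware factory. This is a generic sketch, not how Powertools implements it; `compose` and `tag` are hypothetical helpers.

```python
import functools
from typing import Any, Callable, List

Decorator = Callable[[Callable[..., Any]], Callable[..., Any]]

calls: List[str] = []  # records the order in which the wrappers run


def compose(*decorators: Decorator) -> Decorator:
    """Return one decorator equivalent to stacking `decorators` top to bottom."""
    def apply(func: Callable[..., Any]) -> Callable[..., Any]:
        # The decorator listed last is applied first, exactly as with stacked @ lines.
        for decorator in reversed(decorators):
            func = decorator(func)
        return func
    return apply


def tag(name: str) -> Decorator:
    """Toy decorator that notes its name in `calls` before calling through."""
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            calls.append(name)
            return func(*args, **kwargs)
        return wrapper
    return decorator


@compose(tag("tracer"), tag("logger"), tag("partial_batch"))
def handler(event: dict, context: Any) -> str:
    return "done"
```

Calling `handler({}, None)` runs the wrappers in the listed order (outermost first), so a project could expose a single decorator while keeping each concern in its own function.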

gmcrocetti commented 4 years ago

Any updates ? I would like to contribute, if possible :)

heitorlessa commented 4 years ago

hey @gmcrocetti - I'm happy with this, though I can't work on the implementation atm. If you'd like to give it a go, feel free to create a PR after setting up your dev env.

As this will be a utility, you might be able to copy the structure Nicolas started in #96 so we can use .utilities namespace for non-core utilities.

Let me know if you need a hand with any of this; otherwise I can look at it after finishing #97 and #95 :)

Thanks a lot for the help

heitorlessa commented 4 years ago

Nearly there... \ o / Minor adjustments on docs, and we should be able to publish it in the next release (1.5.0)

heitorlessa commented 4 years ago

@gmcrocetti let me know the easiest way to reach you. Feel free to DM me on Twitter. I'd like to help complete that PR so we can release 1.5.0 this week ;)

Update - We've just got our own Slack channel (#lambda-powertools) on AWS Developers workspace - Invite if you don't have an account yet

heitorlessa commented 4 years ago

This is now available in the 1.5.0 release.