aws-powertools / powertools-lambda-python

A developer toolkit to implement Serverless best practices and increase developer velocity.
https://docs.powertools.aws.dev/lambda/python/latest/
MIT No Attribution

Large Message handling for AWS Event/Message services - SQS, SNS, EventBridge, [insert others] #1615

Open walmsles opened 1 year ago

walmsles commented 1 year ago

Is this related to an existing feature request or issue?

No response

Which AWS Lambda Powertools utility does this relate to?

Batch processing

Summary

A common pain and friction point in developing Event-Driven Architectures with AWS Serverless managed services is the service limit on message size, which is generally 256KB for messages passing through any of the messaging services - SQS, SNS, EventBridge, [insert others here].

In an enterprise integration landscape, 256KB is a challenging size. It forces the building of store-and-forward systems: the inbound message is stored in an S3 bucket (or another storage mechanism), metadata is pushed through the messaging service, and every consumer must retrieve the actual message using that metadata before processing.
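The store-and-forward pattern described above can be sketched as follows. This is a minimal, hypothetical illustration - the envelope fields, key layout and function name are all invented, and the S3/SQS clients are passed in so the logic can be exercised without AWS access:

```python
import json
import uuid

# 256KB limit shared by SQS, SNS and EventBridge
MESSAGE_LIMIT = 256 * 1024

def send_via_store_and_forward(payload: dict, bucket: str, queue_url: str,
                               s3_client, sqs_client) -> dict:
    """Store the full payload in S3 and forward only a small pointer envelope."""
    body = json.dumps(payload)
    key = f"large-messages/{uuid.uuid4()}"
    # Store the full message body out of band...
    s3_client.put_object(Bucket=bucket, Key=key, Body=body.encode("utf-8"))
    # ...and send only metadata describing where to find it.
    envelope = {"large_payload": True, "bucket": bucket, "key": key,
                "size": len(body)}
    sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(envelope))
    return envelope
```

Every consumer then has to reverse these steps before it can process the message, which is exactly the boilerplate this proposal aims to eliminate.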

Use case

System integration use case, where data is submitted to an integration service in AWS that uses EventBridge to route event data to different destinations. The API Gateway to Lambda proxy integration allows payloads of up to 6MB, far larger than the 256KB limit of EventBridge.

In this scenario the Lambda function behind the API must first store the large payload in S3 and then push metadata for routing through EventBridge. Consumers then need to read the original large message and process it.

Proposal

Similar to the Idempotency utility, an abstract persistence class should be created to allow storing and retrieving a message in an AWS storage service (defaulting to S3 seems sensible).

Build out a message client handler like sqs-extended-client-lib (which is based on the AWS Java implementation for sending large messages to SQS, and appears abandoned on GitHub) for storing the message and creating the metadata to forward through the messaging service.

Like Idempotency, it would need to consider JMESPath for extracting metadata, or a mechanism for building the required message structure for submitting to the AWS messaging service (SQS, SNS, EventBridge, etc.).
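The extraction idea above could look something like this. To keep the sketch dependency-free it uses a simple dotted-path lookup in place of full JMESPath; the function name and behaviour are illustrative only:

```python
def extract_metadata(payload: dict, path: str):
    """Follow a dotted path like 'order.customer.id' through nested dicts,
    returning None when any segment is missing - a stand-in for the
    JMESPath expression the real utility would evaluate."""
    current = payload
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current
```

The configured expression would be applied to the outgoing payload to derive metadata (such as a routing key) that travels with the pointer message.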

This feels like a utility that should cover more than just SQS, but it should start with the most common use case. SQS seems logical in this regard: well-known implementations already exist, and matching the existing user experience of the Java utility is a reasonable starting point.

It would also integrate detection of large-message events into the existing Powertools batch processing utilities, so that retrieval of the large message can happen as part of batch processing when those utilities are used.

A stand-alone mechanism for retrieving large messages also needs to be provided for customers who wish to partially adopt Lambda Powertools, so there are pathways for everyone to benefit from this feature.
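A stand-alone retrieval helper might look like the sketch below. All names are hypothetical, the envelope format matches nothing official, and the S3 client is injected so the function stays testable:

```python
import json

def resolve_message_body(body: str, s3_client) -> str:
    """Return the real message body: if `body` is a pointer envelope,
    fetch the full payload from S3; otherwise return it unchanged."""
    try:
        envelope = json.loads(body)
    except (TypeError, ValueError):
        return body  # not JSON, so certainly not a pointer envelope
    if isinstance(envelope, dict) and envelope.get("large_payload"):
        obj = s3_client.get_object(Bucket=envelope["bucket"], Key=envelope["key"])
        return obj["Body"].read().decode("utf-8")
    return body
```

Customers not using the batch utilities could call such a helper directly inside their own record loop.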

Producer:


import os

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.event_handler import APIGatewayRestResolver
from aws_lambda_powertools.utilities.message_handler import SQSMessageClient, S3MessagePersistenceLayer, MessageClientConfig
from aws_lambda_powertools.logging import correlation_paths
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()
logger = Logger()
app = APIGatewayRestResolver()

config = MessageClientConfig(
    message_key_jmespath="messageId", 
    use_local_cache=True,
)
persistence_layer = S3MessagePersistenceLayer(bucket=os.environ.get("MessageBucket"))
sqs_client = SQSMessageClient(persistence_store=persistence_layer, config=config)

@app.post("/event")
@tracer.capture_method
def post_event():
    payload = app.current_event.json_body
    return sqs_client.send_message(queue=os.environ.get("MyMessageQueue"), message=payload)

# You can continue to use other utilities just as before
@logger.inject_lambda_context(correlation_id_path=correlation_paths.API_GATEWAY_REST)
@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return app.resolve(event, context)

Consumer:

import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)
tracer = Tracer()
logger = Logger()

@tracer.capture_method
def record_handler(record: SQSRecord):
    payload: str = record.body
    if payload:
        item: dict = json.loads(payload)
    ...

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()

Out of scope

Initially a single messaging service should be targeted, but other messaging services (SNS, EventBridge, Kinesis, etc.) should be considered in the design so this feature can be enabled on all batch processing utilities in the future.

Potential challenges

A customisable method is needed for taking the large message and creating the metadata that tells the messaging service how to retrieve it. JMESPath or JSONPath can be considered for simple use cases, but a custom function hook should also be provided, since not every use case will be so straightforward.

We also need to consider how the consumer would know where to access the large message data, given it runs in a different Lambda function and potentially has no knowledge of the storage mechanism. If this could be embedded in metadata for Powertools to interpret on the consumer side, it would reduce boilerplate.
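One way to address that discovery problem is to make the pointer self-describing, so the consumer needs no out-of-band configuration. The field names below are invented purely for illustration:

```python
def make_pointer(store: str, bucket: str, key: str) -> dict:
    """Build a self-describing pointer: the envelope itself says which
    storage backend holds the payload, so the consumer needs no config."""
    return {
        "powertools_large_message": {
            "version": 1,
            "store": store,   # e.g. "s3" - tells the consumer which backend
            "bucket": bucket,
            "key": key,
        }
    }

def read_pointer(message: dict):
    """Return (store, bucket, key) if the message is a pointer, else None."""
    meta = message.get("powertools_large_message")
    if not isinstance(meta, dict):
        return None
    return meta["store"], meta["bucket"], meta["key"]
```

A versioned, namespaced key like this also leaves room to evolve the contract without breaking older consumers.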

Dependencies and Integrations

Batch utilities would need to understand how to retrieve large messages when this utility is in use. Automating the retrieval removes a lot of boilerplate code from existing solutions.

Alternative solutions

[sqs-extended-client-lib](https://github.com/timothymugayi/boto3-sqs-extended-client-lib) is a consideration for SQS only and appears to be abandoned (no changes in over 14 months).

Acknowledgment

walmsles commented 1 year ago

Hi @heitorlessa, I'm happy to pick this one up. I am busy for a week but will have time after that to work on it.

ims-swilkinson commented 1 year ago

Would Step Functions invocations of Lambda be a good candidate for this feature? I know the map state in distributed mode can do something similar if all of your items are on the same prefix, via ListObjectsV2. But this feature could be useful for invoking Lambda from other types of state such as task, with large payloads.

heitorlessa commented 1 year ago

@ims-swilkinson you mean a Lambda function receiving a payload referencing an S3 object URL? As in, Step Functions -> Lambda?

If so, that would be possible with a generic function a consumer could use to fetch data from S3, optionally deserialize, and optionally transform it (maybe something else?).
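That generic function could be sketched roughly like this - the signature and defaults are illustrative, not a committed API, and the S3 client is injected for testability:

```python
import json

def fetch_payload(bucket: str, key: str, s3_client,
                  deserializer=json.loads, transform=None):
    """Fetch an object from S3, optionally deserialize it, then
    optionally transform it - the three steps suggested above."""
    raw = s3_client.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
    data = deserializer(raw) if deserializer else raw
    return transform(data) if transform else data
```

A Step Functions task could pass `bucket`/`key` in its input and the consuming function would call this helper instead of hand-rolling the fetch each time.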

The reason I say generic is that the RFC @walmsles wrote focuses on the consumer angle within the Batch Processing feature. Step Functions doesn't pass any particular metadata by default to make this experience nicer.

The more I think about it, the more I find that if we create a new contract and suggest customers invoke Lambda functions using this contract as a payload, we could easily create a new Event Handler to decouple the function handler from tasks/steps, and bring in nice features like this.

ims-swilkinson commented 1 year ago

Yes, it could work with a simple contract specifically for large payloads. But I understand how the lack of a contract for Step Functions invocations of Lambda is a problem here; I forgot there isn't one.

walmsles commented 1 year ago

The more I think about it, the more I find that if we create a new contract and suggest customers invoke Lambda functions using this contract as a payload, we could easily create a new Event Handler to decouple the function handler from tasks/steps, and bring in nice features like this.

Oooooh - this makes so much sense... if Powertools could introduce an adapter pattern, it opens up so many possibilities! I have been thinking about this a lot lately, and it fits with where I want to push serverless development in large enterprises today. It opens up not just idempotent handling but large message handling across EVERY service.

alexandreczg commented 1 year ago

I will leave my 2 cents on this matter here, since I too suffer from this problem: payloads that are too large.

I believe having an established contract in the payload itself is a must, so we can use a specific handler based on some flag in that contract (I'm assuming).

One thing that comes to mind is specific to my use case but might well apply to others: for my workloads, I actually don't know the payload size ahead of time. The same pipeline operates on small and large payloads. For small payloads, I send the content in the message itself (call it the normal and usual pathway). For large payloads, I send a reference to an S3 location where the payload can be found. My handler then checks a flag in the payload itself to see where to grab the content from. It's an adapter pattern of sorts.

What this allows me is a single Lambda handling both types of payloads, indiscriminately. I don't have to force the Lambda to use the S3 reference for all payloads even when they are small, which saves on cost ($).
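The size-based dispatch described above can be sketched on the producer side as follows. The threshold, flag name and client wiring are all invented for illustration; only payloads over the limit are offloaded to S3:

```python
import json
import uuid

THRESHOLD = 256 * 1024  # assumed service limit; small payloads stay inline

def build_message(payload: dict, bucket: str, s3_client) -> dict:
    """Send small payloads inline; offload large ones to S3 and mark the
    message with a flag the handler can check."""
    body = json.dumps(payload)
    if len(body.encode("utf-8")) <= THRESHOLD:
        return {"is_reference": False, "data": payload}  # normal pathway
    key = f"payloads/{uuid.uuid4()}"
    s3_client.put_object(Bucket=bucket, Key=key, Body=body.encode("utf-8"))
    return {"is_reference": True, "bucket": bucket, "key": key}
```

The consumer branches on `is_reference`, which is how a single Lambda can handle both shapes without paying the S3 round trip for small messages.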

walmsles commented 1 year ago

Not knowing the exact size of payloads is common, with larger payloads being the exception rather than the rule in most situations. The intention with this utility is to focus on the processor side first, with compatibility with the existing sqs-extended-client libraries for Java and Python. Fetching of the large file would be handled by the Powertools batch processor where required, according to the additional metadata added to the SQS message content by the client side (refer to the links for Java and Python).

So the intent of the feature would exactly match your requirement.

joshwand commented 9 months ago

This functionality is already present in the Java version of Lambda Powertools:

https://docs.powertools.aws.dev/lambda/java/utilities/large_messages/

That said, it would be awesome if this were not limited to SQS/SNS. My use case is Step Functions, where the returned payload from a Lambda task may exceed the 256KB threshold and would be truncated/swallowed by Step Functions.

walmsles commented 6 months ago

AWS Labs has released Python extended clients for SNS (https://github.com/awslabs/amazon-sns-python-extended-client-lib) and SQS (https://github.com/awslabs/amazon-sqs-python-extended-client-lib), which is nice for sending large messages to SNS and SQS.

At a minimum, the Powertools Batch utility should support these 2 use cases as a starting point, using a "LargeMessage" decorator similar to the Java implementation mentioned above (so it is opt-in).
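An opt-in decorator in that spirit might look like the sketch below. All names are invented, the pointer format is hypothetical, and the S3 client is injected so the behaviour can be verified without AWS:

```python
import functools
import json

def large_message(s3_client):
    """Opt-in decorator: transparently swap a pointer body for the real
    S3 content before the wrapped record handler runs."""
    def decorator(record_handler):
        @functools.wraps(record_handler)
        def wrapper(record):
            body = record.get("body", "")
            try:
                envelope = json.loads(body)
            except ValueError:
                envelope = None  # plain-text body, pass through untouched
            if isinstance(envelope, dict) and "s3_pointer" in envelope:
                ptr = envelope["s3_pointer"]
                obj = s3_client.get_object(Bucket=ptr["bucket"], Key=ptr["key"])
                record = {**record, "body": obj["Body"].read().decode("utf-8")}
            return record_handler(record)
        return wrapper
    return decorator
```

Because it is a decorator, existing record handlers keep their signature, and handlers that never see large messages pay no cost beyond the JSON probe.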

This issue should focus on these 2 use cases only, with all other large message handling split into another issue for clarity and simplicity. Other service use cases have no client implementations, which raises a separate discussion: where should the client components sit, and should Powertools maintain them?

I am happy to take this on for SNS and SQS now the clients are officially supported.

heitorlessa commented 6 months ago

Let's revisit this in April. If we're going to take a dependency on the extended client lib, we need to be sure it will be available for TypeScript/Node/.NET too. Otherwise we need a good abstraction over it, ideally beyond SNS/SQS as well, hence the need for time to do this properly.

The higher demand right now is allowing customers to know when a transaction was idempotent. Event Handler for WebSockets also has higher demand - we can revisit the board in April to rank them.

walmsles commented 6 months ago

Thanks @heitorlessa, makes sense. For when this comes up again, here is another example of an AWS integration looking to deal with large events: Support Sidelining large events (eventbridge-kafka connector).