aws-powertools / powertools-lambda-python

A developer toolkit to implement Serverless best practices and increase developer velocity.
https://docs.powertools.aws.dev/lambda/python/latest/
MIT No Attribution

Feature request: Parser support from multiple event sources using the same Lambda #2237

Open leandrodamascena opened 1 year ago

leandrodamascena commented 1 year ago

Use case

Context: AWS Lambda supports mapping multiple different event sources to the same Lambda - https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html

I'm opening this issue to discuss whether Powertools can support different parsers in a Lambda that has multiple event sources. This thread started in our Discord server, and it would be interesting to hear customer feedback on this.

Original discussion: https://discord.com/channels/1006478942305263677/1006527338621710376/1103290540226785342

Solution/User Experience

We need to investigate this.

Alternative solutions

No response

rubenfonseca commented 1 year ago

Does this mean that the same Lambda would be connected to different event sources?

leandrodamascena commented 1 year ago

Does this mean that the same Lambda would be connected to different event sources?

Exactly, man! From my point of view, a single Lambda should only support a single event source, because you can keep a single responsibility for that Lambda and focus your business logic on it. But I see more and more users using fat Lambdas, and Lambda supports multiple event sources, so I think we must hear from customers what the use case is and whether Powertools can help them.

What do you think about this, Ruben?

rubenfonseca commented 1 year ago

This would be horrible in Go :)

The thing I'm worried about is that you would have to have a clear identifier mark for each kind of event source, so you could distinguish SQS/SNS/etc. from a generic dictionary... that isn't fun, is it? So it would work like a "reverse-parser": here's a dict, tell me what kind of payload it is. Really not sure how to feel about this...

leandrodamascena commented 1 year ago

that isn't fun, is it? So it would work like a "reverse-parser": here's a dict, tell me what kind of payload it is. Really not sure how to feel about this...

I thought the same, but it seems difficult and non-performant to do this.
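
For what it's worth, one way to keep that reverse parse cheap is to tag each model with its constant eventSource marker via a Literal type, so the wrong Union branch fails on the very first field. A minimal sketch with trimmed-down models, assuming plain pydantic:

from typing import List, Literal, Union

from pydantic import BaseModel

class SQSRecord(BaseModel):
    eventSource: Literal["aws:sqs"]  # the constant marker acts as a discriminator
    body: str

class SNSPayload(BaseModel):
    Message: str

class SNSRecord(BaseModel):
    EventSource: Literal["aws:sns"]  # note that SNS capitalizes its field names
    Sns: SNSPayload

class Events(BaseModel):
    # A mismatched record is rejected on the Literal field instead of after
    # validating the whole model, so the lookup stays cheap.
    Records: List[Union[SQSRecord, SNSRecord]]

record = Events(Records=[{"eventSource": "aws:sqs", "body": "hi"}]).Records[0]
assert isinstance(record, SQSRecord)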

JeffDGAWS commented 1 year ago

I've done this before in Python... it works, but the code needs to understand what type of event it's handling and parse it appropriately. I don't want to imagine the pain-in-the-backside this would be in a statically typed language like Go or .NET.

am29d commented 1 year ago

Does it mean the message within SQS and SNS will have the same structure, and we have to unwrap the body nicely with one signature, without additional conditional expressions?

I think with Python 3.10, pydantic can handle these types of situations:

from typing import List, Union
import pytest
from pydantic import BaseModel

class SQSRecord(BaseModel):
    eventSource: str  # "aws:sqs"
    awsRegion: str
    messageId: str
    body: str

class SNSPayload(BaseModel):
    Timestamp: str
    MessageId: str
    Message: str

class SNSRecord(BaseModel):
    EventSource: str  # "aws:sns"
    EventVersion: str
    Sns: SNSPayload

class Events(BaseModel):
    Records: Union[List[SQSRecord], List[SNSRecord]]

def lambda_handler(event, context):
    match Events(**event).Records[0]:
        case SNSRecord(EventSource="aws:sns"):
            return handle_sns(event, context)
        case SQSRecord(eventSource="aws:sqs"):
            return handle_sqs(event, context)
        case _:
            print("unknown event, you probably don't know what you are doing.")

def handle_sns(event, context):
    print('handling sns event')
    return Events(**event).Records[0].Sns.Message

def handle_sqs(event, context):
    print('handling sqs event')
    return Events(**event).Records[0].body

@pytest.fixture()
def sns_event():
    """ Generates SNS Event"""

    return {
        "Records": [
            {
                "EventVersion": "1.0",
                "EventSource": "aws:sns",
                "Sns": {
                    "Timestamp": "2019-01-02T12:45:07.000Z",
                    "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
                    "Message": "Hello from SNS!",
                }
            }
        ]
    }

@pytest.fixture()
def sqs_event():
    """ Generates SQS Event"""
    return {
        "Records": [
            {
                "eventVersion": "2.0",
                "eventSource": "aws:sqs",
                "awsRegion": "us-east-1",
                "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
                "body": "Hello from SQS!"
            }
        ]
    }

def test_lambda_handler(sns_event, sqs_event):
    ret = lambda_handler(sns_event, "")
    assert ret == "Hello from SNS!"

    ret = lambda_handler(sqs_event, "")
    assert ret == "Hello from SQS!"

Since we already have built-in models we can add this use case to our docs.
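
Building on that, a rough sketch of how a docs example could also lean on the built-in envelopes to unwrap the body with one signature; the Order model and the envelope fallback order are assumptions, not a settled design:

from pydantic import BaseModel, ValidationError

from aws_lambda_powertools.utilities.parser import envelopes, parse

class Order(BaseModel):  # hypothetical business payload
    order_id: str

def unwrap(event: dict) -> Order:
    # Try each built-in envelope until one matches the outer event shape
    for envelope in (envelopes.SqsEnvelope, envelopes.SnsEnvelope):
        try:
            return parse(event=event, model=Order, envelope=envelope)[0]
        except ValidationError:
            continue
    raise ValueError("unsupported event source")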

ran-isenberg commented 1 year ago

Does it mean the message within SQS and SNS will have the same structure, and we have to unwrap the body nicely with one signature, without additional conditional expressions?

I think with Python 3.10, pydantic can handle these types of situations: [...]

I've worked with Union in pydantic before, and it has some issues when two schemas share common fields (it can result in invalid parsing or exceptions, though that might have been fixed); see the sketch below.
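
To illustrate the shared-fields pitfall with hypothetical models, assuming pydantic v1's default left-to-right Union resolution:

from typing import Union

from pydantic import BaseModel, parse_obj_as

class OrderCreated(BaseModel):
    order_id: str

class OrderShipped(BaseModel):
    order_id: str
    carrier: str

# The payload is meant for OrderShipped, but the first Union member already
# validates (extra fields are ignored by default), so "carrier" is silently
# dropped instead of raising an exception.
event = {"order_id": "123", "carrier": "DHL"}
parsed = parse_obj_as(Union[OrderCreated, OrderShipped], event)
assert isinstance(parsed, OrderCreated)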

I think it's really important to understand why someone would connect one Lambda to multiple sources, and decide whether it's something that AWS and the community should recommend doing.

Usually, different outputs come from different pipelines, require different permissions, and process the records differently. Having one Lambda handle both pipelines means that:

  1. If you connect SQS with API GW (I've seen people do it), the return value of the parse function turns into a Union of a list of records and a single item, which seems quite awkward.
  2. Two types of error handling.
  3. Multiple permissions/subscriptions.
  4. Very hard to differentiate between the pipelines with regard to total invocation time, memory usage, etc. when looking at CloudWatch metrics.
  5. Looking at specific pipeline logs gets complicated, as the same log group now contains logs from both pipelines. It can be sorted out, but you need to be conscious of it (adding an identifier such as 'source=sns' to all logs, etc.).
  6. Testing is more complicated with regard to error handling.
  7. In many cases, different pipelines also require transforming the data, not just getting it from the input, so now your Lambda handles two responsibilities and two types of logic. Is it good? I'm not sure. I'd rather split it and "pay" for an extra function and deploy time.

Can it be done technically? Yes. Like @am29d said, it's already possible to achieve with regular pydantic and unions, but I don't think it's something I'd advocate for.

Should you? In my opinion, no, but there's no 100% truth here: it's all about pros and cons. Again, since it can be achieved today with the parser, I think it should be left alone.

heitorlessa commented 1 year ago

So where did we land here, @leandrodamascena?

athewsey commented 8 months ago

My $0.02: some aspects of best practice can be language-ecosystem-dependent, and rejecting this would be too opinionated when Python (maybe Pydantic less so...), TypeScript, etc. have reasonable support for Union types.

What about environments where legacy and new stacks might need to co-exist? What about the value of a tested, re-usable function component that 'Just Works' whether the rest of the architecture invokes it direct, via SNS, or through SNS->SQS?

The use case where I came across this was trying to statically type and re-use a function for post-processing Amazon Textract results, regardless of whether it's connected to a Textract notification SNS topic, wired through an SQS queue to help manage concurrency, or called directly with a similar payload. A sketch of the idea follows.
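
A rough sketch of that idea, assuming pydantic's parse_obj_as, the built-in SnsModel/SqsModel, and a made-up TextractJobResult model (the real notification shape may differ):

from typing import Union

from pydantic import BaseModel, parse_obj_as

from aws_lambda_powertools.utilities.parser.models import SnsModel, SqsModel

class TextractJobResult(BaseModel):  # hypothetical direct-invoke payload
    JobId: str
    Status: str

def extract_result(event: dict) -> TextractJobResult:
    # One typed entry point, however the function is wired up
    model = parse_obj_as(Union[SqsModel, SnsModel, TextractJobResult], event)
    if isinstance(model, SqsModel):
        # Assumes raw message delivery, i.e. the SQS body is the payload itself
        return TextractJobResult.parse_raw(model.Records[0].body)
    if isinstance(model, SnsModel):
        return TextractJobResult.parse_raw(model.Records[0].Sns.Message)
    return model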

heitorlessa commented 8 months ago

Hey @athewsey, great to hear from you -- got an example or some pseudo-code in mind?

@seshubaws is working on an RFC for the idea of a pipeline (similar to Apache Beam), allowing you to combine data sources indefinitely w/ generics -- on mobile now, but she can share later.

abbasyadollahi commented 8 months ago

Wondering if there are still any plans to add this feature; I have a use case that could benefit from it (although a possibly odd one).

A bit of context on my use case: I have a Lambda connected to a DynamoDB stream, and on processing failure I manually push the DynamoDB record payload to a DLQ. After fixing the bug in the Lambda, I'd like to reprocess the DLQ messages, so I redrive them to a source queue, which the Lambda is also connected to. The problem is that the payload for a DynamoDBStreamEvent isn't the same as for an SQSEvent, and I don't want to create a second copy of the Lambda just to process a different event type. Ideally SQS would have something like raw message delivery, but alas.

DynamoDB Stream ---v
                 Lambda ---> DLQ
SQS ---------------^          |
^-----------redrive------------

What I'm currently doing is checking what the raw payload format is, and then wrapping it in the appropriate data class.

import json

from aws_lambda_powertools.utilities.batch.types import PartialItemFailureResponse
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

def handler(event: dict, context: LambdaContext) -> PartialItemFailureResponse:
    try:
        if event["Records"][0]["eventSource"] == "aws:sqs":
            # The SQS body payload is a redriven DynamoDB stream event
            event = json.loads(event["Records"][0]["body"])
    except Exception:
        # Not an SQS-wrapped payload; process the event as-is
        pass

    return process(event, context)

@event_source(data_class=DynamoDBStreamEvent)
def process(event: DynamoDBStreamEvent, context: LambdaContext) -> PartialItemFailureResponse:
    ...

I was thinking that, in an ideal world, something like the following could be possible with the help of Powertools:

@event_source(data_classes=(DynamoDBStreamEvent, SQSEvent))
def handler(event: DynamoDBStreamEvent | SQSEvent, context: LambdaContext) -> PartialItemFailureResponse:
    if isinstance(event, DynamoDBStreamEvent):
        ...
    if isinstance(event, SQSEvent):
        ...
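
Until something like that exists, here's a rough DIY approximation; multi_event_source is a hypothetical helper, not a Powertools API, and it dispatches on the eventSource marker that both record types carry:

from functools import wraps

from aws_lambda_powertools.utilities.data_classes import SQSEvent
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

_DATA_CLASSES = {"aws:sqs": SQSEvent, "aws:dynamodb": DynamoDBStreamEvent}

def multi_event_source(handler):
    @wraps(handler)
    def wrapper(event: dict, context: LambdaContext):
        # Both event shapes carry the marker on every record
        source = event["Records"][0]["eventSource"]
        return handler(_DATA_CLASSES[source](event), context)
    return wrapper

@multi_event_source
def handler(event, context: LambdaContext):
    if isinstance(event, DynamoDBStreamEvent):
        ...
    if isinstance(event, SQSEvent):
        ...
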
heitorlessa commented 8 months ago

Hey @abbasyadollahi - adding @seshubaws and @rafaelgsr, who are working on this.