jordaneremieff / mangum

AWS Lambda support for ASGI applications
https://mangum.io/
MIT License
1.67k stars 119 forks

SQS message not returned back to the topic #306

Open jaekunchoi opened 11 months ago

jaekunchoi commented 11 months ago

I have the code below, which is a custom Lambda handler:

import json

from mangum.handlers.utils import (
    handle_base64_response_body,
    handle_exclude_headers,
    handle_multi_value_headers,
    maybe_encode_body,
)
from mangum.types import LambdaConfig, LambdaContext, LambdaEvent, Response, Scope

class SqsHandler:
    """This handler is responsible for reading and processing SQS events
    that have triggered the Lambda function.
    """

    def __init__(self, event: LambdaEvent, context: LambdaContext, config: LambdaConfig) -> None:
        self.event = event
        self.context = context
        self.config = config

    @classmethod
    def infer(cls, event: LambdaEvent, context: LambdaContext, config: LambdaConfig) -> bool:
        """How to distinguish SQS events from other AWS Lambda triggers"""

        # Use .get() so events whose records lack "eventSource" do not raise KeyError.
        records = event.get("Records") or []
        return bool(records) and records[0].get("eventSource") == "aws:sqs"

    @property
    def body(self) -> bytes:
        """The body of the actual REST request we want to send after getting the event."""

        message_body = self.event["Records"][0]["body"]
        request_body = json.dumps({"data": message_body, "service": "sqs"})

        return maybe_encode_body(request_body, is_base64=False)

    @property
    def scope(self) -> Scope:
        """A mapping of expected keys that Mangum adapter uses under the hood"""

        # ASGI requires header names to be lowercased byte strings.
        headers = [{"content-type": "application/json"}]
        scope: Scope = {
            "type": "http",
            "http_version": "1.1",
            "method": "POST",
            "headers": [[k.lower().encode(), v.encode()] for k, v in headers[0].items()],
            "scheme": "https",
            "path": "/content-analysis/process",
            "query_string": b"",  # ASGI expects a byte string here
            "raw_path": None,
            "root_path": "",
            "server": ("mangum", 80),
            "client": ("", 0),
            "asgi": {"version": "3.0", "spec_version": "2.0"},
            "aws.event": self.event,
            "aws.context": self.context,
        }
        return scope

    def __call__(self, response: Response) -> dict:
        finalized_headers, multi_value_headers = handle_multi_value_headers(response["headers"])
        finalized_body, is_base64_encoded = handle_base64_response_body(
            response["body"], finalized_headers, self.config["text_mime_types"]
        )

        return {
            "statusCode": response["status"],
            "headers": handle_exclude_headers(finalized_headers, self.config),
            "multiValueHeaders": handle_exclude_headers(multi_value_headers, self.config),
            "body": finalized_body,
            "isBase64Encoded": is_base64_encoded,
        }

The handler above is registered as a custom handler with the code below:

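import uvicorn
from mangum import Mangum

# `app` is the ASGI application defined earlier in this module.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8008)
else:
    handler = Mangum(app, custom_handlers=[SqsHandler])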

I also have an SQS queue that triggers the Lambda function. When the Lambda receives a message, however, the message is not put back into the queue, so it is never retried.

Is there a reason why this is happening?

pkit commented 10 months ago

"Returning" from the Lambda doesn't "put the message back"; that's not how the SQS-Lambda integration works. To re-queue a message you will need to call boto3 with the specific SQS handling you want. A return from the Lambda is only an indication to SQS that the message was processed successfully and needs to be removed from the queue, so all the data you return there is ignored.