aws / chalice

Python Serverless Microframework for AWS
Apache License 2.0

Support for maximum concurrency on SQS triggered functions #2031

Open ivandjuricic opened 1 year ago

ivandjuricic commented 1 year ago

In January 2023, AWS added support for setting maximum concurrency on SQS Lambda triggers. Details can be found here: https://aws.amazon.com/blogs/compute/introducing-maximum-concurrency-of-aws-lambda-functions-when-using-amazon-sqs-as-an-event-source/ tl;dr: when an SQS trigger has maximum concurrency set, it holds messages back from delivery once the specified maximum Lambda concurrency is reached, avoiding the known behaviour where throttled messages can end up in the DLQ.

Chalice support for this would be really beneficial for flows that process large batches of SQS messages via the on_sqs_message decorator, e.g.:

@app.on_sqs_message(queue='my-queue', batch_size=1, max_concurrency=20)
def handle_sqs_message(event):
    ...

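Under the hood, supporting this would presumably mean passing Lambda's ScalingConfig when Chalice creates the event source mapping. A minimal sketch of the request parameters involved (the queue ARN and function name below are made-up placeholders, not Chalice internals):

```python
# Sketch of the CreateEventSourceMapping parameters a deployer would need
# to emit; ARN and function name are hypothetical placeholders.
def event_source_mapping_params(queue_arn, function_name,
                                batch_size, max_concurrency):
    # MaximumConcurrency must be between 2 and 1000 per the Lambda API.
    return {
        "EventSourceArn": queue_arn,
        "FunctionName": function_name,
        "BatchSize": batch_size,
        "ScalingConfig": {"MaximumConcurrency": max_concurrency},
    }

params = event_source_mapping_params(
    "arn:aws:sqs:us-east-1:123456789012:my-queue",
    "my-app-dev-handle_sqs_message",
    batch_size=1,
    max_concurrency=20,
)
# A deployer could then call:
#   boto3.client("lambda").create_event_source_mapping(**params)
```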
dorukhan-ti commented 1 year ago

This would be a really useful option to a project I am currently working on

https://github.com/aws/chalice/issues/2029#issuecomment-1499833574

Ecitperbo commented 9 months ago

I faced the same issue. Here is a workaround in case it helps someone.

import shutil
import json

CHALICE_TF_PATH = 'chalice.tf.json'

# Chalice does not currently support maximum_concurrency and function_response_types
# in the decorators that generate the event source mapping. This script patches
# the generated Terraform JSON so that:
# * maximum_concurrency is set from the lambda's reserved_concurrent_executions
#   (see https://aws.amazon.com/blogs/compute/introducing-maximum-concurrency-of-aws-lambda-functions-when-using-amazon-sqs-as-an-event-source/)
# * function_response_types is set to ReportBatchItemFailures when batch_size is greater than 1.

shutil.copy2(CHALICE_TF_PATH, f'{CHALICE_TF_PATH}.before_tweak')

with open(CHALICE_TF_PATH) as f_input:
    chalice_tf: dict = json.load(f_input)
    lambda_resources: dict[str, dict] = chalice_tf['resource'].get('aws_lambda_function', {})
    esm_resources: dict[str, dict] = chalice_tf['resource']['aws_lambda_event_source_mapping']

    for esm_name, esm_resource in esm_resources.items():
        print(f'tweak {esm_name} in {CHALICE_TF_PATH}')

        lambda_resource: dict = lambda_resources.get(esm_name.split('-')[0], {})
        reserved_concurrent_executions: int = lambda_resource.get('reserved_concurrent_executions', 0)
        if reserved_concurrent_executions:
            esm_resource['scaling_config'] = {'maximum_concurrency': reserved_concurrent_executions}

        if esm_resource.get('batch_size', 0) > 1:
            esm_resource['function_response_types'] = ['ReportBatchItemFailures']

with open(CHALICE_TF_PATH, 'w') as f_output:
    json.dump(chalice_tf, f_output, indent=2)

print(f'{CHALICE_TF_PATH} successfully tweaked')
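To illustrate what the script above does, here is the same transformation applied to a stripped-down in-memory chalice.tf.json (resource names are hypothetical):

```python
# Minimal in-memory version of the tweak above; resource names are made up.
chalice_tf = {
    "resource": {
        "aws_lambda_function": {
            "handle_sqs_message": {"reserved_concurrent_executions": 20},
        },
        "aws_lambda_event_source_mapping": {
            "handle_sqs_message-sqs-event-source": {"batch_size": 10},
        },
    }
}

lambda_resources = chalice_tf["resource"]["aws_lambda_function"]
for esm_name, esm in chalice_tf["resource"]["aws_lambda_event_source_mapping"].items():
    # The event source mapping name starts with the lambda resource name.
    fn = lambda_resources.get(esm_name.split("-")[0], {})
    reserved = fn.get("reserved_concurrent_executions", 0)
    if reserved:
        esm["scaling_config"] = {"maximum_concurrency": reserved}
    if esm.get("batch_size", 0) > 1:
        esm["function_response_types"] = ["ReportBatchItemFailures"]

esm = chalice_tf["resource"]["aws_lambda_event_source_mapping"][
    "handle_sqs_message-sqs-event-source"]
# esm now carries both scaling_config and function_response_types.
```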

henryivesjones commented 6 months ago

This is one of a few features available on the fork of Chalice we are using: https://pypi.org/project/chalice-nuclei-ai/