elastic / elastic-serverless-forwarder

Elastic Serverless Forwarder

Place messages from misconfigured IDs in the replay queue #711

Closed constanca-m closed 4 months ago

constanca-m commented 5 months ago

What does this PR do?

Logs an error message if the input ID from the trigger is not present in config.yaml, and places the message in the replay queue.
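The behaviour described above can be sketched roughly as follows. This is a minimal illustration and not the handler code from this PR: `route_record`, `configured_inputs`, and `send_to_replay` are hypothetical names, the replay payload shape is made up, and an in-memory list stands in for the replay queue. It assumes the trigger's ARN is read from the record's `eventSourceARN` field, as in SQS and Kinesis event records.

```python
import json
import logging
from typing import Any, Callable, Optional

logger = logging.getLogger("esf-sketch")  # hypothetical logger name


def route_record(
    record: dict[str, Any],
    configured_inputs: dict[str, dict[str, Any]],
    send_to_replay: Callable[[str], None],
) -> Optional[dict[str, Any]]:
    """Return the input config matching the record, or queue the record for replay."""
    input_id = record["eventSourceARN"]
    matched = configured_inputs.get(input_id)
    if matched is None:
        # Misconfigured ID: log an error and keep the payload instead of dropping it.
        logger.error("no input defined", extra={"input_id": input_id})
        send_to_replay(json.dumps({"input_id": input_id, "body": record.get("body")}))
        return None
    return matched


# Usage: config.yaml lists a different ARN than the one that triggered the function.
replay_queue: list[str] = []  # stand-in for the real replay queue
config = {"arn:aws:sqs:eu-west-2:627286350134:wrong-sqs": {"type": "sqs"}}
event_record = {
    "eventSourceARN": "arn:aws:sqs:eu-west-2:627286350134:constanca-esf-issue-696",
    "body": "hello",
}
result = route_record(event_record, config, replay_queue.append)
```

Passing the replay sender in as a callable keeps the sketch runnable without AWS access; in a real deployment it would wrap a call that sends to the replay SQS queue.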

Why is this PR important?

Issues linked explained it well, but to sum it up:

Checklist

How to test this PR locally

I added a how-to-test directory in the commits. Please follow the README.md file in that directory. Important: use it with the code from this PR, which is not merged yet; otherwise it will fail.

I also added some tests for the undefined-input case; they run in one of the GitHub workflows.

Related issues

Tests and Results

SQS trigger

config.yaml looks like this:

"inputs":
- "id": "arn:aws:sqs:eu-west-2:627286350134:wrong-sqs"
  "outputs":
  - "args":
      "api_key": "..."
      "elasticsearch_url": "..."
      "es_datastream_name": "logs-esf.sqs-default"
    "type": "elasticsearch"
  "type": "sqs"

And ESF has aws_lambda_event_source_mapping configured correctly with the SQS ARN.

ESF logs. To make it easier, I annotated the logs with <- in the relevant lines.

```log
INIT_START Runtime Version: python:3.9.v51 Runtime Version ARN: arn:aws:lambda:eu-west-2::runtime:415294d499803f2ba2e75d2753b4523c4517c1c13e55d8ca19a44434f6cbb9a5
{ "@timestamp": "2024-05-07T10:27:19.154Z", "log.level": "info", "message": "Found credentials in environment variables.", "ecs": { "version": "1.6.0" }, "log": { "logger": "botocore.credentials", "origin": { "file": { "line": 1147, "name": "credentials.py" }, "function": "load" }, "original": "Found credentials in environment variables." }, "process": { "name": "MainProcess", "pid": 8, "thread": { "id": 139802417399616, "name": "MainThread" } } }
START RequestId: 85df0f38-3743-527e-8126-518c3377948d Version: $LATEST
{ "@timestamp": "2024-05-07T10:27:19.320Z", "log.level": "info", "message": "trigger", "ecs": { "version": "1.6.0" }, "log": { "logger": "root", "origin": { "file": { "line": 56, "name": "handler.py" }, "function": "lambda_handler" }, "original": "trigger" }, "process": { "name": "MainProcess", "pid": 8, "thread": { "id": 139802417399616, "name": "MainThread" } }, "type": "sqs" }
{ "@timestamp": "2024-05-07T10:27:19.320Z", "log.level": "info", "message": "config file", "bucket_name": "constanca-test-esf-config-bucket", "ecs": { "version": "1.6.0" }, "log": { "logger": "root", "origin": { "file": { "line": 204, "name": "utils.py" }, "function": "config_yaml_from_s3" }, "original": "config file" }, "object_key": "config.yaml", "process": { "name": "MainProcess", "pid": 8, "thread": { "id": 139802417399616, "name": "MainThread" } } }
{ "@timestamp": "2024-05-07T10:27:19.818Z", "log.level": "info", "message": "trigger", "ecs": { "version": "1.6.0" }, "log": { "logger": "root", "origin": { "file": { "line": 338, "name": "handler.py" }, "function": "lambda_handler" }, "original": "trigger" }, "process": { "name": "MainProcess", "pid": 8, "thread": { "id": 139802417399616, "name": "MainThread" } }, "size": 1 }
{ "@timestamp": "2024-05-07T10:27:19.819Z", "log.level": "error", <------------------ FIXED
  "message": "no input defined", "ecs": { "version": "1.6.0" }, "input_id": "arn:aws:sqs:eu-west-2:627286350134:constanca-esf-issue-696", "log": { "logger": "root", "origin": { "file": { "line": 428, "name": "handler.py" }, "function": "lambda_handler" }, "original": "no input defined" }, "process": { "name": "MainProcess", "pid": 8, "thread": { "id": 139802417399616, "name": "MainThread" } } }
{ "@timestamp": "2024-05-07T10:27:20.035Z", "log.level": "info", "message": "lambda processed all the events", "ecs": { "version": "1.6.0" }, "empty_events": 0, "error_events": 1, <------------------ NEW
  "log": { "logger": "root", "origin": { "file": { "line": 554, "name": "handler.py" }, "function": "lambda_handler" }, "original": "lambda processed all the events" }, "process": { "name": "MainProcess", "pid": 8, "thread": { "id": 139802417399616, "name": "MainThread" } }, "sent_events": 0, "skipped_events": 0 }
END RequestId: 85df0f38-3743-527e-8126-518c3377948d
REPORT RequestId: 85df0f38-3743-527e-8126-518c3377948d Duration: 738.26 ms Billed Duration: 739 ms Memory Size: 128 MB Max Memory Used: 89 MB Init Duration: 707.29 ms
```

The replay queue, constanca-test-esf-replay-queue, has Messages available: 1.

CloudWatch Logs trigger

config.yaml looks like this:

"inputs":
- "id": "arn:aws:logs:eu-west-2:627286350134:log-group:wrong-log-group:*"
  "outputs":
  - "args":
      "api_key": "..."
      "elasticsearch_url": "..."
      "es_datastream_name": "logs-esf.cloudwatchlogs-default"
    "type": "elasticsearch"
  "type": "cloudwatch-logs"

And ESF has aws_cloudwatch_log_subscription_filter configured correctly with the CloudWatch Logs ARN.

ESF logs. To make it easier, I annotated the logs with <- in the relevant lines.

```log
INIT_START Runtime Version: python:3.9.v51 Runtime Version ARN: arn:aws:lambda:eu-west-2::runtime:415294d499803f2ba2e75d2753b4523c4517c1c13e55d8ca19a44434f6cbb9a5
{ "@timestamp": "2024-05-07T10:46:12.197Z", "log.level": "info", "message": "Found credentials in environment variables.", "ecs": { "version": "1.6.0" }, "log": { "logger": "botocore.credentials", "origin": { "file": { "line": 1147, "name": "credentials.py" }, "function": "load" }, "original": "Found credentials in environment variables." }, "process": { "name": "MainProcess", "pid": 8, "thread": { "id": 140459008206656, "name": "MainThread" } } }
START RequestId: e5adae49-87f2-4b3d-a504-a67cc7aaa421 Version: $LATEST
{ "@timestamp": "2024-05-07T10:46:12.360Z", "log.level": "info", "message": "trigger", "ecs": { "version": "1.6.0" }, "log": { "logger": "root", "origin": { "file": { "line": 56, "name": "handler.py" }, "function": "lambda_handler" }, "original": "trigger" }, "process": { "name": "MainProcess", "pid": 8, "thread": { "id": 140459008206656, "name": "MainThread" } }, "type": "cloudwatch-logs" }
{ "@timestamp": "2024-05-07T10:46:12.360Z", "log.level": "info", "message": "config file", "bucket_name": "constanca-test-esf-config-bucket", "ecs": { "version": "1.6.0" }, "log": { "logger": "root", "origin": { "file": { "line": 204, "name": "utils.py" }, "function": "config_yaml_from_s3" }, "original": "config file" }, "object_key": "config.yaml", "process": { "name": "MainProcess", "pid": 8, "thread": { "id": 140459008206656, "name": "MainThread" } } }
{ "@timestamp": "2024-05-07T10:46:12.830Z", "log.level": "info", "message": "trigger", "ecs": { "version": "1.6.0" }, "log": { "logger": "root", "origin": { "file": { "line": 141, "name": "handler.py" }, "function": "lambda_handler" }, "original": "trigger" }, "process": { "name": "MainProcess", "pid": 8, "thread": { "id": 140459008206656, "name": "MainThread" } }, "size": 1 }
{ "@timestamp": "2024-05-07T10:46:14.483Z", "log.level": "error", <--------------------- FIXED
  "message": "no input defined", "ecs": { "version": "1.6.0" }, "input_id": "arn:aws:logs:%AWS_REGION%:627286350134:log-group:constanca-test-esf-log-group:*", "log": { "logger": "root", "origin": { "file": { "line": 151, "name": "handler.py" }, "function": "lambda_handler" }, "original": "no input defined" }, "process": { "name": "MainProcess", "pid": 8, "thread": { "id": 140459008206656, "name": "MainThread" } } }
{ "@timestamp": "2024-05-07T10:46:14.698Z", "log.level": "info", "message": "lambda is going to shutdown", "ecs": { "version": "1.6.0" }, "empty_events": 0, "error_events": 1, <---------------------- NEW
  "log": { "logger": "root", "origin": { "file": { "line": 161, "name": "handler.py" }, "function": "lambda_handler" }, "original": "lambda is going to shutdown" }, "process": { "name": "MainProcess", "pid": 8, "thread": { "id": 140459008206656, "name": "MainThread" } }, "sent_events": 0, "skipped_events": 0 }
END RequestId: e5adae49-87f2-4b3d-a504-a67cc7aaa421
REPORT RequestId: e5adae49-87f2-4b3d-a504-a67cc7aaa421 Duration: 2370.59 ms Billed Duration: 2371 ms Memory Size: 128 MB Max Memory Used: 107 MB Init Duration: 697.21 ms
```

Note: The input ID in the error message is "input_id": "arn:aws:logs:%AWS_REGION%:627286350134:log-group:constanca-test-esf-log-group:*". We don't have access to the region name from the message, so I set it as %AWS_REGION%.
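To illustrate why the region is missing, here is a small sketch of decoding a CloudWatch Logs subscription event and building the ARN with the placeholder. It is not the code from this PR: both function names are hypothetical and the log stream name is invented. It relies only on the documented event shape, where `awslogs.data` is a base64-encoded, gzip-compressed JSON document carrying `owner` and `logGroup` but no region field.

```python
import base64
import gzip
import json


def decode_cloudwatch_logs_event(event: dict) -> dict:
    """Decode the base64 + gzip payload of a CloudWatch Logs subscription event."""
    compressed = base64.b64decode(event["awslogs"]["data"])
    return json.loads(gzip.decompress(compressed))


def log_group_arn_with_placeholder(payload: dict) -> str:
    """Build a log group ARN; the payload has no region, so a placeholder is used."""
    return (
        f"arn:aws:logs:%AWS_REGION%:{payload['owner']}"
        f":log-group:{payload['logGroup']}:*"
    )


# Usage: build a sample event the same way CloudWatch Logs encodes it.
sample_payload = {
    "messageType": "DATA_MESSAGE",
    "owner": "627286350134",
    "logGroup": "constanca-test-esf-log-group",
    "logStream": "example-stream",  # hypothetical stream name
    "logEvents": [],
}
encoded = base64.b64encode(gzip.compress(json.dumps(sample_payload).encode("utf-8")))
sample_event = {"awslogs": {"data": encoded.decode("ascii")}}

arn = log_group_arn_with_placeholder(decode_cloudwatch_logs_event(sample_event))
```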

Kinesis data stream trigger

config.yaml:

"inputs":
- "id": "arn:aws:kinesis:eu-west-2:627286350134:stream/wrong-kinesis"
  "outputs":
  - "args":
      "api_key": "..."
      "elasticsearch_url": "..."
      "es_datastream_name": "logs-esf.kinesis-default"
    "type": "elasticsearch"
  "type": "kinesis-data-stream"

And ESF has aws_lambda_event_source_mapping configured with the correct Kinesis data stream ARN.

ESF logs. To make it easier, I annotated the logs with <- in the relevant lines.

```log
INIT_START Runtime Version: python:3.9.v51 Runtime Version ARN: arn:aws:lambda:eu-west-2::runtime:415294d499803f2ba2e75d2753b4523c4517c1c13e55d8ca19a44434f6cbb9a5
{ "@timestamp": "2024-05-07T12:01:56.950Z", "log.level": "info", "message": "Found credentials in environment variables.", "ecs": { "version": "1.6.0" }, "log": { "logger": "botocore.credentials", "origin": { "file": { "line": 1147, "name": "credentials.py" }, "function": "load" }, "original": "Found credentials in environment variables." }, "process": { "name": "MainProcess", "pid": 8, "thread": { "id": 139842871994176, "name": "MainThread" } } }
START RequestId: a2de79e1-fe33-4b17-ac29-ae708b4bf9e0 Version: $LATEST
{ "@timestamp": "2024-05-07T12:01:57.117Z", "log.level": "info", "message": "trigger", "ecs": { "version": "1.6.0" }, "log": { "logger": "root", "origin": { "file": { "line": 56, "name": "handler.py" }, "function": "lambda_handler" }, "original": "trigger" }, "process": { "name": "MainProcess", "pid": 8, "thread": { "id": 139842871994176, "name": "MainThread" } }, "type": "kinesis-data-stream" }
{ "@timestamp": "2024-05-07T12:01:57.118Z", "log.level": "info", "message": "config file", "bucket_name": "constanca-test-esf-config-bucket", "ecs": { "version": "1.6.0" }, "log": { "logger": "root", "origin": { "file": { "line": 204, "name": "utils.py" }, "function": "config_yaml_from_s3" }, "original": "config file" }, "object_key": "config.yaml", "process": { "name": "MainProcess", "pid": 8, "thread": { "id": 139842871994176, "name": "MainThread" } } }
{ "@timestamp": "2024-05-07T12:01:57.591Z", "log.level": "info", "message": "trigger", "ecs": { "version": "1.6.0" }, "log": { "logger": "root", "origin": { "file": { "line": 237, "name": "handler.py" }, "function": "lambda_handler" }, "original": "trigger" }, "process": { "name": "MainProcess", "pid": 8, "thread": { "id": 139842871994176, "name": "MainThread" } }, "size": 1 }
{ "@timestamp": "2024-05-07T12:01:57.592Z", "log.level": "error", <----------------------- FIXED
  "message": "no input defined", "ecs": { "version": "1.6.0" }, "input_id": "arn:aws:kinesis:eu-west-2:627286350134:stream/constanca-test-esf-kinesis", "log": { "logger": "root", "origin": { "file": { "line": 243, "name": "handler.py" }, "function": "lambda_handler" }, "original": "no input defined" }, "process": { "name": "MainProcess", "pid": 8, "thread": { "id": 139842871994176, "name": "MainThread" } } }
{ "@timestamp": "2024-05-07T12:01:57.825Z", "log.level": "info", "message": "lambda is going to shutdown", "ecs": { "version": "1.6.0" }, "empty_events": 0, "error_events": 1, <----------------------- NEW
  "log": { "logger": "root", "origin": { "file": { "line": 256, "name": "handler.py" }, "function": "lambda_handler" }, "original": "lambda is going to shutdown" }, "process": { "name": "MainProcess", "pid": 8, "thread": { "id": 139842871994176, "name": "MainThread" } }, "sent_events": 0, "skipped_events": 0 }
END RequestId: a2de79e1-fe33-4b17-ac29-ae708b4bf9e0
REPORT RequestId: a2de79e1-fe33-4b17-ac29-ae708b4bf9e0 Duration: 734.63 ms Billed Duration: 735 ms Memory Size: 128 MB Max Memory Used: 89 MB Init Duration: 716.69 ms
```

Replay Queue

After all three failed triggers, the replay queue has Messages available: 3.
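The same count can also be checked programmatically instead of in the console. The sketch below is only an illustration and not part of this PR: `available_messages` is a hypothetical helper, the queue URL is a placeholder, and a stub client replaces a real `boto3.client("sqs")` so the example runs without AWS access. It assumes the console's "Messages available" figure corresponds to the `ApproximateNumberOfMessages` queue attribute.

```python
from typing import Any


def available_messages(sqs_client: Any, queue_url: str) -> int:
    """Return the approximate number of visible messages in an SQS queue."""
    response = sqs_client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=["ApproximateNumberOfMessages"],
    )
    # SQS returns attribute values as strings.
    return int(response["Attributes"]["ApproximateNumberOfMessages"])


class StubSqsClient:
    """Offline stand-in that mimics the response shape of get_queue_attributes."""

    def get_queue_attributes(self, QueueUrl: str, AttributeNames: list) -> dict:
        return {"Attributes": {"ApproximateNumberOfMessages": "3"}}


# Usage: with real credentials, pass boto3.client("sqs") and the replay queue URL.
count = available_messages(StubSqsClient(), "https://example.invalid/replay-queue")
```

The value is approximate by design, so a short delay after the last failed trigger may be needed before it settles.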