Closed by zmoog 1 week ago
I can probably add cache expiration to avoid a stale region list:
```python
import threading
import time
from functools import wraps
from typing import Any, Callable


def cache_for(seconds: int) -> Callable:
    """Caches the result of a function for a specified number of seconds."""

    def decorator(func: Callable) -> Callable:
        lock = threading.Lock()
        cache = {}
        hits = misses = 0

        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            nonlocal hits, misses
            with lock:
                key = str(args) + str(kwargs)
                current_time = time.time()
                if key in cache:
                    result, timestamp = cache[key]
                    if current_time - timestamp < seconds:
                        hits += 1
                        return result
                misses += 1
                result = func(*args, **kwargs)
                cache[key] = (result, current_time)
                return result

        def cache_stats() -> dict:
            """Returns the cache statistics.

            :return: A dictionary containing the cache statistics
            """
            with lock:
                return {'hits': hits, 'misses': misses}

        wrapper.cache_stats = cache_stats
        return wrapper

    return decorator
```
```python
# Example usage with AWS regions
from typing import Any

import boto3


def get_ec2_client():
    return boto3.client('ec2')


@cache_for(seconds=60)
def describe_regions(all_regions: bool = True) -> Any:
    """Fetches all regions from AWS and returns the response.

    :return: The response from the describe_regions method
    """
    return get_ec2_client().describe_regions(AllRegions=all_regions)


# Example usage to access cache statistics
if __name__ == "__main__":
    describe_regions(all_regions=False)
    print(describe_regions.cache_stats())  # {'hits': 0, 'misses': 1}
    describe_regions(all_regions=False)
    print(describe_regions.cache_stats())  # {'hits': 1, 'misses': 1}
    describe_regions(all_regions=False)
    print(describe_regions.cache_stats())  # {'hits': 2, 'misses': 1}
    describe_regions(all_regions=False)
    print(describe_regions.cache_stats())  # {'hits': 3, 'misses': 1}
    describe_regions(all_regions=False)
    print(describe_regions.cache_stats())  # {'hits': 4, 'misses': 1}
    describe_regions(all_regions=False)
    print(describe_regions.cache_stats())  # {'hits': 5, 'misses': 1}
    describe_regions(all_regions=False)
    print(describe_regions.cache_stats())  # {'hits': 6, 'misses': 1}
    describe_regions(all_regions=False)
    print(describe_regions.cache_stats())  # {'hits': 7, 'misses': 1}
```
I get the following output:
```
{'hits': 0, 'misses': 1}
{'hits': 1, 'misses': 1}
{'hits': 2, 'misses': 1}
{'hits': 3, 'misses': 1}
{'hits': 4, 'misses': 1}
{'hits': 5, 'misses': 1}
{'hits': 6, 'misses': 1}
{'hits': 7, 'misses': 1}
```
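One caveat worth noting (my observation, not part of the original snippet): building the cache key as `str(args) + str(kwargs)` means the same logical call can land in separate cache entries depending on how the arguments are passed.

```python
def make_key(*args, **kwargs):
    # mirrors the key construction used in the decorator above
    return str(args) + str(kwargs)


# For `def f(x): ...`, f(1) and f(x=1) are the same logical call,
# yet they produce different keys and therefore separate cache entries.
positional_key = make_key(1)       # "(1,){}"
keyword_key = make_key(x=1)        # "(){'x': 1}"
print(positional_key != keyword_key)  # True
```

For a function with a single boolean keyword like `describe_regions`, this only costs an extra cache entry, but it is something to keep in mind for functions with richer signatures.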
Or, we can use a third-party library like cachetools: https://cachetools.readthedocs.io/en/latest/
```python
from typing import Any

import boto3
from cachetools.func import ttl_cache


def get_ec2_client():
    return boto3.client('ec2')


@ttl_cache(ttl=1800)  # 30 minutes
def describe_regions(all_regions: bool = True) -> Any:
    """Fetches all regions from AWS and returns the response.

    :return: The response from the describe_regions method
    """
    print("Fetching regions from AWS...")
    return get_ec2_client().describe_regions(AllRegions=all_regions)


# Example usage to access cache statistics
if __name__ == "__main__":
    # print(dir(describe_regions))
    describe_regions(all_regions=False)
    print(describe_regions.cache_info())  # CacheInfo(hits=0, misses=1, maxsize=128, currsize=1)
    describe_regions(all_regions=False)
    print(describe_regions.cache_info())  # CacheInfo(hits=1, misses=1, maxsize=128, currsize=1)
    describe_regions(all_regions=False)
    print(describe_regions.cache_info())  # CacheInfo(hits=2, misses=1, maxsize=128, currsize=1)
    describe_regions(all_regions=False)
    print(describe_regions.cache_info())  # CacheInfo(hits=3, misses=1, maxsize=128, currsize=1)
    describe_regions(all_regions=False)
    print(describe_regions.cache_info())  # CacheInfo(hits=4, misses=1, maxsize=128, currsize=1)
    describe_regions(all_regions=False)
    print(describe_regions.cache_info())  # CacheInfo(hits=5, misses=1, maxsize=128, currsize=1)
    describe_regions(all_regions=False)
    print(describe_regions.cache_info())  # CacheInfo(hits=6, misses=1, maxsize=128, currsize=1)
    describe_regions(all_regions=False)
    print(describe_regions.cache_info())  # CacheInfo(hits=7, misses=1, maxsize=128, currsize=1)
```
Output:
```
Fetching regions from AWS...
CacheInfo(hits=0, misses=1, maxsize=128, currsize=1)
CacheInfo(hits=1, misses=1, maxsize=128, currsize=1)
CacheInfo(hits=2, misses=1, maxsize=128, currsize=1)
CacheInfo(hits=3, misses=1, maxsize=128, currsize=1)
CacheInfo(hits=4, misses=1, maxsize=128, currsize=1)
CacheInfo(hits=5, misses=1, maxsize=128, currsize=1)
CacheInfo(hits=6, misses=1, maxsize=128, currsize=1)
CacheInfo(hits=7, misses=1, maxsize=128, currsize=1)
```
I wanted to see exactly what is happening in our code and why this is an issue. I wrote this up while I was testing it:
The problem comes from this line:
We need `input_id` so we can know where to send the data (this happens inside `event_input`). This information was first available in the configuration provided by the user, stored in the ESF bucket, and was loaded by ESF at start-up when parsing `config.yaml`. Each `input_id` is unique and should map to an Input specified in the configuration file.
How do we get the `input_id` for each specific trigger? We have 4 possible triggers, and in each case the `input_id` has to be derived from `lambda_event` (this is the event that triggers ESF).
I wanted to know what is inside `lambda_event` when it comes from CloudWatch Logs, so I sent a message to a log stream to trigger it. This is the `lambda_event` that my ESF got:
```json
{
  "awslogs": {
    "data": "H4sIAAAAAAAA/42QPWvDMBRF/0p4swX6lqzNUDdTJ2droTjJqyuwJaOntJSQ/17c0L3LHS6cc+HeYEGiccLD94oQ4Kk7dG8v/TB0+x4ayF8JCwSw0klvleFCaWhgztO+5OsKAc45UR3TeWQXpHOJJ2QFp5gTMaR3VpHqAxhqwXGBAJQXZHOeGD2aBuh62tC1xpye41yxEITXf6mPv+7+E1PdmBvECwRQ3gijlNbKGa9VK6y2rd3Sc6tla6zx3nLluOReWNsaL7n0HhqocUGq47JCEE467p2SngvR/B0FAcaU6weW3ZynHW7TcD/efwDlkGd8SgEAAA=="
  }
}
```
We decode the `data` field, which looks like this:
```json
{
  "messageType": "DATA_MESSAGE",
  "owner": "627286350134",
  "logGroup": "constanca-describe-regions-esf-test",
  "logStream": "some-log-stream",
  "subscriptionFilters": [
    "constanca-describe-regions-esf-test"
  ],
  "logEvents": [
    {
      "id": "38515334437584391646961646806429565886037020816695820288",
      "timestamp": 1727087328011,
      "message": "another log event"
    }
  ]
}
```
In our `config.yaml` file we need to provide the `input_id` for ESF as the CloudWatch ARN, in this format: `arn:aws:logs:{region}:{account-id}:log-group:{log_group_name}:*` or as `arn:aws:logs:{region}:{account-id}:log-group:{log_group_name}:log-stream:{log-stream-name}` (see the official documentation). So what do we have in this `data` field that we can use?
- `region` - No
- `account-id` - Yes, field `owner`
- `log_group_name` - Yes, field `logGroup`
- `log-stream-name` - Yes, field `logStream`
So we are only missing the region to obtain the `input_id`, which we then use to find the output to send the data to.
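If the region were known, the ARN-format `input_id` could be assembled from the decoded fields directly; a hypothetical helper (the name and shape are mine, not ESF's, following the log-group ARN format quoted above):

```python
def build_log_group_arn(region: str, owner: str, log_group: str) -> str:
    # Hypothetical helper: assembles the log-group ARN format that
    # config.yaml expects as the input_id for the cloudwatch-logs input.
    return f"arn:aws:logs:{region}:{owner}:log-group:{log_group}:*"
```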
How do we get the `input_id` for the CloudWatch trigger then? Currently, we call the EC2:DescribeRegions API every time an event from a CloudWatch Logs group triggers ESF. From this result, and for every `RegionName`, we do:
From my understanding, the region needs to be the same. So is there any reason we would want to keep this API call @zmoog?
Thanks for the in-depth analysis.
I tested the CloudWatch Lambda trigger on the AWS console and ESF. As of today, it seems CloudWatch Lambda triggers can only work with log groups in the same region as the Lambda function. For example, if I deploy ESF on eu-west-1, I can only process log events from log groups on eu-west-1.
Given this limit, there is no reason to keep calling the EC2:DescribeRegions API on every event.
I plan to remove this API call from ESF.
Here's my two-step plan:

1. Use the `@lru_cache` decorator from the standard library to reduce the number of API calls from one per event to just one at start. This small-risk change would allow us to ship a patch release today.
2. Remove the EC2:DescribeRegions API call from ESF entirely in a follow-up.

WDYT?
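Since the log group must live in the same region as the function, a minimal sketch of what could eventually replace the API call (assuming the standard Lambda runtime environment, which sets `AWS_REGION`; this is my sketch, not the actual ESF change):

```python
import os


def current_region() -> str:
    # The Lambda runtime sets AWS_REGION in the environment, so the region
    # of the function (and therefore of the triggering log group) is
    # available without any EC2:DescribeRegions call.
    return os.environ["AWS_REGION"]
```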
I am fine with approving the PR as it is.
You need to change the version of ESF first (I believe you need to update the changelog and version.py). After that the release workflow will be triggered, but if we push this change just like this, then nothing will happen.
Thanks! On it.
I would say we should keep #723 open.
I agree!
I doubt we will have issues like this again with the cache, but let's see!
I'll work on removing the EC2:DescribeRegions API call later this week.
What does this PR do?
Caches the EC2:DescribeRegions API call response.
Why is it important?
On high-volume deployments, ESF can hit the EC2:DescribeRegions API requests limit, causing throttling errors like the following:
ESF needs the list of existing regions to parse incoming events from the `cloudwatch-logs` input. Since new AWS regions are not added frequently, fetching and caching the list of existing regions at function startup seems adequate. The list of existing AWS regions is available at https://aws.amazon.com/about-aws/global-infrastructure/regions_az/
Checklist
- CHANGELOG.md