DataDog / datadog-lambda-python

The Datadog AWS Lambda Layer for Python
https://docs.datadoghq.com/integrations/amazon_lambda/#installing-and-using-the-datadog-layer
Apache License 2.0
84 stars 45 forks source link

Lossiness submitting timestamped custom metrics #514

Open jodylent opened 3 hours ago

jodylent commented 3 hours ago

Expected Behavior

Repeated calls to lambda_metric should result in metric emission for the entire lifespand of an execution context

Actual Behavior

We are seeing lost metric points when submitting explicitly timestamped custom metrics via the Datadog Lambda Layer for Python.

This behavior is reproducible with a simple Lambda Function:

@datadog_lambda_wrapper
def main(event, context, *args, **kwargs):

    now = int(time.time())
    lambda_metric("jltest.foo", 1, timestamp=now)
    lambda_metric("jltest.foo", 1, timestamp=now)
    lambda_metric("jltest.foo", 1, timestamp=now)
    return now

Invoking the above Function a dozen times in succession, results in a dozen invocations to a single execution context, and thus a single instance of ThreadStatsWriter

The resulting metrics show several jltest.foo emissions, followed by several "lost" metrics, despite continued invocation metrics from the DD Extension.


DD Notebook Screenshot (collapsed) ![screenshot](https://github.com/user-attachments/assets/ec93242c-32a5-4887-8bbb-56900db953f9)

There is a corresponding notebook for the above screenshot, linked in a corresponding support ticket # 1857092 -- feel free to DM for additional details


Anecdotally, only 33 distribution points are emitted from the above code, though one would expect 3 * ${num invocations}, whether the Function is invoked a dozen times, twenty times, or even more.

Forcing the creation of a new execution context (e.g. by making a whitespace code change, or updating an env var) results in a NEW instance of ThreadStatsWriter, which emits the same number of metric points, before it too begins "flushing" unsent metrics.

Specifications

Stacktrace

N/A -- DD logs seem "normal"

Detail / Steps to Reproduce the Problem

The Function used to reproduce the problem has the following handler and DD-provided Layers:

import time

from datadog_lambda.metric import lambda_metric
from datadog_lambda.wrapper import datadog_lambda_wrapper

@datadog_lambda_wrapper
def main(event, context, *args, **kwargs):
    """
    # Layer provides DD Python Libs
    arn:aws:lambda:us-east-1:464622532012:layer:Datadog-Python310:98

    # Extension provides serverless agent on UDP 8125
    arn:aws:lambda:us-east-1:464622532012:layer:Datadog-Extension:65
    """

    now = int(time.time())
    lambda_metric("jltest.foo", 1, timestamp=now)
    lambda_metric("jltest.foo", 1, timestamp=now)
    lambda_metric("jltest.foo", 1, timestamp=now)
    return now

Code Paths

The DD Layer's lambda_metric function normally emits metrics to a StatsDWriter (source) using the serverless extension.

This works as expected, when those metrics are not timestamped.

Providing an explicit timestamp (in epoch seconds) causes the DD Layer to execute this code path (source) introduced in #480

def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=False):
    ...

    if should_use_extension and timestamp is not None:
        # The extension does not support timestamps for distributions so we create a
        # a thread stats writer to submit metrics with timestamps to the API
        ...
        global extension_thread_stats
        if extension_thread_stats is None:
            ...
            extension_thread_stats = ThreadStatsWriter(flush_in_thread)  # flush_in_thread = False

        extension_thread_stats.distribution(
            metric_name, value, tags=tags, timestamp=timestamp
        )
        return

Metric points submitted to this extension_thread_stats instance of ThreadStatsWriter work fine for the first ~ dozen Function invocations, after which they simply fail to appear in the Metrics API.

Full call path - emission (collapsed) ```py # ==== CALL CHAIN: emitting a Distribution (not flushing) ============================ # # My Function lambda_metric(..., timestamp=) # https://github.com/DataDog/datadog-lambda-python/blob/master/datadog_lambda/metric.py#L62 if should_use_extension and timestamp is not None: # The extension does not support timestamps for distributions so we create a # a thread stats writer to submit metrics with timestamps to the API ... global extension_thread_stats if extension_thread_stats is None: ... extension_thread_stats = ThreadStatsWriter(flush_in_thread) # False extension_thread_stats.distribution( metric_name, value, tags=tags, timestamp=timestamp ) return # https://github.com/DataDog/datadog-lambda-python/blob/master/datadog_lambda/thread_stats_writer.py#L21 class ThreadStatsWriter(StatsWriter): def distribution(self, metric_name, value, tags=[], timestamp=None): self.thread_stats.distribution( metric_name, value, tags=tags, timestamp=timestamp ) # https://github.com/DataDog/datadogpy/blob/master/datadog/threadstats/base.py#L284 class ThreadStats(object): def distribution(self, metric_name, value, timestamp=None, tags=None, sample_rate=1, host=None): # L145: self._metric_aggregator = MetricAggregator(self.roll_up_interval) # roll up = 10 if not self._disabled: self._metric_aggregator.add_point( metric_name, tags, timestamp or time(), value, Distribution, sample_rate=sample_rate, host=host ) # https://github.com/DataDog/datadogpy/blob/master/datadog/threadstats/metrics.py#L183 class MetricsAggregator(object): def __init__(self, roll_up_interval=10): # this default will be used in our scenario self._lock = threading.RLock() self._metrics = defaultdict(lambda: {}) self._roll_up_interval = roll_up_interval def add_point(self, metric, tags, timestamp, value, metric_class, sample_rate=1, host=None): # The sample rate is currently ignored for in process stuff interval = timestamp - timestamp % self._roll_up_interval key = (metric, host, tuple(sorted(tags)) if tags else None) with self._lock: if key not in self._metrics[interval]: self._metrics[interval][key] = metric_class(metric, tags, host) self._metrics[interval][key].add_point(value) # https://github.com/DataDog/datadogpy/blob/master/datadog/threadstats/metrics.py#L97 class Distribution(Metric): """ A distribution metric. """ stats_tag = "d" def __init__(self, name, tags, host): self.name = name self.tags = tags self.host = host self.value = [] def add_point(self, value): self.value.append(value) ```
Full call path - flushing (collapsed) ```py # ==== CALL CHAIN: flushing a Distribution (not emitting) ============================ # # My Function @datadog_lambda_wrapper def main(event, context, *args, **kwargs): ... # https://github.com/DataDog/datadog-lambda-python/blob/master/datadog_lambda/wrapper.py#L236 class _LambdaDecorator(object): def __call__(self, event, context, **kwargs): """Executes when the wrapped function gets called""" self._before(event, context) try: self.response = self.func(event, context, **kwargs) return self.response except Exception: submit_errors_metric(context) if self.span: self.span.set_traceback() raise finally: self._after(event, context) <<<<<< HERE def _after(self, event, context): try: ... if not self.flush_to_log or should_use_extension: flush_stats(context) ... datadog_lambda_wrapper = _LambdaDecorator # https://github.com/DataDog/datadog-lambda-python/blob/master/datadog_lambda/metric.py#L122 # TOF lambda_stats = None extension_thread_stats = None flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true" if should_use_extension: lambda_stats = StatsDWriter() else: lambda_stats = ThreadStatsWriter(flush_in_thread) # extension_thread_stats will be a NEW instance of extension_thread_stats = ThreadStatsWriter(flush_in_thread) # after entering the `if should_use_extension and timestamp is not None` conditional in `lambda_metric` ... def flush_stats(lambda_context=None): lambda_stats.flush() # ThreadStatsWriter.flush() if extension_thread_stats is not None: extension_thread_stats.flush(tags) # ThreadStatsWriter.flush() # https://github.com/DataDog/datadog-lambda-python/blob/master/datadog_lambda/thread_stats_writer.py#L25 class ThreadStatsWriter(StatsWriter): def flush(self, tags=None): _, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf")) # We DO see this line in the logs when DD_LOG_LEVEL == DEBUG, even for "unsent" MetricPoints self.thread_stats.flush_count += 1 logger.debug("Flush #%s sending %s distributions", self.thread_stats.flush_count, count_dists) try: self.thread_stats.reporter.flush_distributions(dists) except Exception as e: ... # https://github.com/DataDog/datadogpy/blob/master/datadog/threadstats/base.py#L151 class ThreadStats(object): # The point here is that self.thread_stats.reporter == HttpReporter(compress_payload=self.compress_payload) def start(flush_in_thread=True, flush_in_greenlet=False): # The reporter is responsible for sending metrics off to their final destination. # It's abstracted to support easy unit testing and in the near future, forwarding # to the datadog agent. self.reporter = HttpReporter(compress_payload=self.compress_payload) self.flush_count = 0 # We never see this line, and we DO see log lines about flushing if self._disabled: log.info("ThreadStats instance is disabled. No metrics will flush.") else: # THIS may be our next place to dig, if we're creating one on the fly # flush_in_greenlet defaults to false # flush_in_thread is explicitly false, back up the chain if flush_in_greenlet: self._start_flush_greenlet() elif flush_in_thread: self._start_flush_thread() # Flush all remaining metrics on exit atexit.register(lambda: self.flush(float("inf"))) # https://github.com/DataDog/datadogpy/blob/master/datadog/threadstats/reporters.py#L17 class HttpReporter(Reporter): def __init__(self, compress_payload=False): self.compress_payload = compress_payload def flush_distributions(self, distributions): api.Distribution.send(distributions, compress_payload=self.compress_payload) # We DO see the log lines for this INFO [2024-09-17 18:27:35.977] [17, 140623641798464] datadog.api 202 POST https://api.datadoghq.com/api/v1/distribution_points (156.9059ms) INFO [2024-09-17 18:27:41.338] [17, 140623641798464] datadog.api 202 POST https://api.datadoghq.com/api/v1/distribution_points (38.5311ms) ... # Even for metric points which DON'T end up in DD ```

Logging

After enabling debug logging inside both the Extension and the Layer (by setting env var DD_LOG_LEVEL="DEBUG")

DylanLovesCoffee commented 3 hours ago

hey @jodylent, really appreciate the detail in your investigation 🙇 We'll take a look into this and follow up with more information