graphite-project / carbon

Carbon is one of the components of Graphite, and is responsible for receiving metrics over the network and writing them down to disk using a storage backend.
http://graphite.readthedocs.org/
Apache License 2.0

Create new metrics earlier #888

Open piotr1212 opened 4 years ago

piotr1212 commented 4 years ago

In large Graphite installations the queues can get really long; it can take an hour for Graphite to write all metrics in the queue. New db files are only created when the metric is written, which can therefore take far too long. This change separates the creation of metrics from writing data to them and moves the creation to an earlier moment.

Whenever a new metric is received, its name is pushed to a new_metrics list. The first step in the writer loop is to check whether any new metrics have been received and to create them if they don't exist on disk yet. After the creation the writer continues as usual with writing metrics from the queue, but it no longer checks whether the file already exists, so the check doesn't happen twice and cost extra IO. If the file does not exist at that point, it is logged.
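
Roughly, the cache side of that looks like the sketch below (illustrative only, not the actual patch; `new_metrics` matches the attribute name used in the diff further down this thread, everything else is a simplified stand-in). The writer then pops names off that list at the top of each pass and creates any files that don't exist yet.

```python
# Toy sketch of the cache-side bookkeeping described above; MetricCacheSketch
# and store() are simplified stand-ins, not carbon's actual _MetricCache.
class MetricCacheSketch(dict):
    def __init__(self):
        super().__init__()
        self.new_metrics = []                 # names pending early creation

    def store(self, metric, timestamp, value):
        if metric not in self:
            self.new_metrics.append(metric)   # first time we see this name
        self.setdefault(metric, {})[timestamp] = value
```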

Fixes: https://github.com/graphite-project/graphite-web/issues/629

piotr1212 commented 4 years ago

Not sure if I missed something, but I think this would fix the issues in https://github.com/graphite-project/graphite-web/issues/629 while being a much simpler solution than everything discussed there.

I see I broke Python 2, not sure if we still care?

ploxiln commented 4 years ago

Some more thoughts, hopefully helpful:

Currently (without this pull-request), state.database.exists(metric) is called for each metric as it is removed from the cache by cache.drain_metric(). When more datapoints for the metric are reported, it will be added to the cache again, and the exists() check will run again when it is removed.

If you instead called state.database.exists(metric) for each metric when it is first added to the cache, that would result in the same overall number of exists() checks. Each metric is added, remains in cache for some time, then is removed (and written). Adds and removes are symmetric, in total count. But they would be distributed in time differently (may or may not be more bursty).

Regarding the file-create-rate-limit: with the status-quo (check-on-remove), when the limit is hit, all cached data for the affected metric is lost. With the alternative (create-on-add), you can skip adding the new metric if the file-create-rate-limit is hit, and only lose the last bit of data submitted, and cause it to check again when the next bit of data is submitted. This loses as little data as possible, while honoring the file-create-rate-limit, but doing more exists() checks (maybe a lot more if the limit is hit a lot). I think exists() checks should be relatively cheap.
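
A toy sketch of that create-on-add trade-off (hypothetical stand-in class, not carbon's `_MetricCache`; the `database` and `create_bucket` arguments play the roles of `state.database` and `CREATE_BUCKET`):

```python
class CreateOnAddCache(dict):
    """Sketch of check/create at add time: a rate-limited create drops only
    the current datapoint, and the next datapoint for the same metric will
    re-run the exists() check and retry the create."""

    def __init__(self, database, create_bucket=None):
        super().__init__()
        self.database = database            # stand-in for state.database
        self.create_bucket = create_bucket  # stand-in for CREATE_BUCKET

    def store(self, metric, timestamp, value):
        if metric not in self and not self.database.exists(metric):
            if self.create_bucket and not self.create_bucket.drain(1):
                return False                # drop one datapoint, nothing else
            self.database.create(metric)
        self.setdefault(metric, {})[timestamp] = value
        return True
```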

Finally, if the whisper file is deleted while the metric is already in the cache (by the user or some separate cleanup job), then in the create-on-add design, you lose all the cached data when it later fails to write it out. This seems like a very unlikely corner case.

So I think it could be a good idea to move the exists() check and create attempt, to just when the metric is being added to the cache. Or, instead of just one exists()/create before each drain_metric(), do all available new_metrics in a loop, before moving on to drain_metric(), and if new-file-create fails, remove that metric from the cache so it doesn't collect more before being lost.
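
A sketch of that second variant (hypothetical helper, not code from this PR; it reuses the `new_metrics` / `exists()` / `create()` names from this discussion and assumes the same non-blocking `drain(1)` behaviour as the existing CREATE_BUCKET):

```python
def create_pending_metrics(cache, database, create_bucket=None, log=print):
    """Create files for all newly seen metrics before draining any data.
    If a create is rate-limited, evict that metric from the cache so it
    doesn't keep collecting datapoints that would later be thrown away."""
    while cache.new_metrics:
        metric = cache.new_metrics.pop()
        if database.exists(metric):
            continue
        if create_bucket and not create_bucket.drain(1):
            cache.pop(metric, None)   # stop caching data we can't persist yet
            log("create rate limit hit, dropped cached data for %s" % metric)
            continue
        database.create(metric)
```

The writer would call something like this once at the top of each pass, before the drain_metric() loop.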

piotr1212 commented 4 years ago

Yes, basically what I was thinking. Already have a patch to loop through all new_metrics before moving on to drain_metric().

piotr1212 commented 4 years ago

last commit needs testing. I'm quite busy at the moment, don't know when I will get around to it.

lgtm-com[bot] commented 4 years ago

This pull request introduces 1 alert when merging b9dbd1b461d8e0e09bde847e7f24d7a31e01dd16 into 42c124251e7036bc01ab307c0ac1fb2016a092ab - view on LGTM.com

ploxiln commented 4 years ago

I have a more minimal version of this idea, with a couple of tweaks, possibly useful:

--- a/lib/carbon/cache.py
+++ b/lib/carbon/cache.py
@@ -189,6 +189,7 @@ class _MetricCache(defaultdict):
   def __init__(self, strategy=None):
     self.lock = threading.Lock()
     self.size = 0
+    self.new_metrics = []
     self.strategy = None
     if strategy:
       self.strategy = strategy(self)
@@ -253,6 +254,8 @@ class _MetricCache(defaultdict):
           log.msg("MetricCache is full: self.size=%d" % self.size)
           events.cacheFull()
         else:
+          if not self[metric]:
+            self.new_metrics.append(metric)
           self.size += 1
           self[metric][timestamp] = value
           if self.strategy:
diff --git a/lib/carbon/writer.py b/lib/carbon/writer.py
index 7b63cba..f46421b 100644
--- a/lib/carbon/writer.py
+++ b/lib/carbon/writer.py
@@ -95,24 +95,16 @@ def writeCachedDataPoints():

   cache = MetricCache()
   while cache:
-    (metric, datapoints) = cache.drain_metric()
-    if metric is None:
-      # end the loop
-      break
-
-    dbFileExists = state.database.exists(metric)
-
-    if not dbFileExists:
-      if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
-        # If our tokenbucket doesn't have enough tokens available to create a new metric
-        # file then we'll just drop the metric on the ground and move on to the next
-        # metric.
-        # XXX This behavior should probably be configurable to no tdrop metrics
-        # when rate limitng unless our cache is too big or some other legit
-        # reason.
-        instrumentation.increment('droppedCreates')
+    # first create new metrics files so graphite-web knows they exist
+    while cache.new_metrics:
+      metric = cache.new_metrics.pop()
+      if state.database.exists(metric):
         continue

+      if CREATE_BUCKET:
+        CREATE_BUCKET.drain(1, blocking=True)
+        # log waitTime ?
+
       archiveConfig = None
       xFilesFactor, aggregationMethod = None, None

@@ -149,6 +141,12 @@ def writeCachedDataPoints():
         instrumentation.increment('errors')
         continue

+    # now drain and persist some data
+    (metric, datapoints) = cache.drain_metric()
+    if metric is None:
+      # end the loop
+      break
+
     # If we've got a rate limit configured lets makes sure we enforce it
     waitTime = 0
     if UPDATE_BUCKET:

piotr1212 commented 4 years ago

Yeah, basically that, but don't block on the create_bucket limiter; just continue with writing datapoints. We don't want to hang the whole writer.
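
For illustration, a non-blocking version of the create loop from the diff above might look like this (a sketch under the assumption, consistent with the removed code, that `drain(1)` returns falsy when the bucket has no tokens; the helper name and signature are made up):

```python
def create_new_metric_files(cache, database, create_bucket=None):
    """Non-blocking variant: create files for newly seen metrics, but if the
    create token bucket is empty, defer the rest to the next writer pass
    instead of blocking the writer thread."""
    while cache.new_metrics:
        metric = cache.new_metrics.pop()
        if database.exists(metric):
            continue
        if create_bucket and not create_bucket.drain(1):
            cache.new_metrics.append(metric)   # retry on the next pass
            break                              # go write datapoints instead
        database.create(metric)
```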

stale[bot] commented 4 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.