vimeo / graphite-influxdb

An influxdb backend for Graphite-web and graphite-api
Apache License 2.0

replace fix_datapoints with `fill(null)` in influx query #14

Closed by Dieterbe 10 years ago

Dieterbe commented 10 years ago

once https://github.com/influxdb/influxdb/issues/426 is fixed, we can replace all the fix_datapoints logic and just query like

select time, mean(value) from foo group by time(60s) fill(null) where time > 1406230368s and time < 1406231207s

this will simplify the code and should be faster too.
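For illustration, a minimal sketch of what the reader's fetch could collapse to once fill(null) works, using the 0.8-era influxdb-python client. This is not the actual patch; the client setup, database name, and series path are assumptions:

```python
# Sketch only: assumes the InfluxDB 0.8-era python client, whose query()
# returns a list of dicts shaped like
# [{'name': 'foo', 'columns': ['time', 'mean'], 'points': [[ts, val], ...]}]
from influxdb import InfluxDBClient

client = InfluxDBClient('localhost', 8086, 'root', 'root', 'graphite')  # assumed setup

def fetch(path, start_time, end_time, step):
    # influx quantizes points into step-sized buckets and emits null for
    # empty ones, so no client-side gap filling (fix_datapoints) is needed
    q = ('select time, mean(value) from "%s" group by time(%ds) fill(null) '
         'where time > %ds and time < %ds order asc'
         % (path, step, start_time, end_time + 1))
    data = client.query(q)
    points = data[0]['points'] if data else []
    return [p[1] for p in points]  # columns are [time, mean]
```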

Dieterbe commented 10 years ago

unfortunately, this makes things slower.

in the patch below, we get rid of fix_datapoints and fix_datapoints_multi and instead use "group by time(60s) fill(null)". the difference in timings is further down.

diff --git a/graphite_influxdb.py b/graphite_influxdb.py
index a71da7d..b4ccad0 100644
--- a/graphite_influxdb.py
+++ b/graphite_influxdb.py
@@ -94,96 +94,21 @@ class InfluxdbReader(object):
         # influx doesn't support <= and >= yet, hence the add.
         logger.debug(caller="fetch()", start_time=start_time, end_time=end_time, step=self.step, debug_key=self.path)
         with statsd.timer('service=graphite-api.ext_service=influxdb.target_type=gauge.unit=ms.what=query_individual_duration'):
-            data = self.client.query('select time, value from "%s" where time > %ds '
-                                     'and time < %ds order asc' % (
-                                         self.path, start_time, end_time + 1))
+            data = self.client.query('select time, mean(value) from "%s" group by time(%ds) fill(null) '
+                                     'where time > %ds and time < %ds order asc' % (
+                                         self.path, self.step, start_time, end_time + 1))

         logger.debug(caller="fetch()", returned_data=data, debug_key=self.path)

         try:
-            known_points = data[0]['points']
+            datapoints = data[0]['points']
         except Exception:
             logger.debug(caller="fetch()", msg="COULDN'T READ POINTS. SETTING TO EMPTY LIST", debug_key=self.path)
-            known_points = []
-        logger.debug(caller="fetch()", msg="invoking fix_datapoints()", debug_key=self.path)
-        datapoints = InfluxdbReader.fix_datapoints(known_points, start_time, end_time, self.step, self.path)
+            datapoints = []

         time_info = start_time, end_time, self.step
         return time_info, datapoints

-    @staticmethod
-    def fix_datapoints_multi(data, start_time, end_time, step):
-        out = {}
-        """
-        data looks like:
-        [{u'columns': [u'time', u'sequence_number', u'value'],
-          u'name': u'stats.timers.dfvimeoplayproxy3.varnish.miss.410.count_ps',
-            u'points': [[1402928319, 1, 0.133333],
-            ....
-        """
-        for seriesdata in data:
-            logger.debug(caller="fix_datapoints_multi", msg="invoking fix_datapoints()", debug_key=seriesdata['name'])
-            datapoints = InfluxdbReader.fix_datapoints(seriesdata['points'], start_time, end_time, step, seriesdata['name'])
-            out[seriesdata['name']] = datapoints
-        return out
-
-    @staticmethod
-    def fix_datapoints(known_points, start_time, end_time, step, debug_key):
-        """
-        points is a list of known points (potentially empty)
-        """
-        logger.debug(caller='fix_datapoints', len_known_points=len(known_points), debug_key=debug_key)
-        if len(known_points) == 1:
-            logger.debug(caller='fix_datapoints', only_known_point=known_points[0], debug_key=debug_key)
-        elif len(known_points) > 1:
-            logger.debug(caller='fix_datapoints', first_known_point=known_points[0], debug_key=debug_key)
-            logger.debug(caller='fix_datapoints', last_known_point=known_points[-1], debug_key=debug_key)
-
-        datapoints = []
-        steps = int(round((end_time - start_time) * 1.0 / step))
-        # if we have 3 datapoints: at 0, at 60 and 120, then step is 60, steps = 2 and should have 3 points
-        # note that graphite assumes data at quantized intervals, whereas in influx they can be stored at like 07, 67, etc.
-        ratio = len(known_points) * 1.0 / (steps + 1)
-        statsd.timer('service=graphite-api.target_type=gauge.unit=none.what=known_points/needed_points', ratio)
-
-        if len(known_points) == steps + 1:
-            logger.debug(action="No steps missing", debug_key=debug_key)
-            datapoints = [p[2] for p in known_points]
-        else:
-            amount = steps + 1 - len(known_points)
-            logger.debug(action="Fill missing steps with None values", amount=amount, debug_key=debug_key)
-            next_point = 0
-            for s in range(0, steps + 1):
-                # if we have no more known points, fill with None's
-                # even ininitially when next_point = 0, len(known_points) might be == 0
-                if next_point >= len(known_points):
-                    datapoints.append(None)
-                    continue
-
-                # if points are not evenly spaced. i.e. they should be a minute apart but sometimes they are 55 or 65 seconds,
-                # and if they are all about step/2 away from the target timestamps, then sometimes a target point has 2 candidates, and
-                # sometimes 0. So a point might be more than step/2 older.  in that case, since points are sorted, we can just forward the pointer
-                # influxdb's fill(null) will make this cleaner and stop us from having to worry about this.
-
-                should_be_near = start_time + step * s
-                diff = known_points[next_point][0] - should_be_near
-                while next_point + 1 < len(known_points) and diff < (step / 2) * -1:
-                    next_point += 1
-                    diff = known_points[next_point][0] - should_be_near
-
-                # use this point if it's within step/2 from our target
-                if abs(diff) <= step / 2:
-                    datapoints.append(known_points[next_point][2])
-                    next_point += 1  # note: might go out of bounds, which we use as signal
-
-                else:
-                    datapoints.append(None)
-
-        logger.debug(caller='fix_datapoints', len_known_points=len(known_points), len_datapoints=len(datapoints), debug_key=debug_key)
-        logger.debug(caller='fix_datapoints', first_returned_point=datapoints[0], debug_key=debug_key)
-        logger.debug(caller='fix_datapoints', last_returned_point=datapoints[-1], debug_key=debug_key)
-        return datapoints
-
     def get_intervals(self):
             now = int(time.time())
             return IntervalSet([Interval(1, now)])
@@ -263,7 +188,7 @@ class InfluxdbFinder(object):
             for name in series:
                 if regex.match(name) is not None:
                     logger.debug("found leaf", name=name)
-                    res = 10
+                    res = 60
                     for (rule_patt, rule_res) in self.schemas:
                         if rule_patt.match(name):
                             res = rule_res
@@ -307,21 +232,19 @@ class InfluxdbFinder(object):
     def fetch_multi(self, nodes, start_time, end_time):
         series = ', '.join(['"%s"' % node.path for node in nodes])
         step = 60  # TODO: this is not ideal in all cases. for one thing, don't hardcode, for another.. how to deal with multiple steps?
-        query = 'select time, value from %s where time > %ds and time < %ds order asc' % (
-                series, start_time, end_time + 1)
+        query = 'select time, mean(value) from %s group by time (%ds) fill(null) where time > %ds and time < %ds order asc' % (
+                series, step, start_time, end_time + 1)
         logger.debug(caller='fetch_multi', query=query)
         logger.debug(caller='fetch_multi', start_time=print_time(start_time), end_time=print_time(end_time), step=step)
         with statsd.timer('service=graphite-api.ext_service=influxdb.target_type=gauge.unit=ms.action=select_datapoints'):
             data = self.client.query(query)
         logger.debug(caller='fetch_multi', returned_data=data)
+        datapoints = {}
+        for seriesdata in data:
+            datapoints[seriesdata['name']] = [p[1] for p in seriesdata['points']]
         if not len(data):
-            data = [{'name': node.path, 'points': []} for node in nodes]
-            logger.debug(caller='fetch_multi', FIXING_DATA_TO=data)
-        logger.debug(caller='fetch_multi', len_datapoints_before_fixing=len(data))
-
-        with statsd.timer('service=graphite-api.action=fix_datapoints_multi.target_type=gauge.unit=ms'):
-            logger.debug(caller='fetch_multi', action='invoking fix_datapoints_multi()')
-            datapoints = InfluxdbReader.fix_datapoints_multi(data, start_time, end_time, step)
+            for node in nodes:
+                datapoints[node.path] = []

         time_info = start_time, end_time, step
         return time_info, datapoints
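To make clear what fill(null) is replacing: the deleted fix_datapoints walks the sorted, irregularly spaced points and snaps each one to the nearest quantized step boundary, emitting None for any boundary with no point within step/2. A simplified standalone sketch of that idea (the function name and tuple shape are mine, not the repo's):

```python
# Simplified sketch of the client-side gap filling that fill(null) replaces.
def fill_gaps(points, start_time, end_time, step):
    """points: sorted list of (timestamp, value) pairs, possibly empty."""
    out = []
    i = 0
    for target in range(start_time, end_time + 1, step):
        # skip points that are more than step/2 older than this boundary
        while i < len(points) and points[i][0] < target - step / 2:
            i += 1
        # use the candidate point if it is within step/2 of the boundary
        if i < len(points) and abs(points[i][0] - target) <= step / 2:
            out.append(points[i][1])
            i += 1
        else:
            out.append(None)
    return out
```

For example, fill_gaps([(0, 1.0), (67, 2.0)], 0, 120, 60) yields [1.0, 2.0, None]: the point at 67 is within 30s of the 60s boundary, and nothing lands near 120.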

the change was deployed around 6:14. this is for requesting a bunch of graphs that go 6 hours back with minutely resolution. notably, the worst-case select_datapoints (i.e. the query against influx) takes a hit of several seconds, whereas the worst-case fix_datapoints was still <250ms. the 2nd graph is a zoom-in of the first one.

(attached graphs: timings, timings-zoomed)

also, for one graph (note to self: Requests on health page) the data was incorrect (rendered as a square with a peak @ 1).

Dieterbe commented 10 years ago

maybe @pauldix and friends can make "group by time(%ds) fill(null)" faster at some point, but for now we'd better stick with the python version :(