influxdata / influxdb

Scalable datastore for metrics, events, and real-time analytics
https://influxdata.com
Apache License 2.0
28.82k stars 3.55k forks source link

Automatic downsampling and querying (rrd-like) #23108

Open christianTF opened 2 years ago

christianTF commented 2 years ago

The downsampling and retention in Influx is very administration intensive without generic functions. E.g., it is hard or impossible to do a post-downsampling of data of several years to 30 minute averages, because there are only downsampling mechanisms that run from 'now' minus x minutes.

The second issue is, that downsampling creates new measurement names and the queries need to explicitely specify the destination database to get high-accurate or downsampled data. There is no auto-select.

Coming from rrdtool, I'd like to request the following features:

This is in summary what Tobias Oetiker's rrdtool is doing, that would easify working with retention also in Influx.

lesam commented 2 years ago

Flux is getting pretty close to being able to express this:

downsample all data in all measurements of a database without need of explicitely name the measurements.

You can do this in flux, the from |> range |> filter query doesn't need to filter by measurement

Measurement names in the downsampled databases are equal to the original database with "mean" values.

You can name measurements in a new bucket in flux based on their name in an existing bucket. One difficulty is that it doesn't make sense to e.g. compute the mean of a string - the most recent flux includes a type check that should help to implement this kind of generic downsampling: https://docs.influxdata.com/flux/v0.x/stdlib/types/istype/

Downsampling can take place also afterwards. When data of 2 years are imported, and retention policy says 30min average, this also works for the 2 years of data, not only for the last 30 minutes

This gets a bit harder, but you can combine flux tasks running against new data with a batch job to downsample all the old data.

For the rest of your points, deciding to optimise queries automatically based on known downsampled version of the same data would indeed be great - though if you know you have downsampled data you can run hand-optimized queries today.

jagriffiths commented 2 years ago

Ive been looking at this recently and I am having similar issues with "historic 5 years of data" as of yet im unsure how to process this using batch queries

In terms of downsampling current "Live" data as it comes in I currently use a query similar to below to identify Numeric/Bool and String Data.

Aggregations are then applied to the Numeric data so I get a min/mean/mac/and count of values that have been considered in the window (writing out as float as there are issues if on initially gets written as an int and then later calculations try to write a float)

Bools and strings are dealt with differently.

This does have the downside that currently you need to specify in the filter ALL fields that are not numeric

This should be able to be changed very soon as the new istype function is already introduced in InfluxDB Cloud which allows you to detect the type prior to processing without directly specifying fields/measurments

According to https://community.influxdata.com/t/downsampling-task-fail-with-error-unsupported-input-type-for-mean-aggregate/23632/3 this is going to be released shortly into the OSS version (which I use) - fingers crossed


`import "experimental"

option task = {name: "01. downsample_all_1m", every: 1m0s}

fromBucket = "raw"
toBucket = "downsampled"

all_data = from(bucket: fromBucket)
    |> range(start: -task.every)
    |> filter(
        fn: (r) => r._measurement =~ /^cpu|disk|diskio|mem|processes|swap|system|internal_.+|net|netstat|procstat|procstat_lookup|smart_.+$/,
    )
numeric_data = all_data
    |> filter(
        fn: (r) => r._field !~ /^uptime_format|health_ok$/,
    )
bool_data = all_data
    |> filter(
        fn: (r) => r._field =~ /^health_ok$/,
    )

numeric_data
    |> aggregateWindow(every: task.every, fn: mean)
    |> set(key: "aggregate", value: "mean")
    |> set(key: "rollup_interval", value: string(v: task.every))
    |> map( fn: (r) => ({r with _value: float(v: r._value)}),  )
    |> to(bucket: toBucket)
numeric_data
    |> aggregateWindow(every: task.every, fn: min)
    |> set(key: "aggregate", value: "min")
    |> set(key: "rollup_interval", value: string(v: task.every))
    |> map( fn: (r) => ({r with _value: float(v: r._value)}),  )
    |> to(bucket: toBucket)
numeric_data
    |> aggregateWindow(every: task.every, fn: max)
    |> set(key: "aggregate", value: "max")
    |> set(key: "rollup_interval", value: string(v: task.every))
    |> map( fn: (r) => ({r with _value: float(v: r._value)}),  )
    |> to(bucket: toBucket)
numeric_data
    |> aggregateWindow(every: task.every, fn: sum)
    |> set(key: "aggregate", value: "sum")
    |> set(key: "rollup_interval", value: string(v: task.every))
    |> map( fn: (r) => ({r with _value: float(v: r._value)}),  )
    |> to(bucket: toBucket)
all_bools = bool_data
    |> aggregateWindow(every: task.every, fn: count)
true_bools = bool_data
    |> filter(
        fn: (r) => r._value == true,
    )
    |> aggregateWindow(every: task.every, fn: count)

experimental.join(
    left: all_bools,
    right: true_bools,
    fn: (left, right) => ({left with _value: if left._value == 0 then 0.0 else float(v: right._value) / float(v: left._value)}),
)
    |> set(key: "aggregate", value: "ratio")
    |> set(key: "rollup_interval", value: string(v: task.every))
    |> map( fn: (r) => ({r with _value: float(v: r._value)}),  )
    |> to(bucket: toBucket)

all_data
    |> aggregateWindow(every: task.every, fn: count)
    |> map(
        fn: (r) => ({r with _value: float(v: r._value)}),
    )
    |> set(key: "aggregate", value: "count")
    |> set(key: "rollup_interval", value: string(v: task.every))
    |> to(bucket: toBucket)
all_data
    |> aggregateWindow(every: task.every, fn: last)
    |> set(key: "aggregate", value: "last")
    |> set(key: "rollup_interval", value: string(v: task.every))
    |> to(bucket: toBucket)`

If any of thi could be Automatic and configurable from the gui that would be such a winner for me

jagriffiths commented 2 years ago

The thing Im struggling with is the amount of data being ingested/processed at any one time, I have downsample tasks similar to above which work fine for a small amount of hosts but if I scale this to thousands of hosts all of a sudden my influx cannot handle the amount of data and I get timeouts/crashes.

If I could somehow automatically run the query on each host individually through flux that would be a help but i'm currently unaware how this could be or if it could be done) - and even maybe upping this to loops for every host for every measurement i.e. cpu/mem so that there would be many small downsample tasks retrieving and processing very little data

ybinnenwegin2ip commented 2 years ago

The thing Im struggling with is the amount of data being ingested/processed at any one time, I have downsample tasks similar to above which work fine for a small amount of hosts but if I scale this to thousands of hosts all of a sudden my influx cannot handle the amount of data and I get timeouts/crashes.

I ran into the exact same issue: I have a years worth of data that causes the database server to OOM kill influxdb if I try to downsample it all at once, and I couldn't find a way to handle this in Flux (with that said, I didn't search for a solution very long, nor am I very experienced with Flux).

What I ended up doing instead is do this chunked querying in Python. I also posted this to the InfluxData Slack channel, I'll also mirror it here though:

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

MAX = -525960  # 1 year

source_bucket = 'srt/autogen'

bucket_1m = 'srt/longterm_1m'

client = InfluxDBClient(url="http://localhost:8086", token="xxx", org="yyy", timeout=30000)

downsample_query = '''import "types"
from(bucket: "{source_bucket}")
    |> range(start: {rel_start}m, stop: {rel_stop}m)
    |> filter(
        fn: (r) =>
            types.isType(v: r._value, type: "int") or types.isType(v: r._value, type: "uint") or types.isType(
                    v: r._value,
                    type: "float",
                ),
    )
    |> aggregateWindow(every: {aggregate_interval}, fn: mean, createEmpty: false)
    |> to(bucket: "{destination_bucket}")
'''

query_api = client.query_api()

# We query Influxdb by relative times, we work from -5 minutes to -0 (now)
# we then decrease both of them by 5 minutes and do this until we reach 'MAX'
cur_start = -5
cur_stop = -0
while True:
    downsample_1m = downsample_query.format(
        source_bucket=source_bucket,
        rel_start=cur_start,
        rel_stop=cur_stop,
        aggregate_interval='1m',
        destination_bucket=bucket_1m
    )

    print(downsample_1m)
    query_api.query_raw(downsample_1m)

    if MAX > cur_start:
        break

    cur_start += -5
    cur_stop += -5

No need to tell me that this isn't pretty, but it does do the job for me (it still takes 10 seconds to process a chunk of 5 minutes worth of data, but that's a different issue altogether).

The general idea of the script above is not special at all, it runs a loop until the cur_start variable is lower than MAX. And every iteration it runs a Flux script to downsample a chunk of 5 minutes, save it to a different bucket and on to the next 5 minute chunk.

IMO there should be a better way to handle this kind of thing in InfluxDB. If it's not possible to build this into the product, then there should probably be some docs with recommended ways to handle this outside of InfluxDB with a script such as the one above.

There are plenty of cases where initially an infinite RP may sound reasonable, but after a while it's not that reasonable anymore. A way to properly 'recover' from such a scenario would be very nice.

kalinjul commented 2 years ago

Thank you very much for the python code! It's crazy that it's needed at all. I didn't understand the problem until i tried to downsample my data, influx just ate away all memory, even when just running a query on a 4-week chunk...