influxdata / flux

Flux is a lightweight scripting language for querying databases (like InfluxDB) and working with data. It's part of InfluxDB 1.7 and 2.0, but can be run independently of those.
https://influxdata.com
MIT License

Add aggregate function #776

Open: odd opened this issue 6 years ago

odd commented 6 years ago

Some exploration:

// alternative possible calling syntaxes
aggregate(every: 10m, fns: {max: (r) => max(), min: (r) => min(), mean: (r) => mean()})
aggregate(every: 10m, fns: {max, min, mean, myMean})
aggregate(every: 10m, fns: [max, min, mean, myMean])
aggregate(every: 10m, fns: (r) => ({max: max(), min: min(), mean: mean()}))

// implementation
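aggregate = (table=<-, every, fns) => {
    // run each named function over the windowed input (needs map over objects)
    tables = fns
        |> map(fn: (name, fn) =>
            table
                |> window(every: every)
                |> fn()
        )
    // keep each function's _value under that function's name
    merge = (tables) => fns
        |> map(fn: (name, fn) => tables[name]._value)
    // join the per-function tables on _time, merging rows with merge()
    return join(tables: tables, on: ["_time"], fn: merge)
}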
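The implementation above depends on features that were still proposals (object literals, mapping over objects), so it cannot run as written. As a hedged sketch of the intended semantics only, the Python below models a table as a list of row dicts with `_time` (in seconds, so `every=600` stands for `10m`) and `_value`, runs each named function separately over tumbling windows, then joins the per-function results on `_time`. The helper names, aligning windows to multiples of `every`, and stamping each result with the window start are assumptions, not Flux behavior.

```python
from statistics import mean


def window(rows, every):
    """Group _value by tumbling window, keyed by window start time (assumed alignment)."""
    windows = {}
    for row in rows:
        start = row["_time"] - row["_time"] % every
        windows.setdefault(start, []).append(row["_value"])
    return windows


def aggregate(rows, every, fns):
    """Hypothetical helper: run each named fn per window, then join on _time."""
    windowed = window(rows, every)
    # One result table per function name, like `tables = fns |> map(...)`.
    tables = {
        name: {start: fn(values) for start, values in windowed.items()}
        for name, fn in fns.items()
    }
    # Join the per-function tables on _time: one column per function name.
    joined = []
    for start in sorted(windowed):
        row = {"_time": start}
        for name, table in tables.items():
            row[name] = table[start]
        joined.append(row)
    return joined


rows = [
    {"_time": 0, "_value": 3.0},
    {"_time": 300, "_value": 5.0},
    {"_time": 600, "_value": 1.0},
    {"_time": 900, "_value": 7.0},
]
for row in aggregate(rows, every=600, fns={"max": max, "min": min, "mean": mean}):
    print(row)
```

Computing each function's table independently and joining afterwards mirrors the proposal's structure; a real engine would more likely update all accumulators in a single pass.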
nathanielc commented 6 years ago

Adding some more details:

Another, similar implementation of the aggregate helper function:

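aggregate = (aggregates, table=<-, column="_value") => {
    // build one single-aggregate table per entry, keyed by the entry's name
    tables = aggregates |> map(fn: (k, v) => {
        return {[k]: table |> v(columns: [column])}
    })
    // join on every column except the aggregated one
    return join(tables: tables, except: [column])
}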
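This variant drops windowing and takes the aggregated column as a parameter, joining the per-aggregate results on every column except that one. A Python sketch of that behavior, assuming rows are dicts, that the remaining non-`_time` columns act as the group key, and that output columns are named by prefixing the aggregate name to the column (so `sum` over `_value` gives `sum_value`, matching the comment in the calling example in this thread). The function and its naming rule are illustrative, not an existing Flux API.

```python
from statistics import mean


def aggregate(rows, aggregates, column="_value"):
    """Hypothetical helper: apply each named aggregate to `column` per group."""
    # Group by every column except the aggregated one and _time (assumed to be
    # dropped by aggregates); this stands in for join's `except: [column]`.
    groups = {}
    for row in rows:
        key = tuple(sorted((k, v) for k, v in row.items() if k not in (column, "_time")))
        groups.setdefault(key, []).append(row[column])
    # One result table per named aggregate, each with one row per group.
    tables = {
        name: {key: fn(values) for key, values in groups.items()}
        for name, fn in aggregates.items()
    }
    # Join the per-aggregate tables on the group key columns.
    result = []
    for key in groups:
        out = dict(key)
        for name, table in tables.items():
            out[name + column] = table[key]  # e.g. "sum" + "_value" -> "sum_value"
        result.append(out)
    return result


rows = [
    {"_time": 0, "host": "a", "_value": 2},
    {"_time": 10, "host": "a", "_value": 4},
    {"_time": 0, "host": "b", "_value": 10},
]
print(aggregate(rows, {"sum": sum, "mean": mean, "count": len}))
```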

Do we want to add windowing to these helper functions as well?

This issue depends on the following being completed:

- https://github.com/influxdata/platform/issues/212 - object literals
- https://github.com/influxdata/platform/issues/534 - remove time handling from aggregates
- https://github.com/influxdata/platform/issues/249 - type inference
- https://github.com/influxdata/platform/issues/535 - map over objects

nathanielc commented 6 years ago

Adding an example of how to call aggregate:


from(db:"foo")
    |> range(...)
    |> filter(...)
    // Produces a table with columns sum_value, mean_value, and count_value
    |> aggregate(aggregates:{sum,mean,count})
pauldix commented 6 years ago

It might make sense to split this into two functions, apply and applyWindow. The first would be defined as apply = (fns, columns=["_value"], tables=<-), and the second as something like:

applyWindow = (fns, columns=["_value"], every=0s, period=0s, tables=<-) =>
    tables |> window(every: every, period: period) |> apply(fns: fns, columns: columns) |> window(every: inf)
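The split above composes applyWindow from three steps: window the input, apply the functions per window, then merge the windows back into one table (the role of `window(every: inf)`). A Python sketch of that composition, assuming rows are dicts, `period` equal to `every` (tumbling windows only), and output columns named `<fn>_<column>` with a leading underscore stripped. `apply` and `apply_window` here are hypothetical stand-ins for the proposed Flux functions.

```python
def apply(rows, fns, columns=("_value",)):
    """Hypothetical `apply`: run every fn over every listed column of one table."""
    out = {}
    for col in columns:
        values = [row[col] for row in rows]
        for name, fn in fns.items():
            out[f"{name}_{col.lstrip('_')}"] = fn(values)  # "_value" -> "max_value"
    return out


def apply_window(rows, fns, every, columns=("_value",)):
    """Hypothetical `applyWindow`: window, apply per window, merge the results."""
    # Tumbling windows aligned to multiples of `every` (assumes period == every).
    windows = {}
    for row in rows:
        windows.setdefault(row["_time"] - row["_time"] % every, []).append(row)
    # Merging back into one table: one output row per window.
    return [{"_start": start, **apply(windows[start], fns, columns)} for start in sorted(windows)]


rows = [
    {"_time": 0, "_value": 1.0, "usage": 10.0},
    {"_time": 120, "_value": 3.0, "usage": 30.0},
    {"_time": 700, "_value": 5.0, "usage": 50.0},
]
print(apply(rows, {"max": max, "min": min}))
print(apply_window(rows, {"max": max, "min": min}, every=600, columns=("_value", "usage")))
```

Because the functions are arbitrary callables rather than a fixed set of aggregates, this also illustrates the point that the helper is about applying any function, not only aggregates.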

Calling it aggregate is a little off, because it's really about applying arbitrary functions to those tables.

russorat commented 4 years ago

@pauldix @nathanielc, does aggregateWindow accomplish this?

nathanielc commented 4 years ago

No, the request and design here go beyond what aggregateWindow offers.

adrian-thurston commented 4 years ago

@nathanielc, just for my understanding: is this about applying multiple aggregate functions in one pass over windowed data? I.e., expressed in SQL:

select min(c1), max(c2) from table group by c3;
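The SQL above computes different aggregates over different columns in a single scan of each group. A minimal Python sketch of that one-pass idea, independent of Flux: each group keeps one running accumulator per output column, updated by a binary reducer. The function name and the `{output: (column, reducer)}` spec format are illustrative assumptions.

```python
def one_pass_aggregate(rows, group_by, specs):
    """Single-pass grouped aggregation.

    `specs` maps an output name to (input column, binary reducer); min and max
    work as reducers because min(a, b) and max(a, b) combine two values.
    """
    state = {}
    for row in rows:
        acc = state.get(row[group_by])
        if acc is None:
            # The first row of a group seeds every accumulator.
            state[row[group_by]] = {out: row[col] for out, (col, _) in specs.items()}
        else:
            for out, (col, reduce_fn) in specs.items():
                acc[out] = reduce_fn(acc[out], row[col])
    return state


table = [
    {"c1": 4, "c2": 7, "c3": "x"},
    {"c1": 2, "c2": 9, "c3": "x"},
    {"c1": 5, "c2": 1, "c3": "y"},
]
# SELECT min(c1), max(c2) FROM table GROUP BY c3
print(one_pass_aggregate(table, "c3", {"min_c1": ("c1", min), "max_c2": ("c2", max)}))
```

Windowing fits the same shape: adding the window start to the grouping key would give one accumulator set per (group, window) pair.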