Closed sanderson closed 4 months ago
This sounds close to something I am interested in. I would like to know the duration of each state:

| _start | _stop | state |
|---|---|---|
| 2021-01-01T00:00:00Z | 2021-01-01T00:01:00Z | "crit" |
| 2021-01-01T00:02:00Z | 2021-01-01T00:04:00Z | "warn" |
| 2021-01-01T00:05:00Z | 2021-01-01T00:06:00Z | "ok" |

The current stateDuration() doesn't seem like it will work, because I don't want to specify a state value. I want the duration of any state change. Also, I don't want to use this as a monitor feature; I just need to query state changes to display on screen.
Example use cases:
1) A sensor monitors a door magnet. It only sends "open" when the door opens and "close" when the door closes. I want to query how long each state lasted during the day.
2) A motion detection sensor (let's say a dog collar) sends a state of "moving" or "still" every minute. I want to query how the state changed over the day (e.g. the dog was lying still until 9am, actively moving until 12, probably took a nap until 1pm, etc.).
Looking at the source code for stateChangesOnly in https://github.com/influxdata/flux/blob/master/stdlib/influxdata/influxdb/monitor/monitor.flux shows that the function contains no magic: it is pure Flux code, so you can apply the same method yourself (though it is very cumbersome).
For numeric values:
from(bucket: "<your-bucket>")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "<your-measurement>")
|> filter(fn: (r) => r["_field"] == "<your-field>")
|> duplicate(column: "_value", as: "____difference____")
|> sort(columns: ["_source_timestamp", "_time"], desc: false)
|> difference(columns: ["____difference____"])
|> filter(fn: (r) => r.____difference____ != 0)
|> drop(columns: ["____difference____"])
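To make the numeric recipe above easier to follow, here is a minimal Python sketch of the same idea (not Flux, just a model of the logic): compare each row with its predecessor and keep it only when the difference is non-zero. The function name `changed_rows` and the sample rows are hypothetical. As with difference() in its default mode, the first row has no predecessor and is therefore dropped.

```python
def changed_rows(rows, column="_value"):
    """Keep rows whose numeric value differs from the previous row.

    Models duplicate -> difference -> filter(!= 0). The first row has
    no predecessor, so it never appears in the output.
    """
    kept = []
    for prev, cur in zip(rows, rows[1:]):
        if cur[column] - prev[column] != 0:
            kept.append(cur)
    return kept


# Hypothetical sample data, already sorted by time.
rows = [
    {"_time": "00:00", "_value": 1},
    {"_time": "00:01", "_value": 1},
    {"_time": "00:02", "_value": 3},
    {"_time": "00:03", "_value": 3},
    {"_time": "00:04", "_value": 1},
]

print([r["_time"] for r in changed_rows(rows)])  # ['00:02', '00:04']
```

Because the first row is lost, a series that never changes yields an empty result, which is worth keeping in mind when displaying the output.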
For string values, this only works for a set of strings you know in advance, because you have to recode the strings into numbers. Look at the code for stateChangesOnly:
stateChangesOnly = (tables=<-) => {
return tables
|> map(
fn: (r) => ({r with
level_value: if r._level == levelCrit then
4
else if r._level == levelWarn then
3
else if r._level == levelInfo then
2
else if r._level == levelOK then
1
else
0,
}),
)
|> duplicate(column: "_level", as: "____temp_level____")
|> drop(columns: ["_level"])
|> rename(columns: {"____temp_level____": "_level"})
|> sort(columns: ["_source_timestamp", "_time"], desc: false)
|> difference(columns: ["level_value"])
|> filter(fn: (r) => r.level_value != 0)
|> drop(columns: ["level_value"])
|> experimental.group(mode: "extend", columns: ["_level"])
}
@trojanc Once the step above is done, it might be possible to get the length of each state by copying the timestamps and using difference() on them, similar to the steps above.
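A rough Python sketch of that suggestion, assuming the input already contains only the rows where the state changed: the duration of each state is the gap to the next change, and the last state is closed by the end of the queried range. The names `state_durations`, `at` and the `stop` argument are hypothetical, and the door data is made up for illustration.

```python
from datetime import datetime, timezone


def state_durations(changes, stop):
    """Return (state, seconds) for each state change.

    changes: time-sorted list of (timestamp, state), one entry per change.
    stop: end of the queried range, used to close the final state.
    """
    durations = []
    for i, (start, state) in enumerate(changes):
        end = changes[i + 1][0] if i + 1 < len(changes) else stop
        durations.append((state, (end - start).total_seconds()))
    return durations


def at(hour, minute=0):
    """Helper for building sample timestamps on 2021-01-01 (UTC)."""
    return datetime(2021, 1, 1, hour, minute, tzinfo=timezone.utc)


# Hypothetical door sensor: one row per state change.
changes = [(at(8), "open"), (at(8, 5), "close"), (at(12), "open")]
durations = state_durations(changes, stop=at(13))
print(durations)  # [('open', 300.0), ('close', 14100.0), ('open', 3600.0)]
```

The duration of the final state is only provisional here, since it depends on where the query range ends rather than on an observed change.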
I need this! :+1: Sadly, @nathanielc closed the only PR for this.
The best workaround I found is based on the map as seen in the comment above, but then uses difference and events.duration:
import "contrib/tomhollingworth/events"
from(bucket: "eta")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "Holz_Kessel_Status")
// https://github.com/influxdata/flux/issues/3582#issuecomment-929864211
|> map(
fn: (r) => ({r with level_value:
if r._value == "Bereit" then 0
else if r._value == "Heizen" then 1
else if r._value == "Startvorgang" then 2
else if r._value == "Glutabbrand" then 3
else if r._value == "TWIN Betrieb:" then 4
else -1,
}),
)
// Find rows where the status changes
|> duplicate(column: "level_value", as: "diff")
|> difference(columns: ["diff"])
|> filter(fn: (r) => r.diff != 0)
|> drop(columns: ["diff"])
// calculate duration of the states
|> events.duration(
unit: 1s,
)
|> drop(columns: ["_stop"])
Please don't burn me :fire:, but I managed to get this to work without hardcoding values: I fetch the distinct values in a separate query and hacked my way around Flux's type restrictions (as far as I understand them).
import "strings"
import "json"
import "contrib/tomhollingworth/events"
states = from(bucket: "eta")
|> range(start: -1y)
|> filter(fn: (r) => r["_measurement"] == "Holz_Kessel_Status")
|> distinct()
|> findColumn(
fn: (key) => true,
column: "_value"
)
statesJSON = string(v: json.encode(v: states))
from(bucket: "eta")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "Holz_Kessel_Status")
// https://github.com/influxdata/flux/issues/3582#issuecomment-929864211
|> map(
fn: (r) => ({r with level_value:
strings.index(v: statesJSON, substr: "\"" + r._value + "\"")
}),
)
// Find rows where the status index changes
|> duplicate(column: "level_value", as: "diff")
|> difference(columns: ["diff"])
|> filter(fn: (r) => r.diff != 0)
|> drop(columns: ["diff", "level_value"])
// calculate duration of the states
|> events.duration(
unit: 1s,
)
Handle with care. :see_no_evil:
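For readers who want to see why the trick works, here is a small Python model of the enumeration step (a sketch of the idea, not the Flux code itself): the distinct states are serialised to one JSON string, and the position of each quoted state name inside that string serves as its numeric ID. The function name `make_enumerator` is hypothetical, and the compact separators are an assumption about how the encoded string looks.

```python
import json


def make_enumerator(states):
    """Build a function mapping a state name to a stable integer.

    The integer is the offset of the quoted name inside the JSON-encoded
    list of distinct states; unknown names map to -1.
    """
    encoded = json.dumps(states, separators=(",", ":"), ensure_ascii=False)

    def enumerate_state(value):
        # Searching for the quoted name avoids matching a name that is
        # merely a substring of another name.
        return encoded.find('"' + value + '"')

    return enumerate_state


to_number = make_enumerator(["Bereit", "Heizen", "Startvorgang"])
print(to_number("Bereit"), to_number("Heizen"), to_number("Startvorgang"))  # 1 10 19
print(to_number("Unbekannt"))  # -1
```

Each distinct state starts at a different offset, so the difference between consecutive rows is non-zero exactly when the state changes. The actual numbers carry no meaning, and state names containing characters that JSON escapes (such as quotes) would likely need extra care.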
Neat and ugly at the same time! ;) Really wish this was in the flux library.
Can confirm it works.
I needed this (minus the duration calculation) for this type of graph (easily gets overloaded when sending complete data sets rather than only changes): https://grafana.com/docs/grafana/latest/visualizations/state-timeline/
@tennox @danhallin Thanks for the interest in this issue and for sharing your workarounds. This is clearly a first-class feature Flux needs. We have tried to tackle this issue in the past and didn't arrive at any good solutions, hence the closed PR. However, I have re-added this issue to our queue and we will take another pass at it soon.
@nathanielc not to pile on here but consider me also a big supporter of this. I've spoken to several users who need this.
Side note: if https://github.com/influxdata/flux/pull/4387 gets merged, a generic state-changes function could be built with reduce(..., emitAll: true). There's an example in that PR thread.
Wrote up #4671 to track work specifically related to enabling this functionality.
There are potentially two separately useful functions here in Tom's awesome work:
1) An enumeration function. This in and of itself does most of the hard work.
2) The actual state change itself, without the automated duration calculation (I needed the start and end dates). I took a slightly different approach to this, to get around some issues with incomplete or halfway states:
from(bucket: "stuff")
|> range(start: -3y)
|> filter(fn: (r) => r._measurement == "measurement")
|> map(
fn: (r) => ({r with "_level": if r._value == 1 then 1 else 0}),
)
|> difference()
|> elapsed(unit: 1ms)
|> filter(fn: (r) => r._value != 0)
What's different about this approach is that it gives you a recognisably incomplete state at the end. It generates a 1 for every start of a state and a -1 for every confirmed end of a state, and keeps this info in the table, so if a state didn't end (because there was no change back), that state should be expected to be updated.
The if r._value == 1 line stands in for custom user conditions on the underlying data (e.g. level > 700, or level > 50 and level < 100), turning this into a standard pass/fail.
Thus, as new data streams in, we know the final state hasn't finished (as there's no corresponding -1 record) and needs to be amended.
I'm taking the elapsed time here because I need the last timestamp at which the recorded state still held, not the time it hits the new one, so I use the elapsed value to get back to the previous timestamp. The dates are kept rather than converted to a duration because I need them to combine and validate individual results.
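The start/end bookkeeping described above can be sketched in Python as follows. This is my reading of the approach, not the commenter's code: each sample is reduced to pass (1) or fail (0), a +1 step opens an interval, a -1 step closes it at the last timestamp where the condition still held, and an interval without a matching -1 is reported as open. The names `state_intervals` and `predicate` are hypothetical.

```python
def state_intervals(samples, predicate):
    """Return (start, end) pairs for periods where predicate(value) holds.

    samples: time-sorted list of (time, value).
    end is the last time the condition was still true, or None when the
    state has not been confirmed as finished yet.
    """
    intervals = []
    prev_flag = None
    prev_time = None
    start = None
    for time, value in samples:
        flag = 1 if predicate(value) else 0
        if prev_flag is not None:
            step = flag - prev_flag
            if step == 1:
                start = time  # state begins
            elif step == -1 and start is not None:
                intervals.append((start, prev_time))  # confirmed end
                start = None
        prev_flag, prev_time = flag, time
    if start is not None:
        intervals.append((start, None))  # still open, expect an update
    return intervals


# Hypothetical readings; the condition is "level > 700".
samples = [(0, 10), (1, 800), (2, 900), (3, 20), (4, 750)]
print(state_intervals(samples, lambda level: level > 700))  # [(1, 2), (4, None)]
```

A state that is already active in the first sample produces a -1 without a preceding +1; this sketch ignores it, which is one possible way of handling the halfway states mentioned above.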
This issue has had no recent activity and will be closed soon.
Well does a dummy comment count as meaningful activity? 🤓
(PS: I still need this)
Still valid. (Well, to the extent that Flux is still valid.)
Still in favor of this function though, so a bump for this issue
Bump, even if I think (after 3 years) this won't be developed further...
What's the point of having a bot that automatically closes stale issues if the project itself seems "stale" too? 😐
@virtualdj Flux is retiring.
@virtualdj Thanks, I missed that news 😢 So maybe it wasn't a big investment learning it...
Currently, the only functions for detecting state changes are in the influxdata/influxdb/monitor package, but these functions have some pretty strict limitations, such as requiring the _level column (a convention of the InfluxDB monitoring system).
Proposal
Add a stateChanges or stateChangesOnly function to the standard library that only returns records where the state differs from the previous row. The state column should be configurable.
So the following:
Would return:
Questions
Do we include the first row of the input tables in the output, or not? Or do we make it configurable?
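To make the first-row question concrete, here is a minimal Python sketch of the proposed behaviour with the choice exposed as an option. The function name `state_changes` and the parameters `column` and `include_first` are hypothetical and only illustrate the design question; they are not an existing or planned Flux signature. Values are compared with plain inequality, so the sketch is not tied to numeric states.

```python
def state_changes(rows, column="state", include_first=True):
    """Return only the rows whose state differs from the previous row.

    include_first controls whether the very first row, which has no
    predecessor to compare against, is part of the output.
    """
    changed = []
    for i, row in enumerate(rows):
        if i == 0:
            if include_first:
                changed.append(row)
        elif row[column] != rows[i - 1][column]:
            changed.append(row)
    return changed


# Hypothetical input with a string state and a boolean state.
rows = [
    {"_time": "00:00", "state": "crit", "open": True},
    {"_time": "00:01", "state": "crit", "open": True},
    {"_time": "00:02", "state": "warn", "open": False},
    {"_time": "00:03", "state": "warn", "open": False},
    {"_time": "00:04", "state": "ok", "open": False},
]

print([r["_time"] for r in state_changes(rows)])  # ['00:00', '00:02', '00:04']
print([r["_time"] for r in state_changes(rows, include_first=False)])  # ['00:02', '00:04']
print([r["_time"] for r in state_changes(rows, column="open")])  # ['00:00', '00:02']
```

Including the first row means every state in the range has a visible starting point, while excluding it matches what the difference-based workarounds in this thread produce.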
Can we make it so the stateColumn can be any data type? I think the most common will be string and bool, but it would be nice to make it flexible by supporting all data types.
Other related issues