influxdata / flux

Flux is a lightweight scripting language for querying databases (like InfluxDB) and working with data. It's part of InfluxDB 1.7 and 2.0, but can be run independently of those.
https://influxdata.com
MIT License
769 stars 153 forks source link

Question: Best practise for transforming timeseries data into logs with time-ranges #5424

Closed paulwer closed 7 months ago

paulwer commented 1 year ago

Hey everyone, today we encountered an use-case, where machine-status-data saves as timeseries in influx should be transformed into log-like data with a start and stop timestamp.

f.ex.

array.from(rows: [
  {_time: 2021-01-01T00:00:00Z, _value: true},
  {_time: 2021-01-01T00:01:00Z, _value: true},
  {_time: 2021-01-01T00:02:00Z, _value: false},
  {_time: 2021-01-01T00:03:00Z, _value: false},
  {_time: 2021-01-01T00:04:00Z, _value: false},
  {_time: 2021-01-01T00:05:00Z, _value: true},
  {_time: 2021-01-01T00:06:00Z, _value: true},
])

should result in:

array.from(rows: [
  {_start: 2021-01-01T00:00:00Z, _stop: 2021-01-01T00:01:00Z, _value: true},
  {_time: 2021-01-01T00:02:00Z, _stop: 2021-01-01T00:04:00Z, _value: false},
  {_time: 2021-01-01T00:05:00Z, _stop: 2021-01-01T00:06:00Z, _value: true},
])

my first suggestion was to use a function like monitor.changeState. https://github.com/influxdata/flux/issues/3582 suggested it is not implemented. As a workaround / idea to start, we had tried using the following statement, which is no quite performat, when quering larger datasets. We are using the differnce to determine, if the boolean value has been changed and in which direction. We are using aggregate Sum to keep the latest value, when its been "active" within the whole range. (an no value_changes has occur) Our api then aggregate this data to the wanted format. { _measurement: string; _field: string; start: Date; stop: Date; _value: boolean; } Aggregation is not an option, because we need the precise time-stamps.

from(bucket: "bucket-name")
    |> range(start: -5h, stop: now())
    // additional filtering here...
    |> map(fn: (r) => ({r with rowId: 1, change: int(v: r._value) }))
    |> difference(initialZero: true, keepFirst: true, columns: ["change"])
    |> sort(columns: ["_time"], desc: true)
    |> cumulativeSum(columns: ["rowId"])
    |> sort(columns: ["_time"], desc: false)
    |> filter(fn: (r) => (r._value == true and r.rowId == 1) or r.change != 0)

@UlrichThiess

paulwer commented 1 year ago

we also thought about running this query regulary with a smaller range and store the result into an other database. is this apropiate? or did someone would suggest a better solution?

@UlrichThiess i tagged you because of a previous conversation :)

github-actions[bot] commented 1 year ago

This issue has had no recent activity and will be closed soon.

paulwer commented 1 year ago

any updates or suggestions?

github-actions[bot] commented 1 year ago

This issue has had no recent activity and will be closed soon.

paulwer commented 1 year ago

problem noch nicht gelöst.

github-actions[bot] commented 10 months ago

This issue has had no recent activity and will be closed soon.

paulwer commented 10 months ago

problem noch nicht gelöst.

...

UlrichThiess commented 9 months ago
import "array"

data = array.from(rows: [
  {_time: 2021-01-01T00:00:00Z, _value: true},
  {_time: 2021-01-01T00:01:00Z, _value: true},
  {_time: 2021-01-01T00:02:00Z, _value: false},
  {_time: 2021-01-01T00:03:00Z, _value: false},
  {_time: 2021-01-01T00:04:00Z, _value: false},
  {_time: 2021-01-01T00:05:00Z, _value: true},
  {_time: 2021-01-01T00:06:00Z, _value: true},
])

// Markieren Sie den Start eines neuen Segments
marked = data
  |> map(fn: (r, index) => ({
      r with
      segmentStart: if index == 0 or r._value != array.get(arr: data, i: index - 1)._value then r._time else 0
    })
  )

// Zusammenfassen der Segmente
result = marked
  |> reduce(
      identity: {start: 0, stop: 0, value: false, init: true},
      fn: (r, accumulator) => ({
        start: if accumulator.init then r.segmentStart else accumulator.start,
        stop: r._time,
        value: r._value,
        init: if r.segmentStart != 0 then false else accumulator.init
      })
    )
  |> filter(fn: (r) => r.start != 0)

result
github-actions[bot] commented 7 months ago

This issue has had no recent activity and will be closed soon.