influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License

SQL input plugin that only updates recent data #16013

Open wolfgangr opened 1 month ago

wolfgangr commented 1 month ago

Use Case

e.g. migrate an existing setup that collects time series in (My)SQL to InfluxDB

I have a number of databases that have been running for years, where measurements (climate, energy, photovoltaics, heating, ...) are written to MySQL tables. Some of them are in a remote location, with MySQL binary-log-based replication as a reliable bridge even across temporary connection outages.

The idea was to set up a Telegraf inputs.sql instance to monitor the new data and send it to InfluxDB.
Then all the visualisation, aggregation, downsampling etc. could be performed there.

Expected behavior

I hoped that inputs.sql would provide some mechanism to distinguish between data already captured and data still to be transferred.

Compare my old SQL aggregation:

SET @last_intvl := IFNULL( (SELECT MAX(idx_hr) FROM `hourly`), '0000-00-00');
REPLACE into `hourly`
   ....
SELECT FROM raw ....
WHERE idx >= @last_intvl  

Actual behavior

Telegraf always collects all the rows returned by the configured query and sends them to InfluxDB.
In the end, the duplicates are discarded there, and the final result is OK.

However, just a simple 30-day period of weather data yields some 200,000 rows. At an update interval of 300 seconds, this puts considerable CPU and memory load on a 4-core machine. I'd expect it to buckle at intervals as short as 30 seconds and/or with source data extending over years.

I don't see any hook to query influx target bucket for the last update and limit the query to all data after that point.
I don't even see a hook to keep track of data sent through the telegraf plugin.

As a crude temporary workaround, I added a "LIMIT 10" clause to the end of my SQL source query.
At least this works well as long as all machines and connections are up and available.
It does not replay any backlog that has occurred for whatever reason.

Additional info

No response

srebhan commented 1 month ago

@wolfgangr as you can already see this adds a lot of complexity to the plugin! What is the "last interval"? In some cases it might be an index, in other cases a unix timestamp or a date. Is the timestamp in seconds, milliseconds, ...? Or what format should the date be in?

The only way I could see is to add a go-template and fill a few variables such as the timestamp of last query, timestamp of last metric etc. But is this really better than your initial query?

richardeaxon commented 1 month ago

Hi @wolfgangr, @srebhan

I wrote a generic SQL output processor that I have been meaning to contribute, but I have not done so because it lacks documentation and test cases (it's used in my production environment without issue). This output processor allows complete control of the SQL and is fully parameterized. You can pluck any field, tag, metric or timestamp as a parameter into a handwritten SQL statement. You can also choose whether the metric chunk should be wrapped in a transaction or not (the default is to use a transaction, but this could lock large numbers of DB rows, which may not be desirable).

I based this processor on the SQL input processor, as the current SQL output processor is not really fit for purpose. Any Go-supported DB driver should work (I am using mysql against a MariaDB database). The options also resemble those of the input SQL processor, which look similar to those of all the other SQL processor variants.

If you want, I can create a PR of the uncompleted work. I would like my processor to be included in Telegraf because, as I said previously, all the other SQL output processors are too specific and can/should be replaced with a generic one.

For your example in this ticket using my output processor, you would need to replace your multi-statement query with a CTE or sub-select.

An example:

[[outputs.sql2]]
  driver = "mysql"
  #transaction = true
  dsn = "username:password@tcp(dbserver:3306)/dbname"
  query = "INSERT INTO events (timestamp, source, resource, event, message) VALUES (FROM_UNIXTIME(?), LEFT(?,50), LEFT(?,128), LEFT(?,20), LEFT(?,20))"

  [[outputs.sql2.parameter_mappings]]
    timestamp = "timestamp"

  [[outputs.sql2.parameter_mappings]]
    field = "source"

  [[outputs.sql2.parameter_mappings]]
    field = "resource"

  [[outputs.sql2.parameter_mappings]]
    field = "event"

  [[outputs.sql2.parameter_mappings]]
    field = "message"

wolfgangr commented 1 month ago

@richardeaxon,
I can't see where your approach queries InfluxDB.
When "shit happens" between the SQL (source DB) and InfluxDB (target DB), this is not recorded in the SQL database.

@srebhan, I agree with you that beyond a certain point of complexity, a Turing-complete language is the obvious way to go.
However, for a Go moron like me, learning a new language is quite a bit of complexity, too.

wolfgangr commented 1 month ago

What I have so far:

This Flux query gives me the last data point that arrived in the InfluxDB bucket:

from(bucket: "wetter_raw")
  |> range(start: 0)
  |> filter(fn: (r) => r["_measurement"] == "ecowitt")
  |> filter(fn: (r) => r["_field"] == "idx")
  |> last()
  |> keep(columns: ["_value"])

Quite lucky that I could not manage to convert the SQL timestamp in 'idx' to epoch time in Telegraf, so it's still available unchanged :-)

Instead of go, I use plain old shell to remove the decoration:

:~/influxDB/scripts$ influx query ... -file last_weather.flux | \
    tail -n1 | grep -oE '[0-9\-]{10}\ [0-9\:]{8}' 
2024-10-18 20:47:27

My plan is to run Telegraf not as a daemon, but in one-shot mode (--once) from a cron job, and to pass the last update timestamp via an environment variable into the query stored in the Telegraf config.

I know this is neither elegant nor performant. I hope it's OK to run every couple of minutes. For subsecond updates, I'm afraid I'd have to learn Go :wink:

------- edit: works ------------

shell script running both:

export INFLUX_URL=http://localhost:8086
export INFLUX_ORG=my.org
export LASTUPDATE=`influx query -org "my.org" -token "very-123-secret-456==" -file last_weather.flux | tail -n1 | grep -oE '[0-9\-]{10}\ [0-9\:]{8}'`
telegraf --config tg_weather_SQL_oneshot.conf --once # --test

The parametrized query in tg_weather_SQL_oneshot.conf now reads:

query = "SELECT *, UNIX_TIMESTAMP(CONVERT_TZ(idx, '+00:00', 'SYSTEM')) AS time FROM wetter.raw WHERE `idx` >= \"${LASTUPDATE}\" LIMIT 10"

LASTUPDATE is assigned the result of the Flux query, using bash backtick syntax.
In the Telegraf config, this variable is inserted into the query.
The LIMIT 10 is merely a safety measure for testing. It has to be removed at deployment, since it might introduce gaps into the synchronisation after outages spanning more than 10 records.
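
One gap in the script above: if the Flux query returns nothing (empty bucket on the first run, InfluxDB unreachable), LASTUPDATE ends up empty and the WHERE clause compares against an empty string. A minimal sketch of a guard, assuming that falling back to a fixed lower bound is acceptable. The function name last_ts_or_default and the fallback value '1970-01-01 00:00:00' are my own choices for illustration, not anything Telegraf or the influx CLI provides:

```shell
#!/bin/sh
# Hypothetical helper (not part of telegraf or the influx CLI):
# reads text on stdin, prints the last "YYYY-MM-DD HH:MM:SS" timestamp found,
# or a fallback value when the input contains no timestamp at all.
last_ts_or_default() {
    # Assumption: the epoch start is an acceptable lower bound for `idx`.
    default_ts="${1:-1970-01-01 00:00:00}"
    ts=$(grep -oE '[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}' | tail -n1)
    if [ -n "$ts" ]; then
        printf '%s\n' "$ts"
    else
        printf '%s\n' "$default_ts"
    fi
}

# Intended use, replacing the tail/grep part of the pipeline:
#   LASTUPDATE=$(influx query ... -file last_weather.flux | last_ts_or_default)
#   export LASTUPDATE
```

Note that with the fallback in effect and no LIMIT clause, the first run transfers the whole table once.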

------------ edit No 2: -----------------------

just did some adjustments and tested whether the setup would correctly catch up after some simulated outage:

metric_batch_size = 1000
metric_buffer_limit = 10000

query = "SELECT *, UNIX_TIMESTAMP(CONVERT_TZ(idx, '+00:00', 'SYSTEM'))  AS time FROM wetter.raw WHERE `idx` >= \"${LASTUPDATE}\"  ORDER BY `idx` ASC  LIMIT 100"
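
With LIMIT 100 in place, a single run only moves the watermark forward by at most 100 rows, so catching up after a long outage takes several runs. A rough sketch of a loop that repeats until the watermark stops advancing. All names here (catch_up, get_last_update, run_telegraf) are hypothetical wrappers around the commands from the script above, and the cap of 50 rounds is an arbitrary safety limit:

```shell
#!/bin/sh
# Hypothetical sketch: catch_up GET_CMD RUN_CMD [MAX_ROUNDS]
#   GET_CMD prints the current watermark (last idx seen in the target bucket).
#   RUN_CMD performs one transfer run and reads $LASTUPDATE from the environment.
# Prints the number of transfer runs performed.
catch_up() {
    get_cmd=$1
    run_cmd=$2
    max_rounds=${3:-50}        # arbitrary cap so a stuck setup cannot loop forever
    prev="__none__"            # sentinel: guarantees at least one transfer run
    round=0
    while [ "$round" -lt "$max_rounds" ]; do
        LASTUPDATE=$($get_cmd)
        export LASTUPDATE
        # Watermark unchanged since the previous run: nothing new arrived, stop.
        [ "$LASTUPDATE" = "$prev" ] && break
        $run_cmd >&2 || return 1
        prev=$LASTUPDATE
        round=$((round + 1))
    done
    echo "$round"
}

# Intended use (both functions are assumptions wrapping the commands above):
#   get_last_update() { influx query ... -file last_weather.flux | tail -n1 | grep -oE '[0-9\-]{10}\ [0-9\:]{8}'; }
#   run_telegraf()    { telegraf --config tg_weather_SQL_oneshot.conf --once; }
#   catch_up get_last_update run_telegraf
```

Since the query uses >=, every run re-sends the boundary row, so the final round transfers nothing new and only confirms that the watermark has stopped moving.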

wolfgangr commented 1 month ago

What I actually had vaguely in mind was that I could do something like this directly in Telegraf.

But now I see that this might end up blowing the Telegraf config up into a language close to Turing-complete.

wolfgangr commented 1 month ago

It is in a "works-for-me" state now.
From my point of view, we may close this if you don't see any obvious way to include it in Telegraf without major effort.