bemasher / rtlamr

An rtl-sdr receiver for Itron ERT compatible smart meters operating in the 900MHz ISM band.
GNU Affero General Public License v3.0
2.19k stars 249 forks source link

Downsample data collection #162

Closed johndeyrup closed 3 years ago

johndeyrup commented 3 years ago

My electric meter sends out updates every couple of seconds is there a way to not collect data until x minutes have been elapsed since the last update? I don't want to collect at that granularity. I saw that there was a flag that is sample rate, but that seemed to change how often rtl_tcp checks?

bemasher commented 3 years ago

What protocol does your meter transmit? IDM, SCM, SCM+?

johndeyrup commented 3 years ago

I am reading from water, gas and electric. I am receiving SCM and r900. The SCM electric meter is transmitting every couple of seconds

bemasher commented 3 years ago

The -samplerate flag sets the bandwidth of your radio.

Down-sampling or rate-limiting received messages is outside the scope of rtlamr. I would recommend that you check out a related project rtlamr-collect.

The other option is piping output of rtlamr through a program you write that controls which messages make it to your database.

johndeyrup commented 3 years ago

I don't see anything in https://github.com/bemasher/rtlamr-collect/blob/master/main.go that would let you downsample. Thanks for doing this by the way it alerted me that there was a leak going on.

bemasher commented 3 years ago

rtlamr-collect doesn't do any down-sampling, but it's trivial to do that sort of thing in influxdb. If you don't want to go that route, here is a short program that parses json messages from rtlamr and drops any messages from the same meter that arrive sooner than minTimeBetween:

package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/tidwall/gjson"
)

const (
    minTimeBetween = time.Minute
)

func init() {
    log.SetFlags(log.Lshortfile | log.Lmicroseconds)
    log.SetOutput(os.Stderr)
}

func main() {
    // Track time of the last accepted message for each meter.
    meterTimes := map[string]time.Time{}

    // Read messages from stdin.
    stdinBuf := bufio.NewScanner(os.Stdin)

    // For each line.
    for stdinBuf.Scan() {
        // Parse the message.
        msg := gjson.ParseBytes(stdinBuf.Bytes())

        // Get the time and message id.
        t := msg.Get("Time").Time()
        id := msg.Get("Message.ID").String()

        // If the meter has been seen before.
        if last, exists := meterTimes[id]; exists {
            // And the message is sooner than minTimeBetween from the last message.
            if t.Sub(last) < minTimeBetween {
                // Ignore it.
                log.Println("ignoring message:", msg)
                continue
            }
        }

        // Update time of the last accepted message for this meter.
        meterTimes[id] = t

        // Print the message.
        fmt.Println(stdinBuf.Text())
    }
}
johndeyrup commented 3 years ago

Got downsampling to work using continuous queries. Here is a 2m downsample for any that are interested. Note you will need to query "5y_1d"."downsampled_consumption" to get results back, that tripped me up when I set it up.

CREATE CONTINUOUS QUERY q2_2m ON rtlamr BEGIN select max(consumption) as consumption, endpoint_id INTO "5y_1d"."downsampled_consumption" from rtlamr group by time(2m), endpoint_id END