mdzio / ccu-historian

Der CCU-Historian erfasst die Betriebsdaten des Hausautomations-Systems HomeMatic der Firma eQ-3.
http://www.ccu-historian.de
GNU General Public License v3.0
121 stars 14 forks source link

Alte Daten komprimiere und / bzw löschen #408

Closed wakr70 closed 5 months ago

wakr70 commented 8 months ago

Hallo Matthias, durch die Scripts ist viel möglich geworden, danke nochmals für diese Erweiterung und Öffnung.

Trotzdem würde ich es begrüssen wenn sich Historian auch für die Altdaten Behandlung einige Funktionen beinhalten würde.

Hier nur mal einige Ideen.

Wöchentlicher oder Täglicher Job das automatisch übernimmt die Datenlöschung und die Datenkomprimierung.

3 zusätzliche Felder in der Datenpunktkonfiguration.

Wenn man jetzt noch einen Monatlichen um 03:00 -compact einplannen könnte dann wäre keine Wartung mehr nötig.

Was meinst du dazu ?

LG wak

wakr70 commented 8 months ago

Hallo Mathias,

hab mich etwas mit dem Script gespielt und folgendes programmiert, kannt du mal bitte drüberschauen, bin absolute Groovy-Anfänger!

Hab auf einen Spielsystem mal das so in die Config genommen:

database.tasks.DelCompr.enable=true
database.tasks.DelCompr.cron="0 0 3 ? * SUN"
database.tasks.DelCompr.script={

// Datenreihen löschen und komprimieren, V1.0

// min, max, first, last or avg as value for compression
def config = [
              ["DutyCycle.VALUE"        ,360, 10, 60*60*1,   "max" ],  // stündlichen Max Wert nach 10 Tagen und nach 370 Tagen löschen
              ["*.CARRIER_SENSE_LEVEL"  , 360, 10, 60*60*1,   "max" ],    // stündlichen Max Wert nach 10 Tagen und nach 370 Tagen löschen
              ["*.DUTY_CYCLE_LEVEL"     , 360, 10, 60*60*1,   "max" ],    // stündlichen Max Wert nach 10 Tagen und nach 370 Tagen löschen
              ["*HUMIDITY"              ,1000, 30, 60*60*2,   "avg" ],    // 2 Stunden Durchschnitt nach 30 Tagen und nach 1000 Tagen löschen
              ["*TEMPERATURE"           ,1000, 30, 60*60*2,   "avg" ],    // 2 Stunden Durchschnitt nach 30 Tagen und nach 1000 Tagen löschen
              ["*4.VALVE_STATE"         ,1000, 30, 60*60*3,   "avg" ],    // 3 Stunden Durchschnitt nach 30 Tagen und nach 1000 Tagen löschen
              ["*.ENERGY_COUNTER"       ,1000, 30, 60*60*2,   "first" ],  // 2 Stunden bei Zähler immer den 1. Wert nehmen nach 30 Tagen und nach 1000 Tagen löschen
              ["*.POWER"                ,1000, 30, 60*60*2,   "max" ],    // 2 Stunden max nach 30 Tagen und nach 1000 Tagen löschen
              ["*.POWER_STATUS"         , 730, 30, 60*60*2,   "avg" ],    // 2 Stunden max nach 30 Tagen und nach 1000 Tagen löschen
              ["*.FREQUENCY_STATUS"     , 730, 10, 60*60*3,   "avg" ],    // 3 Stunden Durschnitt nach 10 Tagen und nach 720 Tagen löschen
              ["*.CURRENT_STATUS"       , 730, 10, 60*60*3,   "avg" ],    // 3 Stunden Durschnitt nach 10 Tagen und nach 720 Tagen löschen
              ["*.VOLTAGE_STATUS"       , 730, 10, 60*60*3,   "avg" ],    // 3 Stunden Durschnitt nach 10 Tagen und nach 720 Tagen löschen
              ["*.ENERGY_COUNTER_OVERFLOW" ,730, 10, 60*60*3, "avg" ],
]

// Testlauf durchführen? Bei einem Testlauf wird die Datenbank nicht verändert.
// (Ja: true, Nein: false)
def testRun = true

// *** Skript ***

def dateFormat = "yyyy-MM-dd HH:mm"   // standard Timestamp Format

// 24 Bits für Flag berechnen 
def int bit24 = Math.pow(2,23)   // 8388608;

// wildcards in regex umbauen 
config.each { dptxt -> 
  def txtResult = new StringBuffer()
  dptxt[0].each { ch ->
    switch (ch) {
    case '*':
        // Single '*' matches single dir/file; Double '*' matches sequence of zero or more dirs/files
        txtResult << /[^\/]*/
        break
    case '?':
        // Any character except the normalized file separator ('/')
        txtResult << /[^\/]/
        break
    case ['$', '|', '[', ']', '(', ')', '.', ':', '{', '}', '\\', '^', '+']:
        txtResult << '\\' + ch
        break
    default: txtResult << ch
    }
  }
  dptxt[0] = txtResult
  // println dptxt[0]
}

def comprFactor=0
def dpCount=[]
def summeDel=0
def summeComp=0
def summetotal=0
def lastDPState
database.dataPoints.sort{ it.displayName }.each { dp ->

  // check dp.displayName wird in config gefunden
  def found=null
  def delBegin=null
  def delEnd=null
  def komprBegin=null
  def cntNew

  // Datenpunkt in Liste über REGEX suchen
  config.find { con ->
    if (dp.displayName==~con[0]) {
      // println  con[0] + " -> " + dp.displayName    // check REGEX Regeln
      found = con
      // beim ersten gefunden, merken und Schleife verlassen
      return true // break
    }
  }
  if (found) {

    // Datenlöschung     ******* 
    if (found[1] > 10) {
      cntNew=0
      // Löschzeitraum bestimmen
      delBegin=database.getFirstTimestamp(dp)
      delEnd=new Date()-found[1]      // found[1] = 2 spalte aus der Konfiguration Tabelle am Anfang

      if (testRun) {
        cntNew=database.getCount(dp, delBegin  , delEnd)
        if (cntNew>0) println "$dp.displayName: $cntNew werden gelöscht! von: " + delBegin.format(dateFormat) + "  bis: " + delEnd.format(dateFormat) + " (Testlauf)"
      } else {
        cntNew=database.deleteTimeSeries(dp, delBegin  , delEnd)
        if (cntNew>0) println "$dp.displayName: $cntNew gelöscht! von: " + delBegin.format(dateFormat) + "  bis: " + delEnd.format(dateFormat)
      }
      summeDel=summeDel+cntNew
    }

    // Datenkomprimierung   ****** 
    if (found[2] > 5) {
      // Komprimierungs Zeitraum bestimmen
      if (delEnd) {
         komprBegin=delEnd
      } else {
         komprBegin=database.getFirstTimestamp(dp)
      }
      def komprEnd=new Date()-found[2]   // von Konfigurationtalle Tage bis zum Komprimierung
      comprFactor = 1000*found[3]  // Sekunden von Konfigurationtabelle * 1000 auf Millisekungen
      def comprValue = found[4]    // von Konfigurationtalle "AVG", "MIN", "MAX", ...  

      def cnt

      // Zeitreihe holen
      def ts=database.getTimeSeriesRaw(dp, komprBegin, komprEnd)

      // erste Zeitreihe ohne komprimierung finden
      def komprBeg = komprEnd
      ts.find { pv ->
        if (!(pv.state&bit24)) { 
          komprBeg = pv.timestamp    // neues Beginndatum ermittelt, keine Behandlung von bereits komprimierten Datensätzen
          return true // break
        }
      }
      //println "Erste Timestamp ohne KompFlag: $komprBeg"

      // Zeitreihe holen neu ohne bereits komprimierte
      ts=database.getTimeSeriesRaw(dp, komprBeg , komprEnd)
      cnt=ts.size

      // Statistik berechnen
      def duration=komprEnd.time - komprBeg.time 
      def min=Double.POSITIVE_INFINITY
      def max=Double.NEGATIVE_INFINITY
      def integr=0
      def intsum=0
      def intwert=0
      def anzahl=0
      def summe=0
      def lastTime=0
      def thisTime=0
      def firstValue=0
      def lastValue=0
      def avg=0
      def previous
      def comprPosible=false

      // neue komprimierte Zeitreihe erstellen    
      def timeSeries=new TimeSeries(dp)

      ts.each { pv ->
        // Datum runden für die Komprimierung, immer das gleiche
        thisTime=new Date( ( ( Math.floor(pv.timestamp.time/comprFactor)*comprFactor) as long) )

        if (thisTime!=lastTime) {
          if (lastTime!=0) {
            duration=lastTime.time - thisTime.time
            // Durchnitt ist Integral/Zeitbereichslänge in Millisekunden.
            // def avg=integr/duration
            avg = Math.round(summe / anzahl * 10) / 10
            if (intsum!=0) intwert = Math.round(integr / intsum * 10) / 10

            // println lastTime.format(dateFormat) + " Anzahl: $anzahl, Minimum: $min, Maximum: $max, Integral: $intwert, Durchschnitt: $avg, Duration: $duration, First: $firstValue, Last: $lastValue"

            switch(comprValue) {            
            case "min": 
              timeSeries.add(new ProcessValue(lastTime, min, bit24|lastDPState ))
              break;
            case "max": 
              timeSeries.add(new ProcessValue(lastTime, max, bit24|lastDPState ))
              break;
            case "first": 
              timeSeries.add(new ProcessValue(lastTime, firstValue, bit24|lastDPState ))
              break;
            case "last": 
              timeSeries.add(new ProcessValue(lastTime, lastValue, bit24|lastDPState ))
              break;
            case "avg": 
              timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
              break;
            case "integral": 
              timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
              break;
            }
            if (anzahl>1) comprPosible=true
          }

          min=pv.value
          max=pv.value
          anzahl=0
          summe=0
          integr=0
          intsum=0
          intwert=pv.value 
          firstValue=pv.value
          lastDPState=pv.state
          lastTime=thisTime
        }

        if (pv.value<min) min=pv.value
        if (pv.value>max) max=pv.value
        if (previous!=null) {
          // Teilintegral berechnen: Messwert*Millisekunden
          integr+=previous.value*(pv.timestamp.time-previous.timestamp.time)
          intsum+=(pv.timestamp.time-previous.timestamp.time)
        }
        lastValue=pv.value
        anzahl=anzahl+1
        summe=summe+pv.value
        previous=pv
      }
      if (lastTime!=0) {
         duration=lastTime.time - thisTime.time
        // Durchnitt ist Integral/Zeitbereichslänge in Millisekunden.
        // def avg=integr/duration
        avg = Math.round(summe / anzahl * 10) / 10
        if (intsum!=0) intwert = Math.round(integr / intsum * 10) / 10

        // println lastTime.format(dateFormat) + " Anzahl: $anzahl, Minimum: $min, Maximum: $max, Integral: $intwert, Durchschnitt: $avg, Duration: $duration, First: $firstValue, Last: $lastValue"

        switch(comprValue) {            
        case "min": 
          timeSeries.add(new ProcessValue(lastTime, min, bit24|lastDPState ))
          break;
        case "max": 
          timeSeries.add(new ProcessValue(lastTime, max, bit24|lastDPState ))
          break;
        case "first": 
          timeSeries.add(new ProcessValue(lastTime, firstValue, bit24|lastDPState ))
          break;
        case "last": 
          timeSeries.add(new ProcessValue(lastTime, lastValue, bit24|lastDPState ))
          break;
        case "avg": 
          timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
          break;
        case "integral": 
          timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
          break;
        }
        if (anzahl>1) comprPosible=true
      }
      if (comprPosible) {
        println "$dp.displayName: Konfig " + komprBegin.format(dateFormat) + " bis: " + komprEnd.format(dateFormat) + " Zeitraum Sek: $comprFactor  Wert: $comprValue"
        cntNew=0
        if (testRun) {
          println "$dp.displayName: komprimiert! $cnt -> $timeSeries.size von: " + komprBeg.format(dateFormat) + " bis: " + komprEnd.format(dateFormat) + " (Testlauf)"
          cntNew=timeSeries.size 
        } else {
          println "$dp.displayName: komprimiert! $cnt -> $timeSeries.size von: " + komprBeg.format(dateFormat) + " bis: " + komprEnd.format(dateFormat)
          cntNew=database.replaceTimeSeries(dp, timeSeries, komprBeg, komprEnd)
        }
        summeComp=summeComp+(cnt-cntNew)
        summetotal=summetotal+cnt
      }
    }
  }
}

println "Summe Datenzeilen gelöscht:    $summeDel"
println "Summe Datenzeilen komprimiert: $summeComp von $summetotal"
}
mdzio commented 8 months ago

Ich nehme an, die aktuelle Version ist jetzt im Forum? Willst Du nicht ein Repo auf GitHub dafür erstellen?

Den Code werde ich auch noch einmal durchschauen.

wakr70 commented 8 months ago

ja der aktuelle code ist derzeit im Forum, aber alles noch Probier code ... um mal die Thematik besser zu verstehen und veilleicht noch andere Ideen zu bekommen.

Über verbesserungsvorschläge würde ich mich sehr freuen.

Du kannst es auch dann gerne in dein Wiki aufnehmen, da es ja rein Historian code ist und mit Highchart ansich ja nix zu tun hat.

wakr70 commented 8 months ago

man könnte die Komprimierung auch mehrstufig machen wäre gar nicht soviel umbau. Am schönsten wäre es natürlich wenn die Konfig in der Weboberfläche integriert würde und ein Standardjob das ganze machen würde .. :-)

wakr70 commented 8 months ago

dann wäre das hier nur ein paar Andenkstöße

mdzio commented 8 months ago

Das sieht schon funktionstüchtig aus. Ein paar Anmerkungen:

Anstatt and immer && verwenden.

Neuer Code mit weiteren Vereinfachunge:

  if (conf[1].class == String ) {
    if (conf[1].endsWith("Y") &&        // volle Jahre 
      conf[1][0..-2].isInteger() &&
      conf[1][0..-2].toInteger() > 0 ) {
      def dateTmp = new Date(year: (jetzt.getYear() - conf[1][0..-2].toInteger()) , month: 0, date: 1, hours: 0, minutes: 0, seconds: 0)

Der CCU-Historian kann seit einiger Zeit auch selber mit Zeitreihen rechnen. Ein 2-Stunden-Durchschnittswert kann so berechnet werden:

def dp=database.getDataPoint(132)
def begin=parseDate("1.3.2024")
def end=parseDate("2.3.2024")

// Durchschnittswert alle 2 Stunden per Cron-Ausdruck mit Hilfe von Expression's
def avg2h=dataPoint(dp).average(cron("0 0 */2 * * *"))

// In TimeSeries umwandeln
def ts=new TimeSeries(dp)
avg2h.read(begin, end).each { entry -> ts.add entry }

println "Berechnet:"
ts.each { entry -> println entry }

//database.replaceTimeSeries(dp, ts, begin, end)
wakr70 commented 8 months ago

danke, hab das mit && und Vereinfachung mal eingebau. Das mit den Zeitreihen rechnen paßt mir aber jetzt so nicht in Script rein, da müßte ich alles umbauen und ich glaub da gewinnen wir jetzt auch nicht wirklich viel Zeit.

hier die aktuelle Version, läuft die auch deinen System ?


 // Datenreihen löschen und komprimieren, V1.0

  // Mit der Variable testRun=false kann ihn den Schreib-Modus gewechselt werden und 
  // damit werden Daten gelöscht und / oder //komprimiert.
  // (Ja: true, Nein: false)
  def testRun = true

  // Bei einer Anpassung der config kann es erforderlich sein existierende komprimierte 
  // Datensätze für die nächste Komprimierung zu wiederholen, 
  // dazu muß Varialbe rekompress = true gesetz werden. Nach erfolgreichen Lauf sollte das wieder 
  // zurückgesetzt werden auf rekompress = false, damit das Script viel schneller läuft!
  // (Ja: true, Nein: false)
  def rekompress = false

  /*
  Mit der Variable config kann man die gewünschten Datenpunkte konfigurieren:
  Parameter 1 z.b. "*DutyCycle*" ist der Displayname und es können * (Wildcards verwendet werden)
  Parameter 2 z.b. 365 ist die Anzahl der Tage die in der Datenbank verbleiben sollen, 365 wäre hier ein Jahr,
                       alle Daten drüber ob komprimiert oder nicht werden gelöscht (es gibt auch volle Monate 
                       oder volle Jahre "5M" oder "3Y" )
  Parameter 3 z.b. 10 ist hier die Anzal der Tage nachdem die Werte komprimiert werden
  Parameter 4 z.b. 60*60*1 bestimmt den Komprimierungszeitraum das wäre 1 Std. (gängige Werte für mich 60*60*2 oder 60*60*24)
  Parameter 5 z.b. "max" bestimmt die Art der Komprimierung "max" = maximaler Wert in 1 Std. ebenso gibt es noch:
                   "min" - Minimuwert in 1Std.
                   "avg" - Durchschnittswert verwende ich bei Temperaturwerten oder Feuchtigkeitswerten
                   "first" - erster Wert im Komprimierungszeitraum verwende ich bei Zählern
                   "last" - letzter Wert im Komprimierungszeitraum
                   "integral" - Durchschittswert mit Zeitfaktor, funktioniert aber nur bei vielen Wert 
                                in einem Komprimierungszeitraum spricht Tagesrdurchschnitt

  */

  // min, max, first, last or avg as value for compression
  def config = [
                ["*DUTYCYCLE*"          , 365, 10, 60*60*1,   "max" ],    // Std. Max-Wert nach 10 Tagen und nach 365 Tagen löschen
                ["*CARRIER_SENSE*"      , 365, 10, 60*60*1,   "max" ],    // Std. Max-Wert nach 10 Tagen und nach 365 Tagen löschen 
                ["*HUMIDITY"            ,"3Y", 30, 60*60*2,   "avg" ],    // 2 Std. Durchschnitt nach 30 Tagen und nach 3 vollen Jahren löschen
                ["*TEMPERATURE"         ,"3Y", 30, 60*60*2,   "avg" ],    // 2 Std. Durchschnitt nach 30 Tagen und nach 3 vollen Jahren löschen
                ["*ENERGY_COUNTER"      ,"5Y", 30, 60*60*2,   "first" ],  // 2 Std. bei Zähler 1. Wert nach 30 Tagen und nach 5 vollen Jahren 
]

  // *** Skript *************************

  def dateFormat = "yyyy-MM-dd"   // standard Timestamp Format
  def jetzt =  new Date()               // aktuellen StartTimeStamp speichern für Berechnungen

  // 24 Bits für Flag berechnen 
  def int bit24 = Math.pow(2,23)   // 8388608;

  config.each { conf -> 

    // Löschdatum 5Y oder 3M auf Tage aktuell umrechnen mit 1. des Monats
    def orgConf = conf[1]
    if (conf[1].getClass() == String ) {
      if (conf[1].endsWith("Y") && conf[1].length()>=2) {
        if ( conf[1][0..-2].isInteger() && conf[1][0..-2].toInteger() > 0 ) {
          def yearValue = conf[1][0..-2].toInteger()
          def dateTmp = new Date(year: (jetzt.getYear() - yearValue ) , month: 0, date: 1, hours: 0, minutes: 0, seconds: 0)
          conf[1] = jetzt - dateTmp     // berechnete Löschtage setzen für xY
        } else {
          conf[1] = 0   // bei falschen Werten auf 0 setzen und nix tun
        }
      } else if (conf[1].endsWith("M") && conf[1].length()>=2) {
        if ( conf[1][0..-2].isInteger() && conf[1][0..-2].toInteger() > 0 ) {
          def monthValue = conf[1][0..-2].toInteger()
          def cal=Calendar.getInstance().clearTime()
          cal.add(Calendar.MONTH, monthValue * -1);
          cal.set(Calendar.DAY_OF_MONTH, 1);
          conf[1] = jetzt - cal.getTime() // berechnete Löschtage setzen für xM
        } else {
          conf[1] = 0   // bei falschen Werten auf 0 setzen und nix tun
        }
      } else {
        conf[1] = 0   // bei falschen Werten auf 0 setzen und nix tun
      }
      def delDate = jetzt - conf[1]
      delDate.clearTime()
      println conf[0].padRight(40) + " Löschdatum berechnet " + delDate.format(dateFormat) + "  " + orgConf + "  " + conf[1]
    }

    // wildcards in regex umbauen 
    def txtResult = new StringBuffer()
    conf[0].each { ch ->
      switch (ch) {
      case '*':
          // Single '*' matches single dir/file; Double '*' matches sequence of zero or more dirs/files
          txtResult << /[^\/]*/
          break
      case '?':
          // Any character except the normalized file separator ('/')
          txtResult << /[^\/]/
          break
      case ['$', '|', '[', ']', '(', ')', '.', ':', '{', '}', '\\', '^', '+']:
          txtResult << '\\' + ch
          break
      default: txtResult << ch
      }
    }
    conf[0] = txtResult.toString().toUpperCase()
    // println conf[0]
  }

  def comprFactor=0
  def dpCount=[]
  def summeDel=0
  def summeComp=0
  def summetotal=0
  def lastDPState
  database.dataPoints.sort{ it.displayName.toUpperCase() }.each { dp ->

    // check dp.displayName wird in config gefunden
    def found=null
    def delBegin=null
    def delEnd=null
    def komprBegin=null
    def cntNew

    // Datenpunkt in Liste über REGEX suchen
    config.find { con ->
      if (dp.displayName.toUpperCase()==~con[0]) {
        // println  con[0] + " -> " + dp.displayName    // check REGEX Regeln
        found = con
        // beim ersten gefunden, merken und Schleife verlassen
        return true // break
      }
    }
    if (found) {

      // Datenlöschung     ******* 
      if (found[1] > 10) {
        cntNew=0
        // Löschzeitraum bestimmen
        delBegin=database.getFirstTimestamp(dp)
        delEnd=jetzt-found[1]      // found[1] = 2 spalte aus der Konfiguration Tabelle am Anfang
        delEnd.clearTime()         // Zeit auf 00:00:00 stellen

        if (testRun) {
          cntNew=database.getCount(dp, delBegin  , delEnd)
          if (cntNew>0) println "$dp.displayName: $cntNew werden gelöscht! von: " + delBegin.format(dateFormat) + "  bis: " + delEnd.format(dateFormat) + " (Testlauf)"
        } else {
          cntNew=database.deleteTimeSeries(dp, delBegin  , delEnd)
          if (cntNew>0) println "$dp.displayName: $cntNew gelöscht! von: " + delBegin.format(dateFormat) + "  bis: " + delEnd.format(dateFormat)
        }
        summeDel=summeDel+cntNew
      }

      // Datenkomprimierung   ****** 
      if (found[2] > 5) {
        // Komprimierungs Zeitraum bestimmen
        if (delEnd) {
           komprBegin=delEnd
        } else {
           komprBegin=database.getFirstTimestamp(dp)
        }
        def komprEnd=jetzt-found[2]
        komprEnd.clearTime()    // Zeit auf 00:00:00 stellen

        comprFactor = 1000*found[3]  // Sekunden von Konfigurationtabelle * 1000 auf Millisekungen
        def comprValue = found[4]    // von Konfigurationtalle "AVG", "MIN", "MAX", ...  

        def cnt

        // Zeitreihe holen
        def ts=database.getTimeSeriesRaw(dp, komprBegin, komprEnd)

        def komprBeg = komprBegin
        if (!rekompress) {

          // erste Zeitreihe ohne komprimierung finden
          komprBeg = komprEnd
          ts.find { pv ->
            if (!(pv.state&bit24)) { 
              komprBeg = pv.timestamp    // neues begin ermittelt
              return true // break
            }
          }
        }
        //println "Erste Timestamp ohne KompFlag: $komprBeg"

        // Zeitreihe holen neu ohne bereits komprimierte
        ts=database.getTimeSeriesRaw(dp, komprBeg , komprEnd)
        cnt=ts.size

        // Statistik berechnen
        def duration=komprEnd.time - komprBeg.time 
        def min=Double.POSITIVE_INFINITY
        def max=Double.NEGATIVE_INFINITY
        def integr=0
        def intsum=0
        def intwert=0
        def anzahl=0
        def summe=0
        def lastTime=0
        def thisTime=0
        def firstValue=0
        def lastValue=0
        def avg=0
        def previous
        def comprPosible=false

        // neue komprimierte Zeitreihe erstellen    
        def timeSeries=new TimeSeries(dp)

        ts.each { pv ->
          thisTime=new Date( ( ( Math.floor(pv.timestamp.time/comprFactor)*comprFactor) as long) )

          if (thisTime!=lastTime) {
            if (lastTime!=0) {
              duration=lastTime.time - thisTime.time
              // Durchnitt ist Integral/Zeitbereichslänge in Millisekunden.
              // def avg=integr/duration
              avg = Math.round(summe / anzahl * 10) / 10
              if (intsum!=0) intwert = Math.round(integr / intsum * 10) / 10

              // println lastTime.format(dateFormat) + " Anzahl: $anzahl, Minimum: $min, Maximum: $max, Integral: $intwert, Durchschnitt: $avg, Duration: $duration, First: $firstValue, Last: $lastValue"

              switch(comprValue) {            
              case "min": 
                timeSeries.add(new ProcessValue(lastTime, min, bit24|lastDPState ))
                break;
              case "max": 
                timeSeries.add(new ProcessValue(lastTime, max, bit24|lastDPState ))
                break;
              case "first": 
                timeSeries.add(new ProcessValue(lastTime, firstValue, bit24|lastDPState ))
                break;
              case "last": 
                timeSeries.add(new ProcessValue(lastTime, lastValue, bit24|lastDPState ))
                break;
              case "avg": 
                timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
                break;
              case "integral": 
                timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
                break;
              }
              if (anzahl>1) comprPosible=true
            }

            min=pv.value
            max=pv.value
            anzahl=0
            summe=0
            integr=0
            intsum=0
            intwert=pv.value 
            firstValue=pv.value
            lastDPState=pv.state
            lastTime=thisTime
          }

          if (pv.value<min) min=pv.value
          if (pv.value>max) max=pv.value
          if (previous!=null) {
            // Teilintegral berechnen: Messwert*Millisekunden
            integr+=previous.value*(pv.timestamp.time-previous.timestamp.time)
            intsum+=(pv.timestamp.time-previous.timestamp.time)
          }
          lastValue=pv.value
          anzahl=anzahl+1
          summe=summe+pv.value
          previous=pv
        }
        if (lastTime!=0) {
           duration=lastTime.time - thisTime.time
          // Durchnitt ist Integral/Zeitbereichslänge in Millisekunden.
          // def avg=integr/duration
          avg = Math.round(summe / anzahl * 10) / 10
          if (intsum!=0) intwert = Math.round(integr / intsum * 10) / 10

          // println lastTime.format(dateFormat) + " Anzahl: $anzahl, Minimum: $min, Maximum: $max, Integral: $intwert, Durchschnitt: $avg, Duration: $duration, First: $firstValue, Last: $lastValue"

          switch(comprValue) {            
          case "min": 
            timeSeries.add(new ProcessValue(lastTime, min, bit24|lastDPState ))
            break;
          case "max": 
            timeSeries.add(new ProcessValue(lastTime, max, bit24|lastDPState ))
            break;
          case "first": 
            timeSeries.add(new ProcessValue(lastTime, firstValue, bit24|lastDPState ))
            break;
          case "last": 
            timeSeries.add(new ProcessValue(lastTime, lastValue, bit24|lastDPState ))
            break;
          case "avg": 
            timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
            break;
          case "integral": 
            timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
            break;
          }
          if (anzahl>1) comprPosible=true
        }
        if (comprPosible) {
          def comprFactorSek = comprFactor/1000  
          println "$dp.displayName: Konfig " + komprBegin.format(dateFormat) + " bis: " + komprEnd.format(dateFormat) + " Zeitraum Sek: $comprFactorSek Wert: $comprValue"
          cntNew=0
          if (testRun) {
            println "$dp.displayName: komprimiert! $cnt -> $timeSeries.size von: " + komprBeg.format(dateFormat) + " bis: " + komprEnd.format(dateFormat) + " (Testlauf)"
            cntNew=timeSeries.size 
          } else {
            println "$dp.displayName: komprimiert! $cnt -> $timeSeries.size von: " + komprBeg.format(dateFormat) + " bis: " + komprEnd.format(dateFormat)
            cntNew=database.replaceTimeSeries(dp, timeSeries, komprBeg, komprEnd)
          }
          summeComp=summeComp+ (cnt-cntNew)
          summetotal=summetotal+cnt
        }
      }
    }
  }

  println "Summe Datenzeilen gelöscht:    $summeDel"
  def delSum = summetotal - summeComp 
  println "Summe Datenzeilen komprimiert: $summeComp von $summetotal  -> $delSum"

  use(groovy.time.TimeCategory) {
      def duration = new Date() - jetzt
      print "Laufzeit: Std.: ${duration.hours}, Min.: ${duration.minutes}, Sek.: ${duration.seconds}"
  }
mdzio commented 7 months ago

Das Skript ist auch auf meinem System ohne Fehler durchgelaufen. Das Ergebnis ist soweit plausibel.

wakr70 commented 7 months ago

Derzeit ist die Komprimierung ja nur 1 stufig bevor Löschung, meinst würde eine Mehrstufigkeit Sinn machen ?

z.B. volle Daten 1 Monat, Stunden Durchschnitt 1 Jahr, darüber Tages Durchschnitt und nach 5 Jahren löschen ...

bin derzeit noch unentschlossen ... :-)

mdzio commented 7 months ago

Eine mehrstufige Komprimierung bringt meines Erachtens nicht mehr sooo viel. Wenn die erste Stufe schon um Faktor 20 verdichtet, dann sind die verbleibenden Datenmengen bereits sehr klein.

mdzio commented 5 months ago

Wo finde ich denn die aktuelle Version des Skriptes? Dann werde ich es mal mit zu den Skripten im Wiki packen.

wakr70 commented 5 months ago

Hallo Mathias,

derzeit habe ich folgenden Eintrag in meiner Config:

database.tasks.DelCompr.enable=true
database.tasks.DelCompr.cron="0 0 3 ? * SUN"
database.tasks.DelCompr.script={

  // Datenreihen löschen und komprimieren, V1.0

  // Mit der Variable testRun=false kann ihn den Schreib-Modus gewechselt werden und 
  // damit werden Daten gelöscht und / oder //komprimiert.
  // (Ja: true, Nein: false)
  def testRun = false

  // Bei einer Anpassung der config kann es erforderlich sein existierende komprimierte 
  // Datensätze für die nächste Komprimierung zu wiederholen, 
  // dazu muß Varialbe rekompress = true gesetz werden. Nach erfolgreichen Lauf sollte das wieder 
  // zurückgesetzt werden auf rekompress = false, damit das Script viel schneller läuft!
  // (Ja: true, Nein: false)
  def rekompress = false

  /*
  Mit der Variable config kann man die gewünschten Datenpunkte konfigurieren:
  Parameter 1 z.b. "*DutyCycle*" ist der Displayname und es können * (Wildcards verwendet werden)
  Parameter 2 z.b. 365 ist die Anzahl der Tage die in der Datenbank verbleiben sollen, 365 wäre hier ein Jahr,
                       alle Daten drüber ob komprimiert oder nicht werden gelöscht (es gibt auch volle Monate 
                       oder volle Jahre "5M" oder "3Y" )
  Parameter 3 z.b. 10 ist hier die Anzal der Tage nachdem die Werte komprimiert werden
  Parameter 4 z.b. 60*60*1 bestimmt den Komprimierungszeitraum das wäre 1 Std. (gängige Werte für mich 60*60*2 oder 60*60*24)
  Parameter 5 z.b. "max" bestimmt die Art der Komprimierung "max" = maximaler Wert in 1 Std. ebenso gibt es noch:
                   "min" - Minimuwert in 1Std.
                   "avg" - Durchschnittswert verwende ich bei Temperaturwerten oder Feuchtigkeitswerten
                   "first" - erster Wert im Komprimierungszeitraum verwende ich bei Zählern
                   "last" - letzter Wert im Komprimierungszeitraum
                   "integral" - Durchschittswert mit Zeitfaktor, funktioniert aber nur bei vielen Wert 
                                in einem Komprimierungszeitraum spricht Tagesrdurchschnitt

  */

  // min, max, first, last or avg as value for compression
  def config = [
                ["*DUTYCYCLE*"            ,"3Y", 10, 60*60*4,   "max" ],    // 4 Std Max-Wert nach 10 Tagen und nach 3 Jahren löschen
                ["*CARRIER_SENSE*"        ,"3Y", 10, 60*60*4,   "max" ],    // 4 Std Max-Wert nach 10 Tagen und nach 3 Jahren löschen 
                ["*HUMIDITY"              ,1000, 30, 60*60*2,   "avg" ],    // 2 Std Durchsch. nach 30 Tagen und nach 1000 Tagen löschen
                ["*TEMPERATURE"           ,1000, 30, 60*60*2,   "avg" ],    // 2 Std Durchsch. nach 30 Tagen und nach 1000 Tagen löschen
                ["*4.VALVE_STATE"         ,1000, 30, 60*60*3,   "avg" ],    // 3 Std Durchsch. nach 30 Tagen und nach 1000 Tagen löschen
                ["*.ENERGY_COUNTER"       ,1000, 30, 60*60*2,   "first" ],  // 2 Stunden bei Zähler immer den 1. Wert nehmen nach 30 Tagen und nach 1000 Tagen löschen
                ["*.POWER"                ,1000, 30, 60*60*2,   "max" ],    // 2 Std max nach 30 Tagen und nach 1000 Tagen löschen
                ["*.POWER_STATUS"         , 730, 30, 60*60*2,   "avg" ],    // 2 Std Durchsch. nach 30 Tagen und nach 730 Tagen löschen
                ["*.FREQUENCY_STATUS"     , 730, 10, 60*60*3,   "avg" ],  // 3 Std Durchsch. nach 10 Tagen und nach 730 Tagen löschen
                ["*.CURRENT_STATUS"       , 730, 10, 60*60*3,   "avg" ],    // 3 Std Durchsch. nach 10 Tagen und nach 730 Tagen löschen
                ["*.VOLTAGE_STATUS"       , 730, 10, 60*60*3,   "avg" ],    // 3 Std Durchsch. nach 10 Tagen und nach 730 Tagen löschen
                ["*.ENERGY_COUNTER_OVERFLOW" ,730, 10, 60*60*3, "avg" ],   // 3 Std Durchsch. nach 10 Tagen und nach 730 Tagen löschen
  ]

  // *** Skript *************************

  def dateFormat = "yyyy-MM-dd"   // standard Timestamp Format
  def jetzt =  new Date()               // aktuellen StartTimeStamp speichern für Berechnungen

  // 24 Bits für Flag berechnen 
  def int bit24 = Math.pow(2,23)   // 8388608;

  config.each { conf -> 

    // Löschdatum 5Y oder 3M auf Tage aktuell umrechnen mit 1. des Monats
    def orgConf = conf[1]
    if (conf[1].getClass() == String ) {
      if (conf[1].endsWith("Y") && conf[1].length()>=2) {
        if ( conf[1][0..-2].isInteger() && conf[1][0..-2].toInteger() > 0 ) {
          def yearValue = conf[1][0..-2].toInteger()
          def dateTmp = new Date(year: (jetzt.getYear() - yearValue ) , month: 0, date: 1, hours: 0, minutes: 0, seconds: 0)
          conf[1] = jetzt - dateTmp     // berechnete Löschtage setzen für xY
        } else {
          conf[1] = 0   // bei falschen Werten auf 0 setzen und nix tun
        }
      } else if (conf[1].endsWith("M") && conf[1].length()>=2) {
        if ( conf[1][0..-2].isInteger() && conf[1][0..-2].toInteger() > 0 ) {
          def monthValue = conf[1][0..-2].toInteger()
          def cal=Calendar.getInstance().clearTime()
          cal.add(Calendar.MONTH, monthValue * -1);
          cal.set(Calendar.DAY_OF_MONTH, 1);
          conf[1] = jetzt - cal.getTime() // berechnete Löschtage setzen für xM
        } else {
          conf[1] = 0   // bei falschen Werten auf 0 setzen und nix tun
        }
      } else {
        conf[1] = 0   // bei falschen Werten auf 0 setzen und nix tun
      }
      def delDate = jetzt - conf[1]
      delDate.clearTime()
      println conf[0].padRight(40) + " Löschdatum berechnet " + delDate.format(dateFormat) + "  " + orgConf + "  " + conf[1]
    }

    // wildcards in regex umbauen 
    def txtResult = new StringBuffer()
    conf[0].each { ch ->
      switch (ch) {
      case '*':
          // Single '*' matches single dir/file; Double '*' matches sequence of zero or more dirs/files
          txtResult << /[^\/]*/
          break
      case '?':
          // Any character except the normalized file separator ('/')
          txtResult << /[^\/]/
          break
      case ['$', '|', '[', ']', '(', ')', '.', ':', '{', '}', '\\', '^', '+']:
          txtResult << '\\' + ch
          break
      default: txtResult << ch
      }
    }
    conf[0] = txtResult.toString().toUpperCase()
    // println conf[0]
  }

  def comprFactor=0
  def dpCount=[]
  def summeDel=0
  def summeComp=0
  def summetotal=0
  def lastDPState
  database.dataPoints.sort{ it.displayName.toUpperCase() }.each { dp ->

    // check dp.displayName wird in config gefunden
    def found=null
    def delBegin=null
    def delEnd=null
    def komprBegin=null
    def cntNew

    // Datenpunkt in Liste über REGEX suchen
    config.find { con ->
      if (dp.displayName.toUpperCase()==~con[0]) {
        // println  con[0] + " -> " + dp.displayName    // check REGEX Regeln
        found = con
        // beim ersten gefunden, merken und Schleife verlassen
        return true // break
      }
    }
    if (found) {

      // Datenlöschung     ******* 
      if (found[1] > 10) {
        cntNew=0
        // Löschzeitraum bestimmen
        delBegin=database.getFirstTimestamp(dp)
        delEnd=jetzt-found[1]      // found[1] = 2 spalte aus der Konfiguration Tabelle am Anfang
        delEnd.clearTime()         // Zeit auf 00:00:00 stellen

        if (testRun) {
          cntNew=database.getCount(dp, delBegin  , delEnd)
          if (cntNew>0) println "$dp.displayName: $cntNew werden gelöscht! von: " + delBegin.format(dateFormat) + "  bis: " + delEnd.format(dateFormat) + " (Testlauf)"
        } else {
          cntNew=database.deleteTimeSeries(dp, delBegin  , delEnd)
          if (cntNew>0) println "$dp.displayName: $cntNew gelöscht! von: " + delBegin.format(dateFormat) + "  bis: " + delEnd.format(dateFormat)
        }
        summeDel=summeDel+cntNew
      }

      // Datenkomprimierung   ****** 
      if (found[2] > 5) {
        // Komprimierungs Zeitraum bestimmen
        if (delEnd) {
           komprBegin=delEnd
        } else {
           komprBegin=database.getFirstTimestamp(dp)
        }
        def komprEnd=jetzt-found[2]
        komprEnd.clearTime()    // Zeit auf 00:00:00 stellen

        comprFactor = 1000*found[3]  // Sekunden von Konfigurationtabelle * 1000 auf Millisekungen
        def comprValue = found[4]    // von Konfigurationtalle "AVG", "MIN", "MAX", ...  

        def cnt

        // Zeitreihe holen
        def ts=database.getTimeSeriesRaw(dp, komprBegin, komprEnd)

        def komprBeg = komprBegin
        if (!rekompress) {

          // erste Zeitreihe ohne komprimierung finden
          komprBeg = komprEnd
          ts.find { pv ->
            if (!(pv.state&bit24)) { 
              komprBeg = pv.timestamp    // neues begin ermittelt
              return true // break
            }
          }
        }
        //println "Erste Timestamp ohne KompFlag: $komprBeg"

        // Zeitreihe holen neu ohne bereits komprimierte
        ts=database.getTimeSeriesRaw(dp, komprBeg , komprEnd)
        cnt=ts.size

        // Statistik berechnen
        def duration=komprEnd.time - komprBeg.time 
        def min=Double.POSITIVE_INFINITY
        def max=Double.NEGATIVE_INFINITY
        def integr=0
        def intsum=0
        def intwert=0
        def anzahl=0
        def summe=0
        def lastTime=0
        def thisTime=0
        def firstValue=0
        def lastValue=0
        def avg=0
        def previous
        def comprPosible=false

        // neue komprimierte Zeitreihe erstellen    
        def timeSeries=new TimeSeries(dp)

        ts.each { pv ->
          thisTime=new Date( ( ( Math.floor(pv.timestamp.time/comprFactor)*comprFactor) as long) )

          if (thisTime!=lastTime) {
            if (lastTime!=0) {
              duration=lastTime.time - thisTime.time
              // Durchnitt ist Integral/Zeitbereichslänge in Millisekunden.
              // def avg=integr/duration
              avg = Math.round(summe / anzahl * 10) / 10
              if (intsum!=0) intwert = Math.round(integr / intsum * 10) / 10

              // println lastTime.format(dateFormat) + " Anzahl: $anzahl, Minimum: $min, Maximum: $max, Integral: $intwert, Durchschnitt: $avg, Duration: $duration, First: $firstValue, Last: $lastValue"

              switch(comprValue) {            
              case "min": 
                timeSeries.add(new ProcessValue(lastTime, min, bit24|lastDPState ))
                break;
              case "max": 
                timeSeries.add(new ProcessValue(lastTime, max, bit24|lastDPState ))
                break;
              case "first": 
                timeSeries.add(new ProcessValue(lastTime, firstValue, bit24|lastDPState ))
                break;
              case "last": 
                timeSeries.add(new ProcessValue(lastTime, lastValue, bit24|lastDPState ))
                break;
              case "avg": 
                timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
                break;
              case "integral": 
                timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
                break;
              }
              if (anzahl>1) comprPosible=true
            }

            min=pv.value
            max=pv.value
            anzahl=0
            summe=0
            integr=0
            intsum=0
            intwert=pv.value 
            firstValue=pv.value
            lastDPState=pv.state
            lastTime=thisTime
          }

          if (pv.value<min) min=pv.value
          if (pv.value>max) max=pv.value
          if (previous!=null) {
            // Teilintegral berechnen: Messwert*Millisekunden
            integr+=previous.value*(pv.timestamp.time-previous.timestamp.time)
            intsum+=(pv.timestamp.time-previous.timestamp.time)
          }
          lastValue=pv.value
          anzahl=anzahl+1
          summe=summe+pv.value
          previous=pv
        }
        if (lastTime!=0) {
           duration=lastTime.time - thisTime.time
          // Durchnitt ist Integral/Zeitbereichslänge in Millisekunden.
          // def avg=integr/duration
          avg = Math.round(summe / anzahl * 10) / 10
          if (intsum!=0) intwert = Math.round(integr / intsum * 10) / 10

          // println lastTime.format(dateFormat) + " Anzahl: $anzahl, Minimum: $min, Maximum: $max, Integral: $intwert, Durchschnitt: $avg, Duration: $duration, First: $firstValue, Last: $lastValue"

          switch(comprValue) {            
          case "min": 
            timeSeries.add(new ProcessValue(lastTime, min, bit24|lastDPState ))
            break;
          case "max": 
            timeSeries.add(new ProcessValue(lastTime, max, bit24|lastDPState ))
            break;
          case "first": 
            timeSeries.add(new ProcessValue(lastTime, firstValue, bit24|lastDPState ))
            break;
          case "last": 
            timeSeries.add(new ProcessValue(lastTime, lastValue, bit24|lastDPState ))
            break;
          case "avg": 
            timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
            break;
          case "integral": 
            timeSeries.add(new ProcessValue(lastTime, avg, bit24|lastDPState ))
            break;
          }
          if (anzahl>1) comprPosible=true
        }
        if (comprPosible) {
          def comprFactorSek = comprFactor/1000  
          println "$dp.displayName: Konfig " + komprBegin.format(dateFormat) + " bis: " + komprEnd.format(dateFormat) + " Zeitraum Sek: $comprFactorSek Wert: $comprValue"
          cntNew=0
          if (testRun) {
            println "$dp.displayName: komprimiert! $cnt -> $timeSeries.size von: " + komprBeg.format(dateFormat) + " bis: " + komprEnd.format(dateFormat) + " (Testlauf)"
            cntNew=timeSeries.size 
          } else {
            println "$dp.displayName: komprimiert! $cnt -> $timeSeries.size von: " + komprBeg.format(dateFormat) + " bis: " + komprEnd.format(dateFormat)
            cntNew=database.replaceTimeSeries(dp, timeSeries, komprBeg, komprEnd)
          }
          summeComp=summeComp+ (cnt-cntNew)
          summetotal=summetotal+cnt
        }
      }
    }
  }

  println "Summe Datenzeilen gelöscht:    $summeDel"
  def delSum = summetotal - summeComp 
  println "Summe Datenzeilen komprimiert: $summeComp von $summetotal  -> $delSum"

  use(groovy.time.TimeCategory) {
      def duration = new Date() - jetzt
      print "Laufzeit: Std.: ${duration.hours}, Min.: ${duration.minutes}, Sek.: ${duration.seconds}"
  }

}

Lg wak

mdzio commented 5 months ago

Das Skript ist nun im Wiki zu finden. LG Mathias