emoncms / postprocess

Post Processing module for emoncms
GNU Affero General Public License v3.0

introducing python processes ? #21

Open alexandrecuer opened 3 years ago

alexandrecuer commented 3 years ago

@TrystanLea : would you be happy to also introduce python processes in postprocess ? right now, only php processes are supported, that's enough for many things, but with python processes, we could take advantages of the numpy library, which is very powerfull :-)

I think it could be quite simple to support python processes, but I wanted your opinion...

I recently investigated how to subclass the numpy ndarray object, cf https://numpy.org/devdocs/user/basics.subclassing.html

Here is a first proof of concept of a basic PyFina object derived from the numpy ndarray object. No averaging in that class, but we could introduce some quite easily IMO

import numpy as np
import struct
import os
import math

class PyFina(np.ndarray):

    def __new__(cls, id, dir, start, step, npts):
        """
        decoding the .meta file

        id (4 bytes, Unsigned integer)
        npoints (4 bytes, Unsigned integer, Legacy : use instead filesize//4 )
        interval (4 bytes, Unsigned integer)
        start_time (4 bytes, Unsigned integer)

        """
        with open("{}/{}.meta".format(dir,id),"rb") as f:
            # skip the two legacy fields (id, npoints) and read interval + start_time
            f.seek(8,0)
            hexa = f.read(8)
            aa = bytearray(hexa)
            if len(aa) != 8:
                raise ValueError("corrupted meta - aborting")
            decoded = struct.unpack('<2I', aa)
        meta = {
                 "interval":decoded[0],
                 "start_time":decoded[1],
                 "npoints":os.path.getsize("{}/{}.dat".format(dir,id))//4
               }
        """
        decoding and sampling the .dat file
        values are 32 bit floats, stored on 4 bytes
        to estimate value(time), position in the dat file is calculated as follow :
        pos = (time - meta["start_time"]) // meta["interval"]
        Nota : no NAN value - if a NAN is detected, the algorithm will fetch the first non NAN value in the future
        """
        verbose = False
        obj = np.zeros(npts).view(cls)

        with open("{}/{}.dat".format(dir,id), "rb") as ts:
            for i in range(npts):
                time = start + step * i
                pos = (time - meta["start_time"]) // meta["interval"]
                if pos >= 0 and pos < meta["npoints"]:
                    #print("trying to find point {} going to index {}".format(i,pos))
                    ts.seek(pos*4, 0)
                    hexa = ts.read(4)
                    aa = bytearray(hexa)
                    if len(aa) == 4:
                        value = struct.unpack('<f', aa)[0]
                        if not math.isnan(value):
                            obj[i] = value
                        else:
                            if verbose:
                                print("NAN at pos {} uts {}".format(pos, meta["start_time"]+pos*meta["interval"]))
                            # fetch the first non-NAN value in the future,
                            # staying within the bounds of the file
                            j = 1
                            while pos + j < meta["npoints"]:
                                ts.seek((pos+j)*4, 0)
                                aa = bytearray(ts.read(4))
                                value = struct.unpack('<f', aa)[0]
                                if math.isnan(value):
                                    j += 1
                                else:
                                    obj[i] = value
                                    break
                    else:
                        print("unpacking problem {} len is {} position is {}".format(i, len(aa), pos))
        """
        storing the "signature" of the "sampled" feed
        """
        obj.start = start
        obj.step = step

        return obj

    def __array_finalize__(self, obj):
        if obj is None: return
        self.start = getattr(obj, 'start', None)
        self.step = getattr(obj, 'step', None)

    def timescale(self):
        """
        return the time scale of the feed as a numpy array
        """
        return np.arange(0,self.step*self.shape[0],self.step)
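To illustrate the `__array_finalize__` pattern used above, here is a minimal, standalone sketch (the `Tagged` class name is just an example, not part of the proposal) showing that attributes set in `__new__` survive views and slices of the subclassed array:

```python
import numpy as np

class Tagged(np.ndarray):
    """Minimal ndarray subclass carrying a start/step signature."""

    def __new__(cls, data, start=0, step=1):
        obj = np.asarray(data, dtype=np.float32).view(cls)
        obj.start = start
        obj.step = step
        return obj

    def __array_finalize__(self, obj):
        # called for views and slices: copy the signature over
        if obj is None:
            return
        self.start = getattr(obj, "start", None)
        self.step = getattr(obj, "step", None)

t = Tagged([1.0, 2.0, 3.0, 4.0], start=1577404800, step=10)
view = t[1:3]  # slicing goes through __array_finalize__
print(view.start, view.step)  # 1577404800 10
```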

PyFina object creation in python is very easy. Here is an example with feed number 66 starting on timestamp 1577404800, fetching the original recording and sampling a point every half hour, for a duration of 200 hours:

start = 1577404800
id = 66
dir = "/home/alexandrecuer/BIOS/labo/phpfina"
feed = PyFina(id, dir, start, 10, 72000)
feedS1 = PyFina(id, dir, start, 1800, 400)

import matplotlib.pyplot as plt
plt.ylabel('instant power W')
plt.xlabel('time in seconds')
plt.plot(feed.timescale(), feed, label="original recording", color="orange")
plt.plot(feedS1.timescale(), feedS1, 'x', color="blue", label="sampling with step equal to {} s".format(feedS1.step))
plt.plot(feedS1.timescale(), feedS1, color="blue")
plt.legend()
plt.show()

The graph below, created with matplotlib, is just for the example. On the RPi, of course, this would not be possible.

PyFina

All calculations could be made using numpy. The addition of two feeds feed1 and feed2, assuming they are sampled with the same step and from the same start, could simply be :

feed = feed1+feed2

Finally, the creation of a Fina feed could be possible with some simple methods like the following ones (plus the SQL command to inject the feed into the mariadb feed table)

def createMeta(nb, start, step, dir=dir):
    """
    create the meta file given :
    - a feed number
    - a unix timestamp as start
    - a step
    the first two fields (id, npoints) are legacy and left at zero
    """
    with open("{}/{}.meta".format(dir, nb), "wb") as f:
        data = [0, 0, step, start]
        f.write(struct.pack("<{}I".format(len(data)), *data))

def createFeed(nb, data, dir=dir):
    """
    create the dat file given :
    - a feed number
    - a numpy vector of data
    """
    with open("{}/{}.dat".format(dir, nb), "wb") as f:
        f.write(struct.pack("<{}f".format(len(data)), *data))
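As a quick sanity check of the binary layout these helpers assume (the .meta file as four little-endian uint32, the .dat file as flat little-endian float32), here is a self-contained round trip into a temp directory, decoded exactly the way PyFina reads the files; the helper definitions are repeated so the sketch runs on its own:

```python
import os
import struct
import tempfile

import numpy as np

def createMeta(nb, start, step, dir):
    # id and npoints are legacy fields, left at zero
    with open("{}/{}.meta".format(dir, nb), "wb") as f:
        f.write(struct.pack("<4I", 0, 0, step, start))

def createFeed(nb, data, dir):
    # consecutive little-endian float32 values
    with open("{}/{}.dat".format(dir, nb), "wb") as f:
        f.write(np.asarray(data, dtype="<f4").tobytes())

tmp = tempfile.mkdtemp()
createMeta(1, start=1577404800, step=10, dir=tmp)
createFeed(1, [1.5, 2.5, 3.5], dir=tmp)

# decode the meta the same way PyFina.__new__ does
with open("{}/1.meta".format(tmp), "rb") as f:
    f.seek(8, 0)
    interval, start_time = struct.unpack("<2I", f.read(8))
npoints = os.path.getsize("{}/1.dat".format(tmp)) // 4
print(interval, start_time, npoints)  # 10 1577404800 3
```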

Of course, you need numpy to be installed :

pip3 install numpy
TrystanLea commented 3 years ago

Hello @alexandrecuer yes happy with this, nice to see, I did experiment with porting the timeseries engines to python a while back but that project didn’t end up going anywhere. Anyway, happy for there to be post processes written in python :+1: