rcasula / kostalpiko-homeassistant


Code for reading data #20

Open ArjanDeVries opened 1 year ago

ArjanDeVries commented 1 year ago

I have some code that connects to my Piko 3.6 and reads data from it. It was written a long time ago by someone else in Python 2; I converted it to Python 3 and it is working now. It works by connecting to port 81, which was apparently used by tooling from Kostal. It does not require a user name or password and is very fast, and web scraping is no longer needed. I have been looking at how to implement this in HA, but I'm just starting with HA, so I'm looking for some extra help. Regards, Arjan

rcasula commented 1 year ago

Hi @ArjanDeVries, I've never tried connecting to port 81. When I have time I will certainly try it. In the meantime I'll leave this issue open.

BTW can you share the code that you're using? Maybe we can integrate it here!

ArjanDeVries commented 1 year ago

Here is my Python code:

import socket

def cnv_status_txt(val):
    txt = "Communication error"
    if val == 0:
        txt = "Off"
    if val == 1:
        txt = "Idle"
    if val == 2:
        txt = "Starting"
    if val == 3:
        txt = "Running-MPP"
    if val == 4:
        txt = "Running-Regulated"
    if val == 5:
        txt = "Running"
    return txt

def CalcChkSum(St):
    # Checksum byte that makes the sum of all frame bytes 0 modulo 256.
    # Always returns a bytes object so it can be appended to the frame.
    Chk = 0
    for b in St:
        Chk = (Chk - b) % 256
    return bytes([Chk])

def GetWord(St, Idx):
    # 16-bit unsigned value, least significant byte first
    return St[Idx] + 256 * St[Idx + 1]

def GetDWord(St, Idx):
    # 32-bit unsigned value, least significant byte first
    return St[Idx] + 256 * St[Idx + 1] + 65536 * St[Idx + 2] + 256 * 65536 * St[Idx + 3]

def ChkSum(St):
    Chk = 0
    if len(St) == 0:
        return 0
    for i in range(len(St)):
        Chk += St[i]
        Chk %= 256
    if Chk == 0:
        return 1
    else:
        return 0

def SndRecv(addr, snd, dbg):
    # Frame: 0x62, address, 0x03, address, command, checksum, 0x00
    snd = b"\x62" + addr + b"\x03" + addr + snd
    snd += CalcChkSum(snd) + b"\x00"
    s.send(snd)
    recv = bytearray()
    # Keep reading until the socket times out, fails, or the peer closes the connection
    while True:
        try:
            data = s.recv(4096)
        except socket.error:
            break
        if not data:
            break
        recv += data
    # A reply whose first byte is 0xFF is discarded
    if (len(recv) > 0) and (recv[0] == 255):
        recv.clear()
    if dbg and (len(recv) > 0):
        print("Sent:", snd)
        print("Recv:", recv)
    return recv

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

host = "192.168.1.1" # IP address of your PIKO
port = 81
Addr = b"\xff"
Dbg = False

# Setup TCP socket
s.settimeout(5)
NetStatus = 0
Status = -1
try:
    s.connect((host, port))
    s.settimeout(1)
except socket.error as msg:
    NetStatus = msg

if NetStatus == 0:
    Snd = b"\x00\x57"
    Recv = SndRecv(Addr, Snd, Dbg)
    if ChkSum(Recv) != 0:
        Status = Recv[5]
        Error = Recv[6]
        ErrorCode = GetWord(Recv, 7)
    if Status > 5:
        Status = -1

StatusTxt = cnv_status_txt(Status)
if Dbg:
    print("Inverter Status : %d (%s)" % (Status, StatusTxt))

if Status != -1:
    CA_P = 0
    # Get Total Wh
    TotalWh = -1
    Snd = b"\x00\x45"
    Recv = SndRecv(Addr, Snd, Dbg)
    if ChkSum(Recv) != 0:
        TotalWh = GetDWord(Recv, 5)
        print("Total kWh : %d" % (TotalWh/1000))

    # Get Today Wh
    TodayWh = -1
    # Recv = ""
    Snd = b"\x00\x9d"
    Recv = SndRecv(Addr, Snd, Dbg)
    if ChkSum(Recv) != 0:
        TodayWh = GetDWord(Recv, 5)
        print('Today kWh : %f ' % (TodayWh/1000))

    # Get Technical data
    TechData = -1
    # Recv = ""
    Snd = b"\x00\x43"
    Recv = SndRecv(Addr, Snd, Dbg)
    if ChkSum(Recv) != 0 and (len(Recv) > 65):
        TechData = 1
        CC1_U = GetWord(Recv, 5) * 1.0 / 10
        CC1_I = GetWord(Recv, 7) * 1.0 / 100
        CC1_P = GetWord(Recv, 9)
        CC1_T = GetWord(Recv, 11)
        CC1_S = GetWord(Recv, 13)
        CC2_U = GetWord(Recv, 15) * 1.0 / 10
        CC2_I = GetWord(Recv, 17) * 1.0 / 100
        CC2_P = GetWord(Recv, 19)
        CC2_T = GetWord(Recv, 21)
        CC2_S = GetWord(Recv, 23)
        CC3_U = GetWord(Recv, 25) * 1.0 / 10
        CC3_I = GetWord(Recv, 27) * 1.0 / 100
        CC3_P = GetWord(Recv, 29)
        CC3_T = GetWord(Recv, 31)
        CC3_S = GetWord(Recv, 33)
        CA1_U = GetWord(Recv, 35) * 1.0 / 10
        CA1_I = GetWord(Recv, 37) * 1.0 / 100
        CA1_P = GetWord(Recv, 39)
        CA1_T = GetWord(Recv, 41)
        CA2_U = GetWord(Recv, 43) * 1.0 / 10
        CA2_I = GetWord(Recv, 45) * 1.0 / 100
        CA2_P = GetWord(Recv, 47)
        CA2_T = GetWord(Recv, 49)
        CA3_U = GetWord(Recv, 51) * 1.0 / 10
        CA3_I = GetWord(Recv, 53) * 1.0 / 100
        CA3_P = GetWord(Recv, 55)
        CA3_T = GetWord(Recv, 57)
        CA_S = GetWord(Recv, 61)
        CC_P = CC1_P + CC2_P + CC3_P
        CA_P = CA1_P + CA2_P + CA3_P
        if CC_P < 1:
            Eff = 0
        else:
            Eff = CA_P * 100.0 / CC_P

    # Print the current AC output power and the lifetime energy total (Wh)
    print("%d;%d" % (CA_P, TotalWh))
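
The framing and checksum rules used by the script above can be checked without an inverter. The following is a minimal sketch, assuming the request layout that SndRecv builds (0x62, address, 0x03, address, command, checksum, 0x00) and the little-endian field encoding implied by GetWord and GetDWord. The names build_request, frame_is_valid, get_word and get_dword are hypothetical and not part of the original code.

```python
import struct


def build_request(addr: int, command: bytes) -> bytes:
    """Frame a command the same way SndRecv does (layout assumed from the script)."""
    frame = bytes([0x62, addr, 0x03, addr]) + command
    # The checksum makes the byte sum of frame + checksum equal 0 modulo 256.
    checksum = (-sum(frame)) % 256
    return frame + bytes([checksum, 0x00])


def frame_is_valid(frame: bytes) -> bool:
    """Same rule as ChkSum: a non-empty frame whose bytes sum to 0 modulo 256."""
    return len(frame) > 0 and sum(frame) % 256 == 0


def get_word(buf: bytes, idx: int) -> int:
    """Unsigned 16-bit little-endian value, equivalent to GetWord."""
    return struct.unpack_from("<H", buf, idx)[0]


def get_dword(buf: bytes, idx: int) -> int:
    """Unsigned 32-bit little-endian value, equivalent to GetDWord."""
    return struct.unpack_from("<I", buf, idx)[0]


status_request = build_request(0xFF, b"\x00\x57")
print(status_request.hex(" "))
```

For the status command the sketch yields the frame 62 ff 03 ff 00 57 46 00, whose bytes sum to a multiple of 256.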

Rasfumi commented 1 year ago

Hi, I would be very interested in this integration too! @ArjanDeVries Did you find a way to integrate it into HA?

ArjanDeVries commented 1 year ago

No, I only shared my code here on GitHub.

Regards, Arjan
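
One possible first step toward an integration is to separate the protocol logic from the socket handling, so that a polling component can call a single function. The following is a rough sketch, assuming the command bytes and reply offsets from the script above. The names poll, query and the dictionary keys are hypothetical, and nothing here uses the Home Assistant API.

```python
from typing import Callable, Dict, Optional

# Command bytes and reply offsets are taken from the script in this thread.
CMD_STATUS = b"\x00\x57"
CMD_TOTAL_WH = b"\x00\x45"
CMD_TODAY_WH = b"\x00\x9d"


def _valid(reply: bytes) -> bool:
    # Bytes 5..8 are read below, so a usable reply needs at least 9 bytes,
    # and all of its bytes must sum to 0 modulo 256 (same rule as ChkSum).
    return len(reply) >= 9 and sum(reply) % 256 == 0


def poll(query: Callable[[bytes], bytes]) -> Dict[str, Optional[int]]:
    """Return one snapshot; a value is None when its reply is missing or corrupt."""
    result: Dict[str, Optional[int]] = {
        "status": None,
        "total_wh": None,
        "today_wh": None,
    }

    reply = query(CMD_STATUS)
    # The script treats status codes above 5 as invalid.
    if _valid(reply) and reply[5] <= 5:
        result["status"] = reply[5]

    for key, cmd in (("total_wh", CMD_TOTAL_WH), ("today_wh", CMD_TODAY_WH)):
        reply = query(cmd)
        if _valid(reply):
            # 32-bit little-endian counter at offset 5, as GetDWord(Recv, 5) reads it.
            result[key] = int.from_bytes(reply[5:9], "little")

    return result
```

Here query stands for any callable that sends one command and returns the raw reply, for example a thin wrapper around SndRecv with the address already bound. Keeping the socket out of poll means the parsing can be tested with canned replies.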
