Closed peca2345 closed 4 months ago
Hello,
Is it possibile to integrate the heatpump to this integration, there is a user that he is able to read and comand it using this code, but i am not able to upgrade the component:
import argparse import base64 import sys
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend import json import socket from requests import post
GENERIC_KEY = "a3K8Bx%2r8Y7#xDh"
class ScanResult:
ip = ''
port = 0
id = ''
name = '
def __init__(self, ip, port, id, name=''):
self.ip = ip
self.port = port
self.id = id
self.name = name
def send_data(ip, port, data): if args.verbose: print(f'send_data: ip={ip}, port={port}, data={data}')
s = socket.socket(type=socket.SOCK_DGRAM, proto=socket.IPPROTO_UDP)
s.settimeout(5)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(args, 'socket_interface') and args.socket_interface:
s.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, args.socket_interface.encode('ascii'))
s.sendto(data, (ip, port))
return s.recv(1024)
def create_request(tcid, pack_encrypted, i=0): return '{"cid":"app","i":' + str(i) + ',"t":"pack","uid":0,"tcid":"' + tcid + '","pack":"' + pack_encrypted + '"}'
def add_pkcs7_padding(data): length = 16 - (len(data) % 16) padded = data + chr(length) * length return padded
def create_cipher(key): return Cipher(algorithms.AES(key.encode('utf-8')), modes.ECB(), backend=default_backend())
def decrypt(pack_encoded, key): decryptor = create_cipher(key).decryptor() pack_decoded = base64.b64decode(pack_encoded) pack_decrypted = decryptor.update(pack_decoded) + decryptor.finalize() pack_unpadded = pack_decrypted[0:pack_decrypted.rfind(b'}') + 1] return pack_unpadded.decode('utf-8')
def decrypt_generic(pack_encoded): return decrypt(pack_encoded, GENERIC_KEY)
def encrypt(pack, key): encryptor = create_cipher(key).encryptor() pack_padded = add_pkcs7_padding(pack) pack_encrypted = encryptor.update(bytes(pack_padded, encoding='utf-8')) + encryptor.finalize() pack_encoded = base64.b64encode(pack_encrypted) return pack_encoded.decode('utf-8')
def encrypt_generic(pack): return encrypt(pack, GENERIC_KEY)
def get_status(): print(f'Getting parameters: {", ".join(args.params)}')
cols = '"Pow","Mod","WatBoxTemSet","HeWatOutTemSet","CoWatOutTemSet","HeHomTemSet","CoHomTemSet","Quiet","AllInWatTemHi","AllInWatTemLo","AllOutWatTemHi","AllOutWatTemLo","WatBoxTemHi","WatBoxTemLo","WatBoxElcHeRunSta","FastHtWter","RmoHomTemHi","RmoHomTemLo"'
pack = f'{{"cols":[{cols}],"mac":"{args.id}","t":"status"}}'
pack_encrypted = encrypt(pack, args.key)
request = '{"cid":"app","i":0,"pack":"%s","t":"pack","tcid":"%s","uid":0}' \
% (pack_encrypted, args.id)
result = send_data(args.client, 7000, bytes(request, encoding='utf-8'))
response = json.loads(result)
if args.verbose:
print(f'get_param: response={response}')
if response["t"] == "pack":
pack = response["pack"]
pack_decrypted = decrypt(pack, args.key)
pack_json = json.loads(pack_decrypted)
if args.verbose:
print(f'get_param: pack={pack}, json={pack_json}')
response_data=dict(zip(pack_json['cols'], pack_json['dat']))
sensor_data={}
sensor_data["pdc_power"]=response_data["Pow"]
sensor_data["pdc_mode"]=response_data["Mod"]
sensor_data["pdc_setpoint_acs"]=response_data["WatBoxTemSet"]
sensor_data["pdc_temp_mandata_riscaldamento"]=response_data["HeWatOutTemSet"]
sensor_data["pdc_temp_mandata_condizionamento"]=response_data["CoWatOutTemSet"]
sensor_data["pdc_setpoint_ambiente_risc"]=response_data["HeHomTemSet"]
sensor_data["pdc_setpoint_ambiente_cond"]=response_data["CoHomTemSet"]
sensor_data["pdc_quiet_mode"]=response_data["Quiet"]
sensor_data["pdc_res_elet_boiler"]=response_data["WatBoxElcHeRunSta"]
sensor_data["pdc_acs_rapida"]=response_data["WatBoxElcHeRunSta"]
sensor_data["pdc_temp_ritorno"]=str(response_data["AllInWatTemHi"])[1:]+"."+str(response_data["AllInWatTemLo"])
sensor_data["pdc_temp_mandata"]=str(response_data["AllOutWatTemHi"])[1:]+"."+str(response_data["AllOutWatTemLo"])
sensor_data["pdc_temp_acs"]=str(response_data["WatBoxTemHi"])[1:]+"."+str(response_data["WatBoxTemLo"])
sensor_data["pdc_temp_ambiente"]=str(response_data["RmoHomTemHi"])[1:]+"."+str(response_data["RmoHomTemLo"])
headers = {"Authorization": "Bearer ### Your HA Long Live access Token ###",}
base_url = "http://localhost:8123/api/states/"
for key in sensor_data.keys():
sensor_attributes={}
if ("temp" in key) or ("setpoint" in key):
entity="sensor.%s" % key
sensor_attributes={"unit_of_measurement": "°C", "state_class":"measurement", "device_class":"temperature"}
elif ("pdc_x_" in key):
entity="sensor.%s" % key
elif key == "pdc_mode":
entity="sensor.%s" % key
else:
entity="binary_sensor.%s" % key
url=base_url+entity
payload={"state":sensor_data[key],"attributes":sensor_attributes}
submit = post(url, headers=headers, json=payload)
print(submit.text)
def set_param(): kv_list = [i.split('=') for i in args.params] errors = [i for i in kv_list if len(i) != 2]
if len(errors) > 0:
print(f'Invalid parameters detected: {errors}')
exit(1)
print(f'Setting parameters: {", ".join("=".join(i) for i in kv_list)}')
opts = ','.join(f'"{i[0]}"' for i in kv_list)
ps = ','.join(i[1] for i in kv_list)
pack = f'{{"opt":[{opts}],"p":[{ps}],"t":"cmd"}}'
print(pack)
pack_encrypted = encrypt(pack, args.key)
request = '{"cid":"app","i":0,"pack":"%s","t":"pack","tcid":"%s","uid":0}' \
% (pack_encrypted, args.id)
result = send_data(args.client, 7000, bytes(request, encoding='utf-8'))
response = json.loads(result)
if args.verbose:
print(f'set_param: response={response}')
if response["t"] == "pack":
pack = response["pack"]
pack_decrypted = decrypt(pack, args.key)
pack_json = json.loads(pack_decrypted)
if args.verbose:
print(f'set_param: pack={pack}')
if pack_json['r'] != 200:
print('Failed to set parameter')
if name == 'main': parser = argparse.ArgumentParser()
parser.add_help = True
parser.add_argument('command', help='You can use the following commands: search, get, set, decode, status')
parser.add_argument('-c', '--client', help='IP address of the client device')
parser.add_argument('-b', '--broadcast', help='Broadcast IP address of the network the devices connecting to')
parser.add_argument('-i', '--id', help='Unique ID of the device')
parser.add_argument('-k', '--key', help='Unique encryption key of the device')
parser.add_argument('--verbose', help='Enable verbose logging', action='store_true')
if sys.platform == 'linux':
parser.add_argument('--socket-interface', help='Bind the socket to a specific network interface')
parser.add_argument('params', nargs='*', default=None, type=str)
args = parser.parse_args()
command = args.command.lower()
if command == 'search':
if args.broadcast is None:
print('Error: search command requires a broadcast IP address')
exit(1)
search_devices()
elif command == 'get':
if args.params is None or len(args.params) == 0 or args.client is None or args.id is None or args.key is None:
print('Error: get command requires a parameter name, a client IP (-c), a device ID (-i) and a device key '
'(-k)')
exit(1)
get_param()
elif command == 'set':
if args.params is None or len(args.params) == 0 or args.client is None or args.id is None or args.key is None:
print('Error: set command requires at least one key=value pair, a client IP (-c), a device ID (-i) and a '
'device key (-k)')
exit(1)
set_param()
########## My Mod ################ elif command == 'decode': DEVICE_KEY='## La Tua Device Key ####' input_pack=sys.stdin.read()
if input_pack is None:
print('Error: decode command requires pack string on std input')
exit(1)
print(f'input pack: {input_pack}')
# decoded_pack=decrypt(input_pack, GENERIC_KEY)
# print(f'decoded generic: {decoded_pack}')
decoded_pack=decrypt(input_pack, DEVICE_KEY)
print(f'decoded device: {decoded_pack}')
elif command == 'status':
if args.client is None or args.id is None or args.key is None:
print('Error: status command requires a client IP (-c), a device ID (-i) and a device key '
'(-k)')
exit(1)
get_status()
########## END Mod ################ else: print('Error: unknown command "%s"' % args.command) exit(1)
I want this feature too.
Feel free to create a PR with support for this. Closing this for now because its not in the current scope.
Hi, would it be possible to add support for this model please? I'm mainly interested in reading sensors and controlling multiple functions. https://drive.google.com/file/d/1_o67xv75g68sQVmECE-3xjOdgQOLe2Cn/view