Open petaramesh opened 5 months ago
HI @petaramesh would you mind sharing your code examples? I am in the same position as you and might be able to reuse / extend your work if you're okay with that.
Thank you!
Yeah I will try snipping the BT code from the rest of my project into something that can work standalone and post it here. It will take me a little time.
Very much appreciated
Well here comes my quick and dirty extraction from my quick and dirty code. If you have supported Victron devices and just put your devices MAC addresses and encryption keys in there and run it, it should start spitting received values to the console...
Make sure to install the asyncio librarires as indicated in the comments before attempting to run it.
I cut it from my whole project so it may still contain unnecessary code. I've tested it with a single Victron device I have on hand, and it works. So let me know...
NB: This comes from a project that does a host of other things so I didn't want the BT reception to use too much CPU time. If you want to receive more data or more frequently, you may wish to change :
_BT_SCAN_INTERVAL_US = const(2000000) # Scan every 2 sec (uS)
_BT_SCAN_WINDOW_US = const(100000) # Scan for 100 mS (uS)
# Pico_Victron_BT.py
#
# © Swâmi Petaramesh 2024
#
# This program is free software: you can redistribute it and/or modify it under the terms
# of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program.
# If not, see <https://www.gnu.org/licenses/>. 3
#
# This program is example code for receiving and decoding some Victron® devices information
# and statues over Bluetooth-LE on a Raspberry Pi® Pico W running :
#
# Works on : sys.version:3.4.0; MicroPython v1.22.1 on 2024-01-05
# RPI_PICO_W-20240105-v1.22.1.uf2
#
# You need to put your own devices MAC addresses and encryption keys in the code below.
# Lbraries =============================================================
#
# Micropython built-in libraries
import sys, time, struct, bluetooth, cryptolib
from machine import Pin
from cryptolib import aes
# Libraries to be installed on the Raspberry Pi Pico
# With mpremote tool (from Linux PC terminal) :
# mpremote mip install github:peterhinch/micropython-async/v3/primitives
# mpremote mip install github:peterhinch/micropython-async/v3/threadsafe
import asyncio
from threadsafe import ThreadSafeQueue
# Initialisations ==========================================================
_DEBUG = const(2) # Extra debug console output
_UNKN = const(-9999) # N/A value
pin_led_bt = Pin("WL_GPIO0", Pin.OUT, value=1)) # Integrated LED, ON
# Victron devices parameters and values -----------------------------------
# We need to put our Victron devices MAC addresses and encryption keys here
# The rest should be left _UNKN
victron = {
"bmv712" : { "mac": b"\xA3\x76\x6E\x7C\x22\x33",
"key": b"\x12\x34\x56\x78\x9A\xBC\xDE\xF0\x12\x34\x56\x78\x9A\xBC\xDE\xF0",
"volt": _UNKN,
"amp": _UNKN,
"soc": _UNKN,
"temp": _UNKN,
"upd": _UNKN
},
"orion" : { "mac": b"\xA3\x76\x6E\x7C\x22\x34",
"key": b"\x12\x34\x56\x78\x9A\xBC\xDE\xF0\x12\x34\x56\x78\x9A\xBC\xDE\xF1",
"eng_detect": True,
"mode": _UNKN,
"cause": _UNKN,
"v_in": _UNKN,
"v_out": _UNKN,
"lib_mode": 2,
"lib_cause": 6,
"upd": _UNKN
},
"smartsolar" : { "mac": b"\xA3\x76\x6E\x7C\x22\x35",
"key": b"\x12\x34\x56\x78\x9A\xBC\xDE\xF0\x12\x34\x56\x78\x9A\xBC\xDE\xF2",
"mode": _UNKN,
"pwr": _UNKN,
"amp": _UNKN,
"upd": _UNKN,
"lib_mode": 2
},
"batsense" : { "mac": b"\xA3\x76\x6E\x7C\x22\x36",
"key": b"\x12\x34\x56\x78\x9A\xBC\xDE\xF0\x12\x34\x56\x78\x9A\xBC\xDE\xF3",
"volt": _UNKN,
"temp": _UNKN,
"upd": _UNKN
}
}
maclist= [victron[d]["mac"] for d in victron]
# System parameters ------------------------------------------------
_TMR_MAIN_LOOP = const(500) # Main loop duration
_BT_SCAN_DURATION_MS = const(0) # BT Scan default duration, 0 = forever
_BT_SCAN_INTERVAL_US = const(2000000) # Scan every 2 sec (uS)
_BT_SCAN_WINDOW_US = const(100000) # Scan for 100 mS (uS)
_BT_MIN_RSSI = const(-85) # Minimum RSSI
ble = bluetooth.BLE()
# Fixed values
_BT_EXPIRE = const(180 * 1000) # BT values expired after stalled (mS)
# This is our main status that we use everywhere
STATUS = (
"CKSUM", # 0
"?????", # 1
"Unknown", # 2
"ERROR", # 3
"FAIL!", # 4
"DELTA", # 5
"Init", # 6
"Off", # 7
"Stop", # 8
"Start", # 9
"HI-V", # 10
"HI-v", # 11
"LOW-V", # 12
"LOW-v", # 13
"LOW-%", # 14
"HI-T°", # 15
"LO-T°", # 16
"Drain", # 17
"Timer", # 18
"Timr", # 19
"Stby", # 20
"LOW", # 21
"OK", # 22
"High", # 23
"Full", # 24
"Chrg", # 25
"Chg", # 26
"Engine", # 27
"Bulk", # 28
"Absorption", # 29
"Float", # 30
"Storage", # 31
"Remote", # 32
"Lock", # 33
"EngStop", # 34
"R+Stop", # 35
"RunTime", # 36
"FloTime", # 37
"Display", # 38
"Manual", # 39
"------------" # 40
)
# Victron operation modes
VICTRON_OP = { 0: { "code": 7, "lib": "Off" },
1: { "code": 12, "lib": "LOW-V" },
2: { "code": 3, "lib": "ERROR" },
3: { "code": 28, "lib": "Bulk" },
4: { "code": 29, "lib": "Absorption" },
5: { "code": 30, "lib": "Float" },
6: { "code": 31, "lib": "Storage" },
7: { "code": 1, "lib": "Equalize" },
9: { "code": 1, "lib": "Inverting" },
11: { "code": 1, "lib": "Supply" },
245: { "code": 6, "lib": "Init" },
246: { "code": 29, "lib": "Repeated absorption" },
247: { "code": 1, "lib": "Recondition" },
248: { "code": 1, "lib": "Bat safe" },
252: { "code": 32, "lib": "Remote" }
}
# Off reason for Victron DC-DC charger
VICTRON_DC_OFF = { 0x00000000: { "code": 40, "lib": "None" },
0x00000001: { "code": 4, "lib": "No-Input" },
0x00000002: { "code": 7, "lib": "Off" },
0x00000004: { "code": 7, "lib": "Off" },
0x00000008: { "code": 32, "lib": "Remote" },
0x00000010: { "code": 1, "lib": "Protection" },
0x00000020: { "code": 1, "lib": "Pay" },
0x00000040: { "code": 1, "lib": "BMS-CUT" },
0x00000080: { "code": 27, "lib": "Engine" },
0x00000081: { "code": 35, "lib": "R+Stop" },
0x00000100: { "code": 1, "lib": "Analyzing" }
}
# Initial values --------------------------------------------------
loop_time = 0
comp_time = 0
last_time = 0
# Global exception handler =======================================
def _handle_exception(loop, context):
print('Exception occurred !')
sys.print_exception(context["exception"])
sys.exit()
# Classes ========================================================
class BLEScanner:
def __init__(self, ble, target_mac_list):
self._ble = ble
self._ble.active(True)
self._ble.irq(self._irq)
self._target_mac_list = target_mac_list
self._start_time = None
# Interrupt proceessing routine
def _irq(self, event, data):
global bt_queue
pin_led_bt.on()
if event == 5: # Event value for _IRQ_SCAN_RESULT
addr_type, addr, adv_type, rssi, adv_data = data
if rssi > _BT_MIN_RSSI:
if addr in self._target_mac_list and adv_type == 0:
try:
# Queue received data for async coro to process
bt_queue.put_sync([bytes(addr), addr_type, adv_type, rssi, bytes(adv_data)])
except IndexError:
# Queue is full
pass
pin_led_bt.off()
# Start the scanner
def start_scan(self, duration_ms=_BT_SCAN_DURATION_MS, interval_us=_BT_SCAN_INTERVAL_US, window_us=_BT_SCAN_WINDOW_US):
self._ble.active(True)
self._start_time = time.ticks_ms()
self._ble.gap_scan(0, duration_ms, interval_us, window_us)
# Stop the scanner
def stop_scan(self):
self._ble.gap_scan(None)
self._ble.active(False)
# Functions ======================================================
def kelvin_to_celsius(kelvin):
return round(kelvin - 273.15, 2)
# Decode received and decrypted BT values
def bt_decode(dev,cleartext):
global victron
if _DEBUG: print(f"*** Found device : {dev}")
if _DEBUG >= 2:
print(" Raw Decrypted Data (Hex): ", ' '.join(['{:02X}'.format(b) for b in cleartext]))
if dev is "bmv712" or dev is "batsense":
try:
if cleartext[2:4] != b'\xFF\x7F':
victron[dev]["volt"] = float(struct.unpack('h', cleartext[2:4])[0] / 100)
else:
victron[dev]["volt"] = _UNKN
except:
victron[dev]["volt"] = _UNKN
try:
if struct.unpack('B',cleartext[8:9])[0] & 0b11 == 0b10:
victron[dev]["temp"] = float(kelvin_to_celsius(struct.unpack('h', cleartext[6:8])[0] / 100))
else:
victron[dev]["temp"] = _UNKN
except:
victron[dev]["temp"] = _UNKN
if victron[dev]["volt"] != _UNKN and victron[dev]["temp"] != _UNKN:
victron[dev]["upd"] = time.ticks_ms()
if _DEBUG >= 2: print(f"Volt : {victron[dev]["volt"]} Temp: {victron[dev]["temp"]}")
if dev is "bmv712":
try:
victron[dev]["soc"] = float(((struct.unpack('h', cleartext[13:15])[0] & 0x3FFF) >> 4) / 10)
if victron[dev]["soc"] == 0x3FF:
victron[dev]["soc"] = _UNKN
except:
victron[dev]["soc"] = _UNKN
try:
amp = bytearray(cleartext[8:11])
if amp[2] & 0x80 == 0x80:
amp.extend(b'\xFF')
else:
amp.extend(b'\x00')
victron[dev]["amp"] = float(((struct.unpack('i', amp)[0]) >>2 ) / 1000)
except:
victron[dev]["amp"] = _UNKN
if _DEBUG >= 2: print(f"Soc : {victron[dev]["soc"]} Amp: {victron[dev]["amp"]}")
elif dev is "smartsolar":
try:
if cleartext[0:1] != b'\xFF':
victron[dev]["mode"] = struct.unpack('B', cleartext[0:1])[0]
else:
victron[dev]["mode"] = _UNKN
except:
victron[dev]["mode"] = _UNKN
try:
if cleartext[4:6] != b'\xFF\x7F':
victron[dev]["amp"] = float(struct.unpack('h', cleartext[4:6])[0] / 10)
else:
victron[dev]["amp"] = _UNKN
except:
victron[dev]["amp"] = _UNKN
try:
if cleartext[8:10] != b'\xFF\xFF':
victron[dev]["pwr"] = float(struct.unpack('H', cleartext[8:10])[0])
else:
victron[dev]["pwr"] = _UNKN
except:
victron[dev]["pwr"] = _UNKN
try:
victron[dev]["lib_mode"] = VICTRON_OP[victron[dev]["mode"]]["code"]
except:
victron[dev]["lib_mode"] = 1
if (victron[dev]["mode"] != _UNKN and victron[dev]["amp"] != _UNKN
and victron[dev]["pwr"] != _UNKN and victron[dev]["lib_mode"] != 1
):
victron[dev]["upd"] = time.ticks_ms()
if _DEBUG >= 2: print(f"Mode : {victron[dev]["mode"]} PWR : {victron[dev]["pwr"]} Lib : {victron[dev]["lib_mode"]}:{STATUS[victron[dev]["lib_mode"]]}")
elif dev is "orion":
try:
if cleartext[0:1] != b'\xFF':
victron[dev]["mode"] = struct.unpack('B', cleartext[0:1])[0]
else:
victron[dev]["mode"] = _UNKN
except:
victron[dev]["mode"] = _UNKN
try:
if cleartext[2:4] != b'\xFF\xFF':
victron[dev]["v_in"] = float(struct.unpack('H', cleartext[2:4])[0] / 100)
else:
victron[dev]["v_in"] = _UNKN
except:
victron[dev]["v_in"] = _UNKN
try:
if cleartext[4:6] != b'\xFF\x7F':
victron[dev]["v_out"] = float(struct.unpack('h', cleartext[4:6])[0] / 100)
else:
victron[dev]["v_out"] = _UNKN
except:
victron[dev]["v_out"] = _UNKN
try:
if cleartext[6:10] != b'\xFF\xFF\xFF\xFF':
victron[dev]["cause"] = int(struct.unpack('I', cleartext[6:10])[0])
else:
victron[dev]["cause"] = _UNKN
except:
victron[dev]["cause"] = _UNKN
try:
victron[dev]["lib_mode"] = VICTRON_OP[victron[dev]["mode"]]["code"]
except:
victron[dev]["lib_mode"] = 1
try:
victron[dev]["lib_cause"] = VICTRON_DC_OFF[victron[dev]["cause"]]["code"]
except:
victron[dev]["lib_cause"] = 1
if (victron[dev]["mode"] != _UNKN and victron[dev]["v_in"] != _UNKN
and (victron[dev]["mode"] == 0 or victron[dev]["v_out"] != _UNKN)
and victron[dev]["cause"] != _UNKN and victron[dev]["lib_mode"] != 1
and victron[dev]["lib_cause"] != 1
):
victron[dev]["upd"] = time.ticks_ms()
if _DEBUG >= 2: print(f"Mode : {victron[dev]["mode"]}:{victron[dev]["lib_mode"]}:{STATUS[victron[dev]["lib_mode"]]}, Cause: {victron[dev]["cause"]}:{victron[dev]["lib_cause"]}:{STATUS[victron[dev]["lib_cause"]]}")
if _DEBUG >= 2: print(f"V_in : {victron[dev]["v_in"]}, V_out: {victron[dev]["v_out"]}")
# Old BT values expiration ----------------------------------
async def bt_expire(coro_freq):
global comp_time, victron
while True:
coro_begin = time.ticks_ms()
if victron["bmv712"]["upd"] != _UNKN and time.ticks_diff(time.ticks_add(victron["bmv712"]["upd"],_BT_EXPIRE),coro_begin) <= 0 :
victron["bmv712"]["volt"] = _UNKN
victron["bmv712"]["amp"] = _UNKN
victron["bmv712"]["soc"] = _UNKN
victron["bmv712"]["temp"] = _UNKN
if victron["batsense"]["upd"] != _UNKN and time.ticks_diff(time.ticks_add(victron["batsense"]["upd"],_BT_EXPIRE),coro_begin) <= 0 :
victron["batsense"]["volt"] = _UNKN
victron["batsense"]["temp"] = _UNKN
if victron["orion"]["upd"] != _UNKN and time.ticks_diff(time.ticks_add(victron["orion"]["upd"],_BT_EXPIRE),coro_begin) <= 0 :
victron["orion"]["mode"] = _UNKN
victron["orion"]["cause"] = _UNKN
victron["orion"]["v_in"] = _UNKN
victron["orion"]["v_out"] = _UNKN
victron["orion"]["lib_mode"] = 2
victron["orion"]["lib_cause"] = 40
if victron["smartsolar"]["upd"] != _UNKN and time.ticks_diff(time.ticks_add(victron["smartsolar"]["upd"],_BT_EXPIRE),coro_begin) <= 0 :
victron["smartsolar"]["mode"] = _UNKN
victron["smartsolar"]["pwr"] = _UNKN
victron["smartsolar"]["amp"] = _UNKN
victron["smartsolar"]["lib_mode"] = 2
if _DEBUG >= 3: print(f"* CORO: bt_expire: Spent {time.ticks_diff(time.ticks_ms(),coro_begin)}.")
coro_throttle = time.ticks_diff(time.ticks_add(coro_begin,coro_freq),time.ticks_ms())
comp_time += time.ticks_diff(time.ticks_ms(), coro_begin)
if coro_throttle >= 0:
await asyncio.sleep_ms(coro_throttle)
else:
await asyncio.sleep(0)
# Decrypt and decode received BT values ------------------------------------------
async def bt_decrypt(bt_queue):
global comp_time, scanner, victron
async for bt_input in bt_queue:
coro_begin = time.ticks_ms()
pin_led_bt.on()
mac = bt_input[0] # MAC address, bytes
# mac_type = bt_input[1] # Address type, integer
adv_data = bt_input[4] # Advertisement data, bytes
kb0 = struct.unpack('B',adv_data[14:15])[0] # 1st encryption key byte
if _DEBUG:
timestamp = time.ticks_diff(time.ticks_ms(), scanner._start_time) / 1000
adv_type = bt_input[2] # Advertisement type, integer
rssi = bt_input[3] # RSSI, integer
print("\n{:.1f}s - Target Device Found - Address: {mac}, RSSI: {rssi}, Adv. Type: {adv_type}".format(
timestamp,
mac=':'.join(['{:02X}'.format(b) for b in mac]),
rssi=rssi,
adv_type=adv_type
))
if _DEBUG >= 2:
record_type = struct.unpack('B',adv_data[11:12])[0]
nonce = struct.unpack('H',adv_data[12:14])[0]
# Print the entire advertising data as hex
print(" Raw Advertising Data (Hex):", ' '.join(['{:02X}'.format(b) for b in adv_data]))
print(f" Record type: {record_type:#04X} Nonce: {nonce:#06X} Key byte 0: {kb0:#04X}")
for dev in victron:
if victron[dev]["mac"] == mac:
if victron[dev]["key"][0:1] == kb0.to_bytes(1,0):
if _DEBUG >= 2: print(" Encryption key matches.")
# AES-CTR Decryption
# We should use AES-CTR but it is not implemented into mycropython's
# cryptolib, so we need to fake it using ECB.
# We have at most 16 bytes to decrypt, so we can do it in a single
# pass with the nonce + a zero CTR value.
ctr = bytearray(adv_data[12:14]) # Start with nonce
ctr.extend(bytes(14)) # Counter is zero
# if _DEBUG >= 2: print(" Ctr feed (Hex):", ' '.join(['{:02X}'.format(b) for b in ctr]))
ciphertext = bytearray(adv_data[15:]) # Our ciphertext
if len(adv_data[15:]) < 16 : # Extend it to 16 bytes
ciphertext.extend(bytes(16 - len(adv_data[15:]))) # if needed
cipher = cryptolib.aes(victron[dev]["key"],1) # Initialize AES ECB with key
cipher.encrypt(ctr,ctr) # Encrypt counter
# if _DEBUG >= 2: print(" Encrypted CTR (Hex):", ' '.join(['{:02X}'.format(b) for b in ctr]))
cleartext = bytes(a ^ b for a, b in zip(ciphertext, ctr)) # XOR results with ciphertext
bt_decode(dev,cleartext) # Now decode what we got
else:
if _DEBUG: print(f" Encryption key mismatch ! Device {dev}:mac Ours: {victron[dev]["key"][0:1]}, got: {kb0.to_bytes(1,0)}")
break
break
if _DEBUG >= 3: print(f"* CORO: bt_decrypt: Spent {time.ticks_diff(time.ticks_ms(),coro_begin)}.")
comp_time += time.ticks_diff(time.ticks_ms(), coro_begin)
pin_led_bt.off()
await asyncio.sleep(0)
# Main loop ==================================================
async def main():
global comp_time, last_time, loop_time
global scanner, bt_queue, bt_stat
loop = asyncio.get_event_loop()
loop.set_exception_handler(_handle_exception)
# Bluetooth thread safe queue
bt_queue = ThreadSafeQueue([[bytes(6), int(0), int(0), int(0), bytes(48)] for _ in range(20)])
# Create scheduled tasks
task_bt_expire = asyncio.create_task(bt_expire(5000)) # Expire old BT values
task_bt_decrypt = asyncio.create_task(bt_decrypt(bt_queue)) # Decrypt and decode BT values
await asyncio.sleep(0)
# Start Bluetooth BLE scanner
scanner = BLEScanner(ble, maclist)
scanner.start_scan(duration_ms=_BT_SCAN_DURATION_MS, interval_us=_BT_SCAN_INTERVAL_US, window_us=_BT_SCAN_WINDOW_US)
while True:
# Sets the main loop defined duration
loop_begin_tick = time.ticks_ms()
loop_end_target = time.ticks_add(loop_begin_tick,_TMR_MAIN_LOOP)
# Wait until desired loop duration
loop_time = time.ticks_diff(time.ticks_ms(),loop_begin_tick)
last_time = comp_time + loop_time
comp_time = 0
loop_throttle = time.ticks_diff(loop_end_target,time.ticks_ms()) - 1
if loop_throttle >= 0 :
await asyncio.sleep_ms(loop_throttle)
else:
loop_throttle = 0
# Let's do it !
asyncio.run(main())
I've changed some LED related values above, that's not that important, but it was wrong (as my original project uses LEDs in a different awy).
I got it compiled. There is a double parenthesis and missing import of "sys" (will post my code once I got it running)
I am running on MicroPython v1.23.0-preview.91.g5a68e82d1 on 2024-02-07; Raspberry Pi Pico W with RP2040
.
I am looking to find a SmartShunt
but I am not "seeing" my device. When I run the scan with victron-ble
I get a value for mac
like
B9FD07CC-8128-EC97-96E4-2284631B3CA6
but no hex value. Since I don't know the type of the value I am not sure if the key mismatch is my fault or if the device is not correct.
How did you get the original hex values extracted?
You can these values in the VictronConnedct app on your phone. Go to your device, then “parameters”, menu, “Product info”, and down there “Show encryption key”.
Thanks for pointing that out. I got it working with your instructions. See my adopted code here: https://gist.github.com/georg90/c6822fa28261059e4c8361bdcff13f32
I think this is a great basis for starting of a victron_ble port - maybe it can be considered. Otherwise anyone coming from Google will find this helpful. Thanks alot @petaramesh for the port!
Hello,
I've updated my code to allow receiving and processing Victron BT advertisements even when the VictronConnect phone app is connected to the Victron device (previous code would only work when the VictronConnect app was not connected to the device).
So here is the new code :
# Pico_Victron_BT.py
#
# © Swâmi Petaramesh 2024
#
# This program is free software: you can redistribute it and/or modify it under the terms
# of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program.
# If not, see <https://www.gnu.org/licenses/>. 3
#
# This program is example code for receiving and decoding some Victron® devices information
# and statues over Bluetooth-LE on a Raspberry Pi® Pico W running :
#
# Works on : sys.version:3.4.0; MicroPython v1.22.1 on 2024-01-05
# RPI_PICO_W-20240105-v1.22.1.uf2
#
# You need to put your own devices MAC addresses and encryption keys in the code below.
# Lbraries =============================================================
#
# Micropython built-in libraries
import sys, time, struct, bluetooth, cryptolib
from machine import Pin
from cryptolib import aes
# Libraries to be installed on the Raspberry Pi Pico
# With mpremote tool (from Linux PC terminal) :
# mpremote mip install github:peterhinch/micropython-async/v3/primitives
# mpremote mip install github:peterhinch/micropython-async/v3/threadsafe
import asyncio
from threadsafe import ThreadSafeQueue
# Initialisations ==========================================================
_DEBUG = const(2) # Extra debug console output
_UNKN = const(-9999) # N/A value
pin_led_bt = Pin("WL_GPIO0", Pin.OUT, value=1) # Integrated LED, ON
# Victron devices parameters and values -----------------------------------
# We need to put our Victron devices MAC addresses and encryption keys here
# The rest should be left _UNKN
victron = {
"bmv712" : { "mac": b"\xA3\x76\x6E\x7C\x22\x33",
"key": b"\x12\x34\x56\x78\x9A\xBC\xDE\xF0\x12\x34\x56\x78\x9A\xBC\xDE\xF0",
"volt": _UNKN,
"amp": _UNKN,
"soc": _UNKN,
"temp": _UNKN,
"upd": _UNKN
},
"orion" : { "mac": b"\xA3\x76\x6E\x7C\x22\x34",
"key": b"\x12\x34\x56\x78\x9A\xBC\xDE\xF0\x12\x34\x56\x78\x9A\xBC\xDE\xF1",
"eng_detect": True,
"mode": _UNKN,
"cause": _UNKN,
"v_in": _UNKN,
"v_out": _UNKN,
"lib_mode": 2,
"lib_cause": 6,
"upd": _UNKN
},
"smartsolar" : { "mac": b"\xA3\x76\x6E\x7C\x22\x35",
"key": b"\x12\x34\x56\x78\x9A\xBC\xDE\xF0\x12\x34\x56\x78\x9A\xBC\xDE\xF2",
"mode": _UNKN,
"pwr": _UNKN,
"amp": _UNKN,
"upd": _UNKN,
"lib_mode": 2
},
"batsense" : { "mac": b"\xA3\x76\x6E\x7C\x22\x36",
"key": b"\x12\x34\x56\x78\x9A\xBC\xDE\xF0\x12\x34\x56\x78\x9A\xBC\xDE\xF3",
"volt": _UNKN,
"temp": _UNKN,
"upd": _UNKN
}
}
maclist= [victron[d]["mac"] for d in victron]
# System parameters ------------------------------------------------
_TMR_MAIN_LOOP = const(500) # Main loop duration
_BT_SCAN_DURATION_MS = const(0) # BT Scan default duration, 0 = forever
_BT_SCAN_INTERVAL_US = const(3000000) # Scan every 3 sec (uS)
_BT_SCAN_WINDOW_US = const(400000) # Scan for 400 mS (uS)
_BT_MIN_RSSI = const(-85) # Minimum RSSI
ble = bluetooth.BLE()
# Fixed values
_BT_EXPIRE = const(180 * 1000) # BT values expired after stalled (mS)
# This is our main status that we use everywhere
STATUS = (
"CKSUM", # 0
"?????", # 1
"Unknown", # 2
"ERROR", # 3
"FAIL!", # 4
"DELTA", # 5
"Init", # 6
"Off", # 7
"Stop", # 8
"Start", # 9
"HI-V", # 10
"HI-v", # 11
"LOW-V", # 12
"LOW-v", # 13
"LOW-%", # 14
"HI-T°", # 15
"LO-T°", # 16
"Drain", # 17
"Timer", # 18
"Timr", # 19
"Stby", # 20
"LOW", # 21
"OK", # 22
"High", # 23
"Full", # 24
"Chrg", # 25
"Chg", # 26
"Engine", # 27
"Bulk", # 28
"Absorption", # 29
"Float", # 30
"Storage", # 31
"Remote", # 32
"Lock-V", # 33
"EngStop", # 34
"R+Stop", # 35
"RunTime", # 36
"FloTime", # 37
"Display", # 38
"Manual", # 39
"------------" # 40
)
# Victron operation modes
VICTRON_OP = { 0: { "code": 7, "lib": "Off" },
1: { "code": 12, "lib": "LOW-V" },
2: { "code": 3, "lib": "ERROR" },
3: { "code": 28, "lib": "Bulk" },
4: { "code": 29, "lib": "Absorption" },
5: { "code": 30, "lib": "Float" },
6: { "code": 31, "lib": "Storage" },
7: { "code": 1, "lib": "Equalize" },
9: { "code": 1, "lib": "Inverting" },
11: { "code": 1, "lib": "Supply" },
245: { "code": 6, "lib": "Init" },
246: { "code": 29, "lib": "Repeated absorption" },
247: { "code": 1, "lib": "Recondition" },
248: { "code": 1, "lib": "Bat safe" },
252: { "code": 32, "lib": "Remote" }
}
# Off reason for Victron DC-DC charger
VICTRON_DC_OFF = { 0x00000000: { "code": 40, "lib": "None" },
0x00000001: { "code": 4, "lib": "No-Input" },
0x00000002: { "code": 7, "lib": "Off" },
0x00000004: { "code": 7, "lib": "Off" },
0x00000008: { "code": 32, "lib": "Remote" },
0x00000010: { "code": 1, "lib": "Protection" },
0x00000020: { "code": 1, "lib": "Pay" },
0x00000040: { "code": 1, "lib": "BMS-CUT" },
0x00000080: { "code": 27, "lib": "Engine" },
0x00000081: { "code": 33, "lib": "Lock-V" },
0x00000100: { "code": 1, "lib": "Analyzing" }
}
# Initial values --------------------------------------------------
loop_time = 0
comp_time = 0
last_time = 0
# Global exception handler =======================================
def _handle_exception(loop, context):
print('Exception occurred !')
sys.print_exception(context["exception"])
sys.exit()
# Classes ========================================================
class BLEScanner:
def __init__(self, ble, target_mac_list):
self._ble = ble
self._ble.active(True)
self._ble.irq(self._irq)
self._target_mac_list = target_mac_list
self._start_time = None
# Interrupt proceessing routine
def _irq(self, event, data):
global bt_queue
pin_led_bt.on()
if event == 5: # Event value for _IRQ_SCAN_RESULT
addr_type, addr, adv_type, rssi, adv_data = data
if (rssi > _BT_MIN_RSSI
and adv_data[5:8] == b'\xE1\x02\x10'
and addr in self._target_mac_list
and ( adv_type == 0 or adv_type == 2)
and adv_data[1:2] == b'\x01'
):
try:
# Queue received data for async coro to process
bt_queue.put_sync([bytes(addr), addr_type, adv_type, rssi, bytes(adv_data)])
except IndexError:
# Queue is full
pass
pin_led_bt.off()
# Start the scanner
def start_scan(self, duration_ms=_BT_SCAN_DURATION_MS, interval_us=_BT_SCAN_INTERVAL_US, window_us=_BT_SCAN_WINDOW_US):
self._ble.active(True)
self._start_time = time.ticks_ms()
self._ble.gap_scan(0, duration_ms, interval_us, window_us)
# Stop the scanner
def stop_scan(self):
self._ble.gap_scan(None)
self._ble.active(False)
# Functions ======================================================
def kelvin_to_celsius(kelvin):
return round(kelvin - 273.15, 2)
# Decode received and decrypted BT values
def bt_decode(dev,cleartext):
global victron
if _DEBUG: print(f"*** Found device : {dev}")
if _DEBUG >= 2:
print(" Raw Decrypted Data (Hex): ", ' '.join(['{:02X}'.format(b) for b in cleartext]))
if dev is "bmv712" or dev is "batsense":
try:
if cleartext[2:4] != b'\xFF\x7F':
victron[dev]["volt"] = float(struct.unpack('h', cleartext[2:4])[0] / 100)
else:
victron[dev]["volt"] = _UNKN
except:
victron[dev]["volt"] = _UNKN
try:
if struct.unpack('B',cleartext[8:9])[0] & 0b11 == 0b10:
victron[dev]["temp"] = float(kelvin_to_celsius(struct.unpack('h', cleartext[6:8])[0] / 100))
else:
victron[dev]["temp"] = _UNKN
except:
victron[dev]["temp"] = _UNKN
if victron[dev]["volt"] != _UNKN and victron[dev]["temp"] != _UNKN:
victron[dev]["upd"] = time.ticks_ms()
if _DEBUG >= 2: print(f"Volt : {victron[dev]["volt"]} Temp: {victron[dev]["temp"]}")
if dev is "bmv712":
try:
victron[dev]["soc"] = float(((struct.unpack('h', cleartext[13:15])[0] & 0x3FFF) >> 4) / 10)
if victron[dev]["soc"] == 0x3FF:
victron[dev]["soc"] = _UNKN
except:
victron[dev]["soc"] = _UNKN
try:
amp = bytearray(cleartext[8:11])
if amp[2] & 0x80 == 0x80:
amp.extend(b'\xFF')
else:
amp.extend(b'\x00')
victron[dev]["amp"] = float(((struct.unpack('i', amp)[0]) >>2 ) / 1000)
except:
victron[dev]["amp"] = _UNKN
if _DEBUG >= 2: print(f"Soc : {victron[dev]["soc"]} Amp: {victron[dev]["amp"]}")
elif dev is "smartsolar":
try:
if cleartext[0:1] != b'\xFF':
victron[dev]["mode"] = struct.unpack('B', cleartext[0:1])[0]
else:
victron[dev]["mode"] = _UNKN
except:
victron[dev]["mode"] = _UNKN
try:
if cleartext[4:6] != b'\xFF\x7F':
victron[dev]["amp"] = float(struct.unpack('h', cleartext[4:6])[0] / 10)
else:
victron[dev]["amp"] = _UNKN
except:
victron[dev]["amp"] = _UNKN
try:
if cleartext[8:10] != b'\xFF\xFF':
victron[dev]["pwr"] = float(struct.unpack('H', cleartext[8:10])[0])
else:
victron[dev]["pwr"] = _UNKN
except:
victron[dev]["pwr"] = _UNKN
try:
victron[dev]["lib_mode"] = VICTRON_OP[victron[dev]["mode"]]["code"]
except:
victron[dev]["lib_mode"] = 1
if (victron[dev]["mode"] != _UNKN and victron[dev]["amp"] != _UNKN
and victron[dev]["pwr"] != _UNKN and victron[dev]["lib_mode"] != 1
):
victron[dev]["upd"] = time.ticks_ms()
if _DEBUG >= 2: print(f"Mode : {victron[dev]["mode"]} PWR : {victron[dev]["pwr"]} Lib : {victron[dev]["lib_mode"]}:{STATUS[victron[dev]["lib_mode"]]}")
elif dev is "orion":
try:
if cleartext[0:1] != b'\xFF':
victron[dev]["mode"] = struct.unpack('B', cleartext[0:1])[0]
else:
victron[dev]["mode"] = _UNKN
except:
victron[dev]["mode"] = _UNKN
try:
if cleartext[2:4] != b'\xFF\xFF':
victron[dev]["v_in"] = float(struct.unpack('H', cleartext[2:4])[0] / 100)
else:
victron[dev]["v_in"] = _UNKN
except:
victron[dev]["v_in"] = _UNKN
try:
if cleartext[4:6] != b'\xFF\x7F':
victron[dev]["v_out"] = float(struct.unpack('h', cleartext[4:6])[0] / 100)
else:
victron[dev]["v_out"] = _UNKN
except:
victron[dev]["v_out"] = _UNKN
try:
if cleartext[6:10] != b'\xFF\xFF\xFF\xFF':
victron[dev]["cause"] = int(struct.unpack('I', cleartext[6:10])[0])
else:
victron[dev]["cause"] = _UNKN
except:
victron[dev]["cause"] = _UNKN
try:
victron[dev]["lib_mode"] = VICTRON_OP[victron[dev]["mode"]]["code"]
except:
victron[dev]["lib_mode"] = 1
try:
victron[dev]["lib_cause"] = VICTRON_DC_OFF[victron[dev]["cause"]]["code"]
except:
victron[dev]["lib_cause"] = 1
if (victron[dev]["mode"] != _UNKN and victron[dev]["v_in"] != _UNKN
and (victron[dev]["mode"] == 0 or victron[dev]["v_out"] != _UNKN)
and victron[dev]["cause"] != _UNKN and victron[dev]["lib_mode"] != 1
and victron[dev]["lib_cause"] != 1
):
victron[dev]["upd"] = time.ticks_ms()
if _DEBUG >= 2: print(f"Mode : {victron[dev]["mode"]}:{victron[dev]["lib_mode"]}:{STATUS[victron[dev]["lib_mode"]]}, Cause: {victron[dev]["cause"]}:{victron[dev]["lib_cause"]}:{STATUS[victron[dev]["lib_cause"]]}")
if _DEBUG >= 2: print(f"V_in : {victron[dev]["v_in"]}, V_out: {victron[dev]["v_out"]}")
# Old BT values expiration ----------------------------------
async def bt_expire(coro_freq):
global comp_time, victron
while True:
coro_begin = time.ticks_ms()
if victron["bmv712"]["upd"] != _UNKN and time.ticks_diff(time.ticks_add(victron["bmv712"]["upd"],_BT_EXPIRE),coro_begin) <= 0 :
victron["bmv712"]["volt"] = _UNKN
victron["bmv712"]["amp"] = _UNKN
victron["bmv712"]["soc"] = _UNKN
victron["bmv712"]["temp"] = _UNKN
if victron["batsense"]["upd"] != _UNKN and time.ticks_diff(time.ticks_add(victron["batsense"]["upd"],_BT_EXPIRE),coro_begin) <= 0 :
victron["batsense"]["volt"] = _UNKN
victron["batsense"]["temp"] = _UNKN
if victron["orion"]["upd"] != _UNKN and time.ticks_diff(time.ticks_add(victron["orion"]["upd"],_BT_EXPIRE),coro_begin) <= 0 :
victron["orion"]["mode"] = _UNKN
victron["orion"]["cause"] = _UNKN
victron["orion"]["v_in"] = _UNKN
victron["orion"]["v_out"] = _UNKN
victron["orion"]["lib_mode"] = 2
victron["orion"]["lib_cause"] = 40
if victron["smartsolar"]["upd"] != _UNKN and time.ticks_diff(time.ticks_add(victron["smartsolar"]["upd"],_BT_EXPIRE),coro_begin) <= 0 :
victron["smartsolar"]["mode"] = _UNKN
victron["smartsolar"]["pwr"] = _UNKN
victron["smartsolar"]["amp"] = _UNKN
victron["smartsolar"]["lib_mode"] = 2
if _DEBUG >= 3: print(f"* CORO: bt_expire: Spent {time.ticks_diff(time.ticks_ms(),coro_begin)}.")
coro_throttle = time.ticks_diff(time.ticks_add(coro_begin,coro_freq),time.ticks_ms())
comp_time += time.ticks_diff(time.ticks_ms(), coro_begin)
if coro_throttle >= 0:
await asyncio.sleep_ms(coro_throttle)
else:
await asyncio.sleep(0)
# Decrypt and decode received BT values ------------------------------------------
async def bt_decrypt(bt_queue):
global comp_time, scanner, victron
async for bt_input in bt_queue:
coro_begin = time.ticks_ms()
pin_led_bt.on()
mac = bt_input[0] # MAC address, bytes
# mac_type = bt_input[1] # Address type, integer
adv_data = bt_input[4] # Advertisement data, bytes
kb0 = struct.unpack('B',adv_data[14:15])[0] # 1st encryption key byte
if _DEBUG:
timestamp = time.ticks_diff(time.ticks_ms(), scanner._start_time) / 1000
adv_type = bt_input[2] # Advertisement type, integer
rssi = bt_input[3] # RSSI, integer
print("\n{:.1f}s - Target Device Found - Address: {mac}, RSSI: {rssi}, Adv. Type: {adv_type}".format(
timestamp,
mac=':'.join(['{:02X}'.format(b) for b in mac]),
rssi=rssi,
adv_type=adv_type
))
if _DEBUG >= 2:
record_type = struct.unpack('B',adv_data[11:12])[0]
nonce = struct.unpack('H',adv_data[12:14])[0]
# Print the entire advertising data as hex
print(" Raw Advertising Data (Hex):", ' '.join(['{:02X}'.format(b) for b in adv_data]))
print(f" Record type: {record_type:#04X} Nonce: {nonce:#06X} Key byte 0: {kb0:#04X}")
for dev in victron:
if victron[dev]["mac"] == mac:
if victron[dev]["key"][0:1] == kb0.to_bytes(1,0):
if _DEBUG >= 2: print(" Encryption key matches.")
# AES-CTR Decryption
# We should use AES-CTR but it is not implemented into mycropython's
# cryptolib, so we need to fake it using ECB.
# We have at most 16 bytes to decrypt, so we can do it in a single
# pass with the nonce + a zero CTR value.
ctr = bytearray(adv_data[12:14]) # Start with nonce
ctr.extend(bytes(14)) # Counter is zero
# if _DEBUG >= 2: print(" Ctr feed (Hex):", ' '.join(['{:02X}'.format(b) for b in ctr]))
ciphertext = bytearray(adv_data[15:]) # Our ciphertext
if len(adv_data[15:]) < 16 : # Extend it to 16 bytes
ciphertext.extend(bytes(16 - len(adv_data[15:]))) # if needed
cipher = cryptolib.aes(victron[dev]["key"],1) # Initialize AES ECB with key
cipher.encrypt(ctr,ctr) # Encrypt counter
# if _DEBUG >= 2: print(" Encrypted CTR (Hex):", ' '.join(['{:02X}'.format(b) for b in ctr]))
cleartext = bytes(a ^ b for a, b in zip(ciphertext, ctr)) # XOR results with ciphertext
bt_decode(dev,cleartext) # Now decode what we got
else:
if _DEBUG: print(f" Encryption key mismatch ! Device {dev}:mac Ours: {victron[dev]["key"][0:1]}, got: {kb0.to_bytes(1,0)}")
break
break
if _DEBUG >= 3: print(f"* CORO: bt_decrypt: Spent {time.ticks_diff(time.ticks_ms(),coro_begin)}.")
comp_time += time.ticks_diff(time.ticks_ms(), coro_begin)
pin_led_bt.off()
await asyncio.sleep(0)
# Main loop ==================================================
async def main():
global comp_time, last_time, loop_time
global scanner, bt_queue, bt_stat
loop = asyncio.get_event_loop()
loop.set_exception_handler(_handle_exception)
# Bluetooth thread safe queue
bt_queue = ThreadSafeQueue([[bytes(6), int(0), int(0), int(0), bytes(48)] for _ in range(20)])
# Create scheduled tasks
task_bt_expire = asyncio.create_task(bt_expire(5000)) # Expire old BT values
task_bt_decrypt = asyncio.create_task(bt_decrypt(bt_queue)) # Decrypt and decode BT values
await asyncio.sleep(0)
# Start Bluetooth BLE scanner
scanner = BLEScanner(ble, maclist)
scanner.start_scan(duration_ms=_BT_SCAN_DURATION_MS, interval_us=_BT_SCAN_INTERVAL_US, window_us=_BT_SCAN_WINDOW_US)
while True:
# Sets the main loop defined duration
loop_begin_tick = time.ticks_ms()
loop_end_target = time.ticks_add(loop_begin_tick,_TMR_MAIN_LOOP)
# Wait until desired loop duration
loop_time = time.ticks_diff(time.ticks_ms(),loop_begin_tick)
last_time = comp_time + loop_time
comp_time = 0
loop_throttle = time.ticks_diff(loop_end_target,time.ticks_ms()) - 1
if loop_throttle >= 0 :
await asyncio.sleep_ms(loop_throttle)
else:
loop_throttle = 0
# Let's do it !
asyncio.run(main())
https://github.com/jensimik/vicky i made another micropython project with inspiration from the code here (on a tiny m5stick - around 20 euro) intended to be used in a van with solar, dcdc and battery monitor.
Hello,
I'm creating a hobby microcontroller project that involves reading Victron systems (a BMV-712 smart, a battery SmartSense, a Smart MPPT charger and an Orion DC-DC charger) on a Raspberry Pi Pico running Micropython.
I found out your project and it exactly fits the parts I miss for getting and processing the data from the Victrons.
I tested it on my Linux PC and it works like a charm.
So I tried installing the libraries onto my Pico, but bummer ! it doesn't work because Micropython doesn't have most of the standard CPython libraries you import - although it may generally provide what should be needed.
I'm not a seasoned Python dev myself - actually this is my first project in python, let alone on a microcontroller, but I have written all the code I need and it works, except for the Victron Bluetooth part that I'm still missing.
I don't feel at ease enough with your code for trying to port it myself to this environment, so i was wondering if it was in your own developments projects or if you could possiblly consider it ?
Many thanks in advance.
Addendum :
I've now written enough quick and dirty code so I can retrieve Victron's values (not all of them, just those that I need) without using your libraries. I got inspiration from them but couldn't easily port them as they rely on a number of libraries / modules that are not available under Micropython or are trimmed down (i.e. no support for AES-CTR in cryptolib, no enum, no typing, and worse of all no construct and no support at all for 24-bit integers). So I had to work around all this and use different and dirty solutions.
Still, generic libraries for doing this in a cleaner way would be nice.