cooperlees closed this 4 years ago
I see this needs more work because of the validation performed on the global. I'll play with it some more. Any tips would be appreciated (USB n00b here).
Sep 28 14:47:51 home1.cooperlees.com tripplite_exporter[629309]: ValueError: Path 0001:0008:00 not in 0001:0007:00.
Hey @cooperlees thanks for the PRs! I'm happy to help provide context on the PRs and anything else you're trying to do here.
Short background: we kept having instability issues reading these UPSs with other solutions, so we built a bare-bones Python driver to do it. Current state: it's rock solid for a single UPS but can get temperamental in edge cases (e.g. multiple UPSs on one server). We always appreciate more users to help properly identify failure modes and fix them without resorting to hard-reset error handlers.
For this PR, I agree with the goal, but we need to be careful about calling hid.enumerate more than once. I'd recommend having battery_paths as a cached variable that gets filled in on the first instantiation of Battery. You can make it a file-level global or a static class variable (not the clearest in Python but doable).
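battery_paths = []  # module-level cache, filled on first instantiation

class Battery:
    def __init__(self):
        if not battery_paths:
            get()  # placeholder: enumerate once and store the result in battery_paths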
Thanks again, and hope this helps!
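The "static class variable" variant of the caching suggestion above could be sketched roughly as follows. This is only an illustration under assumptions, not the library's actual code: enumerate_paths is a hypothetical stand-in for whatever wraps hid.enumerate, the example path is taken from the log line earlier in the thread, and the refresh helper is an extra that the thread does not mention.

```python
from typing import ClassVar, List, Optional


def enumerate_paths() -> List[str]:
    """Hypothetical stand-in for the real USB enumeration call."""
    enumerate_paths.calls += 1
    return ["0001:0007:00"]


enumerate_paths.calls = 0  # counter used only to show how often we enumerate


class Battery:
    # Shared by every instance; None means "not enumerated yet".
    _paths: ClassVar[Optional[List[str]]] = None

    def __init__(self, path: Optional[str] = None) -> None:
        # Enumerate only on the first instantiation, then reuse the cache.
        if Battery._paths is None:
            Battery._paths = enumerate_paths()
        if path is None:
            if not Battery._paths:
                raise ValueError("No battery found.")
            path = Battery._paths[0]
        self.path = path

    @classmethod
    def refresh(cls) -> List[str]:
        """Assumed helper: drop the cache and enumerate again on demand."""
        cls._paths = enumerate_paths()
        return cls._paths
```

Using None rather than an empty list as the "not yet filled" marker means a machine with no UPS attached is still enumerated only once. An explicit refresh may be useful if a device reconnects under a different path, but that is a guess about the failure mode rather than something confirmed here.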
Ok, I pretty much had that suggestion done. I'll do my remediation in my exporter code when the device becomes unreadable.
I only have 1 device, in my small NUC router, connected to the UPS, and it has not been rock solid for me using this library. I think it's my lack of USB understanding, so I'll try to understand it better, and I bet it's my code here.
Here's our "production" code for a bank of batteries:
async def _sensor_loop(self, battery_paths):
    """Read batteries in an infinite loop."""
    self.batteries = [Battery(path) for path in battery_paths]
    for battery in self.batteries:
        battery.open()
    while True:
        await asyncio.sleep(config.loop_times[self.speed]['sensor'])
        for state, battery in zip(self.state, self.batteries):
            try:
                state.update(battery.get())
                await self.alert(state['status'])
            except Exception:
                logger.exception(f"Could not read battery {battery}.")
                battery.close()
                battery.open()
We have the nuclear option with closing and re-opening on all exceptions, but this rarely triggers on one battery. Keep your number of USB connection instances to one and the rest should be stable.
Ok - I'm using the context manager for each "get"
def get_current_ups_data(self) -> Optional[Dict]:
    if not self.battery_path:
        battery_paths = get_battery_paths()
        if not battery_paths:
            LOG.error("No Tripplite battery found")
            return None
        self.battery_path = battery_paths[0]
    try:
        with Battery(self.battery_path) as battery:
            return battery.get()
    except OSError as ose:
        LOG.error(f"Unable to read from USB Serial for {self.battery_path}: {ose}")
        return None
I think the import-time setting of battery_paths is what's killing me. Is it bad to use the context manager here? Should I just create an object and keep it open unless get() errors?
If you're using it in a long-running loop, I'd recommend avoiding the context manager within a loop. The constant disconnect/reconnect of the underlying comm will likely lead to instability.
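For a synchronous exporter, the "keep it open unless get() errors" approach might look something like the sketch below. PersistentReader and the open_device factory are hypothetical names introduced for illustration. The only things assumed about the device object are the get() and close() methods already used in the snippets above, and that a failed read surfaces as OSError, as in the exporter code.

```python
import logging
from typing import Any, Callable, Dict, Optional

LOG = logging.getLogger(__name__)


class PersistentReader:
    """Keep one device open and reopen it only after a failed read."""

    def __init__(self, open_device: Callable[[], Any]) -> None:
        # open_device is a hypothetical factory that returns an already
        # opened object exposing get() and close().
        self._open_device = open_device
        self._device: Optional[Any] = None

    def read(self) -> Optional[Dict]:
        try:
            # Open lazily: on the first call, or after an earlier failure.
            if self._device is None:
                self._device = self._open_device()
            return self._device.get()
        except OSError as err:
            LOG.error("Read failed, reopening on next call: %s", err)
            self.close()
            return None

    def close(self) -> None:
        if self._device is not None:
            try:
                self._device.close()
            except OSError:
                pass  # the device may already be gone; nothing more to do
            self._device = None
```

With this library, the factory would presumably construct Battery(path), call open() on it, and return it, mirroring the production loop above. The connection is then torn down only when a read fails, not on every poll.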