Additional context
As a convenience, this is what the fix could be. But as I'm not sure of your use case, I'm not presenting that as a PR
import BAC0
import asyncio
import logging
from signal import SIGINT, SIGTERM, signal
from bacpypes3.npdu import Address
LOGGER = None
BAC0_LOG = '/root/.BAC0/BAC0.log'
class BACnet():
"""BACnet Test module"""
bacnet = None #class instance, a way to access it from handler to disconnect when it's over
def __init__(self, log):
# Set the log
global LOGGER
LOGGER = log
# Setup the BAC0 Log
BAC0.log_level(log_file=logging.DEBUG, stdout=logging.INFO, stderr=logging.CRITICAL)
self.devices = []
async def discover(self, local_ip=None):
LOGGER.info("Performing BACnet discovery...")
BACnet.bacnet = BAC0.lite(ip=local_ip)
while BACnet.bacnet._initialized is False:
await asyncio.sleep(0.01)
LOGGER.info("Local BACnet object: " + str(BACnet.bacnet))
try:
await BACnet.bacnet._discover(global_broadcast=True) # calling the async version with await to wait for result
except Exception as e:
LOGGER.error(e)
LOGGER.info("BACnet discovery complete")
with open(BAC0_LOG,'r',encoding='utf-8') as f:
bac0_log = f.read()
LOGGER.info("BAC0 Log:\n" + bac0_log)
self.devices = await BACnet.bacnet._devices(_return_list=True) # bacnet.devices have been hidden under _devices to keep the property capability... and allow returning list if required... or a good looking table.
# Check if the device being tested is in the discovered devices list
async def validate_device(self, local_ip, device_ip):
result = None
LOGGER.info("Validating BACnet device: " + device_ip)
await self.discover(local_ip)
LOGGER.info("BACnet Devices Found: " + str(len(self.devices)))
if len(self.devices) > 0:
# Load a fail result initially and pass only
# if we can validate it's the right device responding
result = False, (
f'Could not confirm discovered BACnet device is the ' +
'same as device being tested')
for device in self.devices:
deviceName, vendorName, devId, device_address, network_number = device
LOGGER.info("Checking Device: " + str(device))
if Address(device_ip) == device_address: # or 192.168.1.3 in 192.168.30 returns True
result = True, f'Device IP {device_ip} matches discovered device'
break
else:
result = None, 'BACnet discovery could not resolve any devices'
if result is not None:
LOGGER.info(result[1])
return result
def handler(sig, sig2):
print(f"Got signal: {sig!s}, shutting down.")
BACnet.bacnet.disconnect()
loop = asyncio.get_event_loop()
loop.stop()
# loop.remove_signal_handler(SIGTERM) ## doesn't work on Windows, using signal instead
# loop.add_signal_handler(SIGINT, lambda: None)
def main(logger, local_ip, device_ip):
loop = asyncio.get_event_loop()
signal(SIGINT, handler)
signal(SIGTERM, handler)
loop.create_task(_main(local_ip=local_ip, logger=logger, device_ip=device_ip))
loop.run_forever()
tasks = asyncio.all_tasks(loop=loop)
for t in tasks:
t.cancel()
group = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(group)
loop.close()
async def _main(logger, local_ip, device_ip):
b = BACnet(logger)
result = await b.validate_device(local_ip=local_ip, device_ip=device_ip)
print(result)
handler("Over", None)
# This was a quick way to make it work on its own, locally
if __name__ == '__main__':
logger = logging.getLogger()
device_ip='192.168.211.63'
local_ip='192.168.211.208/24'
main(logger=logger, local_ip=local_ip, device_ip=device_ip)
Describe the bug Next version will use async and this will break actual code Actual code : https://github.com/google/testrun/blob/254effde47f77e6b5255a69c1f46174ed94ccc51/modules/test/protocol/python/src/protocol_bacnet.py#L20
To Reproduce Async development is done here : https://github.com/ChristianTremblay/BAC0/tree/async
Additional context As a convenience, this is what the fix could be. But as I'm not sure of your use case, I'm not presenting that as a PR