JoelBender / BACpypes3

BACnet communications library

How to make multiple COV-subscriptions #22

Open aerobyte-de opened 8 months ago

aerobyte-de commented 8 months ago

Hi, I would need to be able to dynamically add and remove COV subscriptions in my application. How can this be realized? I have the example "cov-client.py" running and also get COV notifications from the server, however I am not clear how to subscribe to multiple objects. Can someone please help me here? Thank you very much.

Great work by the way, love this project!

JoelBender commented 8 months ago

I understand. You want a thing that you can create that has the same parameters as change_of_value() that runs by itself until you stop it, and you don't want the stopping part to wait until after it gets a notification like this.

Call this thing a Snork and pattern the API like a Thread, even though it won't be (don't mix threads and asyncio, you can do it but there be dragons). Every instance gets its own identity so you can refer to them, keep track of which one is generating various log messages, etc. I just so happen to use a process identifier that appears in the BACnet traffic so you can line up the traffic with the snork instance.

There is a new cov-client-shell.py sample that should help get you started. The interesting bits are here where it waits for some incoming change or the fini event set, cancels the future that didn't complete, and gets the next value from the context manager.

aerobyte-de commented 8 months ago

Thanks a lot, got it running with this! Should anyone else have this use case: I created a pandas dataframe with the data points to subscribe to and iterate over that dataframe and start the subscriptions with this command in the cov-client-shell.py example:

for index, row in scanned_datapoints.iterrows():
    print(f'     Subscribing Index: {index}, deviceip: {row["deviceip"]}, object-type: {row["object-type"]}, object-instance: {row["object-instance"]}')
    await cmd.do_start(
        device_address=Address(row["deviceip"]),
        object_identifier=row["objectidentifier"],
        property_reference=PropertyReference(row["object-name"])
    )

With Python versions above 3.10 I get the following warning for this line: RuntimeWarning: coroutine 'Event.wait' was never awaited

done, pending = await asyncio.wait(
    [incoming, self.fini.wait()],
    return_when=asyncio.FIRST_COMPLETED,
)

I tried to fix this, e.g. with asyncio.gather, but did not get it to run. With Python 3.10 it runs without problems, but with a warning that this usage is deprecated and will be removed.
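For what it's worth, the warning comes from a real API change: passing a bare coroutine to asyncio.wait() was deprecated in Python 3.8 and removed in 3.11, so every awaitable has to be wrapped in a Task (or already be a Future) first. A minimal self-contained sketch of the fix, with incoming and fini as stand-ins for the shell's objects:

```python
import asyncio

async def main():
    fini = asyncio.Event()
    incoming = asyncio.get_running_loop().create_future()

    # simulate a COV notification arriving shortly
    asyncio.get_running_loop().call_later(0.01, incoming.set_result, "notified")

    # asyncio.wait() only accepts Tasks/Futures from Python 3.11 onward,
    # so wrap the Event.wait() coroutine in a Task before passing it in
    fini_task = asyncio.ensure_future(fini.wait())
    done, pending = await asyncio.wait(
        [incoming, fini_task],
        return_when=asyncio.FIRST_COMPLETED,
    )
    # cancel whichever awaitable did not complete
    for task in pending:
        task.cancel()
    return incoming.result()

print(asyncio.run(main()))  # -> notified
```

This keeps the FIRST_COMPLETED semantics of the original code and runs unchanged on 3.10 through 3.12.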

I also tried to subscribe more than 255 data points at the same time, but it didn't work. Can a subscription context manager only manage 255 COV subscriptions? If there are more than 255 subscriptions, do I need to create multiple context subscription managers?

JoelBender commented 8 months ago

With Python version 3.10 it runs without problems, but with deprecation and removal warning.

Ok, I have not tested it with anything higher than 3.10.

I also tried to subscribe more than 255 data points at the same time, but it didn't work.

Were these all from the same device or from different devices? There's nothing in the code itself that would limit it, but I suspect you are running into resource limitations of the devices you are talking to. This API does not use SubscribeCOVPropertyMultiple, I'll have to check around and see if I have any "real world" devices to test against.

There is one context manager per subscription, so you are already creating multiple context managers (and multiple "fini" events, etc). The fact that you're failing at 255 does seem suspicious!

asyncio.gather() completes when all of the tasks complete; this code is also interested in waiting for the fini event. Perhaps this could be refactored into maintaining a list of co-routines and only one event, and when the event is set then cancel all the rest (which is what asyncio.wait does with done and pending, but this would be something separate). I'm not convinced this will solve anything.
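That refactoring could be sketched roughly like this (hypothetical names, not the BACpypes3 API): many long-running subscription tasks, one shared fini event, and a shutdown path that cancels everything still pending when the event fires.

```python
import asyncio

# Hypothetical sketch: each task stands in for one running COV subscription
async def subscription(n: int) -> None:
    # stand-in for "wait for change notifications forever"
    await asyncio.sleep(3600)

async def main() -> int:
    fini = asyncio.Event()
    tasks = [asyncio.ensure_future(subscription(n)) for n in range(3)]

    # something elsewhere decides it is time to shut down
    asyncio.get_running_loop().call_later(0.01, fini.set)

    await fini.wait()
    for task in tasks:
        task.cancel()

    # collect the cancellations; return_exceptions keeps main() from raising
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return sum(isinstance(r, asyncio.CancelledError) for r in results)

print(asyncio.run(main()))  # -> 3
```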

At the time that it fails, do you get back an error from the device?

aerobyte-de commented 8 months ago

Were these all from the same device or from different devices?

The subscriptions all go to the same device, yes. But the strange thing is that in Wireshark you can see that the "subscribeCOV" messages don't go out at all when I subscribe more than 255 data points.

My script reads the data points of the found devices and then subscribes to them. If I subscribe fewer than 255 data points, the "subscribeCOV" requests follow directly after the last "readProperty".

If I subscribe more than 255 data points, no "subscribeCOV" requests appear.

For testing, I added an if statement to the loop: i < 255 -> OK, i < 256 -> not OK.

i = 0
for index, row in scanned_datapoints.iterrows():
    if i < 255:
        if int(row["object-type"]) in (0, 2, 3, 5):
            print(f'     Subscribing datapoint index: {index}, deviceip: {row["deviceip"]}, object-type: {row["object-type"]}, object-instance: {row["object-instance"]}')
            await cmd.do_start(
                device_address=Address(row["deviceip"]),
                object_identifier=row["objectidentifier"],
                property_reference=PropertyReference(row["object-name"])
            )
            i += 1

At the time that it fails, do you get back an error from the device?

There are no errors in the console or in Wireshark.

JoelBender commented 8 months ago

But the strange thing is that in Wireshark you can see that the "subscribeCOV" messages don't go out at all when I subscribe more than 255 data points.

Ha! The application is running out of invoke identifiers. Each confirmed service request gets a unique invoke identifier on the client side, which is returned by the server in the confirmation; this is how multiple requests and their responses are lined up on the client side. The identifier is a single octet, so there are only 256 of them, which matches your failure at 255. Without the request going out on the wire and being confirmed, the invoke identifier is in limbo, and you are stacking up lots of requests.

The answer is to put a leaky bucket (a.k.a. token bucket) in the application that allows it to have some number of simultaneous outstanding requests per destination and as they get confirmed "drain" another request from the queue. A queue would work, then the loop that adds requests would block until there's some room.

I deliberately did not add this at the application layer, there are a number of different possible implementations and I didn't want to bake one into the code. There is also the tricky part about retry attempts and timers, in theory the client state machine shouldn't start until the request actually leaves.
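One simple application-layer approximation of that bucket (again hypothetical, not something BACpypes3 provides) is an asyncio.Semaphore sized to the number of outstanding requests you're willing to have in flight: acquire before sending, release when the confirmation comes back, so the loop that adds requests blocks until there's room.

```python
import asyncio

# Hypothetical sketch: cap simultaneous outstanding confirmed requests so the
# client never exhausts its 256 invoke identifiers.
MAX_OUTSTANDING = 16

async def send_confirmed_request(n: int, bucket: asyncio.Semaphore) -> int:
    async with bucket:                 # blocks while the bucket is full
        await asyncio.sleep(0.001)     # stand-in for request/confirmation round trip
        return n                       # releasing the semaphore "drains" a slot

async def main() -> int:
    bucket = asyncio.Semaphore(MAX_OUTSTANDING)
    # 300 subscriptions, but never more than 16 on the wire at once
    results = await asyncio.gather(
        *(send_confirmed_request(n, bucket) for n in range(300))
    )
    return len(results)

print(asyncio.run(main()))  # -> 300
```

As noted above, this sidesteps the hard part (retries and timers belong below the application layer), but it is enough to keep a bulk-subscribe loop from stacking up hundreds of unconfirmed requests.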