FreeOpcUa / python-opcua

LGPL Pure Python OPC-UA Client and Server
http://freeopcua.github.io/
GNU Lesser General Public License v3.0

Subscription with list of nodes fails to unsubscribe #1096

Closed lkaupp closed 5 months ago

lkaupp commented 4 years ago

Hi devs,

I get an error thrown on disconnect after my successful unsubscribe. Any ideas why? I think I have traced it to the root cause: a problem with unsubscribing multiple nodes.

try:
    client.connect()
    client.load_type_definitions()
    nodesToSubscribe = load_node_list_from_file(path_to_nodes)
    nodesToSubscribe = [Node(client.uaclient, item) for item in nodesToSubscribe]

    handler = SubHandler()
    opcua_subscription = client.create_subscription(100, handler)
    print("subscribe")
    start = time.time()
    print(time.asctime(time.localtime(time.time())))

    opcua_handle = opcua_subscription.subscribe_data_change(nodesToSubscribe)
    end = time.time()
    print(end - start)
    print("handle")
    time.sleep(30)

    print("unsubscribe")
    opcua_subscription.unsubscribe(opcua_handle)
finally:
    client.disconnect()
opcua_subscription.unsubscribe(opcua_handle)
  File "C:\Users\Lukas Kaupp\AppData\Local\Programs\Python\Python38-32\lib\site-packages\opcua\common\subscription.py", line 304, in unsubscribe
    results = self.server.delete_monitored_items(params)
  File "C:\Users\Lukas Kaupp\AppData\Local\Programs\Python\Python38-32\lib\site-packages\opcua\client\ua_client.py", line 553, in delete_monitored_items
    data = self._uasocket.send_request(request)
  File "C:\Users\Lukas Kaupp\AppData\Local\Programs\Python\Python38-32\lib\site-packages\opcua\client\ua_client.py", line 81, in send_request
    future = self._send_request(request, callback, timeout, message_type)
  File "C:\Users\Lukas Kaupp\AppData\Local\Programs\Python\Python38-32\lib\site-packages\opcua\client\ua_client.py", line 55, in _send_request
    binreq = struct_to_binary(request)
  File "C:\Users\Lukas Kaupp\AppData\Local\Programs\Python\Python38-32\lib\site-packages\opcua\ua\ua_binary.py", line 261, in struct_to_binary
    packet.append(to_binary(uatype, val))
  File "C:\Users\Lukas Kaupp\AppData\Local\Programs\Python\Python38-32\lib\site-packages\opcua\ua\ua_binary.py", line 284, in to_binary
    return struct_to_binary(val)
  File "C:\Users\Lukas Kaupp\AppData\Local\Programs\Python\Python38-32\lib\site-packages\opcua\ua\ua_binary.py", line 256, in struct_to_binary
    packet.append(list_to_binary(uatype[6:], val))
  File "C:\Users\Lukas Kaupp\AppData\Local\Programs\Python\Python38-32\lib\site-packages\opcua\ua\ua_binary.py", line 294, in list_to_binary
    return dataType.pack_array(val)
  File "C:\Users\Lukas Kaupp\AppData\Local\Programs\Python\Python38-32\lib\site-packages\opcua\ua\ua_binary.py", line 147, in pack_array
    return sizedata + struct.pack(self._fmt.format(len(data)), *data)
struct.error: required argument is not an integer
AndreasHeine commented 4 years ago

Stacktrace?

AndreasHeine commented 4 years ago

Is the type of opcua_handle a list? Each item subscription returns its own handle!

If you want to subscribe to a list of nodes, you have to loop through them and append each returned handle to a list. To unsubscribe, loop through that list and unsubscribe each item.

Sorry, this is an asyncua example of mine, but it shows how:

                subscription = await client.create_subscription(50, handler)
                subscription_handle_list = []
                if nodes_to_subscribe:
                    for node in nodes_to_subscribe + variable_list: # added for performance test with 100 quick and randomly changing variables
                    # for node in nodes_to_subscribe:
                        handle = await subscription.subscribe_data_change(client.get_node(node))
                        subscription_handle_list.append(handle)
                if events_to_subscribe:
                    for event in events_to_subscribe:
                        handle = await subscription.subscribe_events(event[0], event[1])
                        subscription_handle_list.append(handle)
                print("subscribed!")
                if subscription_handle_list:
                    for handle in subscription_handle_list:
                        await subscription.unsubscribe(handle)
                await subscription.delete()
                print("unsubscribed!")
lkaupp commented 4 years ago

Well, subscribing to a list of items is possible (it works properly), so why is it not possible to unsubscribe the complete list again?

Edit:// Subscribing to each node separately is insane; it would create a huge number of subscriptions if I want all my nodes. It is indeed possible to subscribe to multiple nodes with one subscription (works excellently). But I cannot unsubscribe. Why?

AndreasHeine commented 4 years ago

https://github.com/FreeOpcUa/python-opcua/blob/bb748c2605dada459f983f492818dbe9a2d114e0/opcua/common/subscription.py#L218 Yes, it is possible to pass a list when subscribing, and if you pass in a list you should get back a list of handles!?

To unsubscribe you could also delete the subscription!

Without keep-alive it should die anyway.

AndreasHeine commented 4 years ago

Subscribing to each node is insane, would cause a huge amount of subscription if i want all my nodes. it is indeed possible to

Do you know the MonitoredItem service set from OPC UA? You create one subscription and add so-called monitored items to it, so for the node list you have only one subscription with a bunch of nodes.
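
To make the relationship concrete, here is a toy model of one subscription holding many monitored items. It is a sketch only: `ToySubscription` and its methods are hypothetical names invented for illustration and are not part of python-opcua or the OPC UA specification.

```python
from dataclasses import dataclass, field
from itertools import count
from typing import Dict, List


@dataclass
class ToySubscription:
    """Hypothetical stand-in for a single server-side subscription."""
    publishing_interval_ms: int
    monitored_items: Dict[int, str] = field(default_factory=dict)
    _next_handle: count = field(default_factory=lambda: count(1))

    def add_monitored_items(self, node_ids: List[str]) -> List[int]:
        # One handle per node, all attached to this one subscription.
        handles = []
        for node_id in node_ids:
            handle = next(self._next_handle)
            self.monitored_items[handle] = node_id
            handles.append(handle)
        return handles

    def delete_monitored_items(self, handles: List[int]) -> None:
        # Removing items leaves the subscription itself alive.
        for handle in handles:
            self.monitored_items.pop(handle, None)


sub = ToySubscription(publishing_interval_ms=100)
handles = sub.add_monitored_items(["ns=0;i=2258", "ns=0;i=2267", "ns=0;i=2260"])
print(len(handles), "monitored items on one subscription")
sub.delete_monitored_items(handles)
print(len(sub.monitored_items), "monitored items left")
```

The point of the model is that the number of subscriptions stays at one no matter how many nodes are monitored; only the item count grows.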

lkaupp commented 4 years ago

What is meant by deleting the sub and by keep-alive? I tested subscribing to each single node before and went over my OPC UA subscription limit; with my code everything works as expected.

Edit:// If I unsubscribe each handle in the list one by one, it takes ages, and the problem still persists.

ERROR:concurrent.futures:exception calling callback for <Future at 0x19f59d30 state=finished returned Buffer>
Traceback (most recent call last):
  File "C:\Users\Lukas Kaupp\AppData\Local\Programs\Python\Python38-32\lib\concurrent\futures\_base.py", line 328, in _invoke_callbacks
    callback(self)
  File "C:\Users\Lukas Kaupp\AppData\Local\Programs\Python\Python38-32\lib\site-packages\opcua\client\ua_client.py", line 493, in _call_publish_callback
    self._uasocket.check_answer(data, "while waiting for publish response")
  File "C:\Users\Lukas Kaupp\AppData\Local\Programs\Python\Python38-32\lib\site-packages\opcua\client\ua_client.py", line 93, in check_answer
    hdr.ServiceResult.check()
  File "C:\Users\Lukas Kaupp\AppData\Local\Programs\Python\Python38-32\lib\site-packages\opcua\ua\uatypes.py", line 218, in check
    raise UaStatusCodeError(self.value)
opcua.ua.uaerrors._auto.BadSessionClosed: "The session was closed by the client."(BadSessionClosed)
ERROR:concurrent.futures:exception calling callback for <Future at 0x19f5bf58 state=finished returned Buffer>
Traceback (most recent call last):
  File "C:\Users\Lukas Kaupp\AppData\Local\Programs\Python\Python38-32\lib\concurrent\futures\_base.py", line 328, in _invoke_callbacks
    callback(self)
  File "C:\Users\Lukas Kaupp\AppData\Local\Programs\Python\Python38-32\lib\site-packages\opcua\client\ua_client.py", line 493, in _call_publish_callback
    self._uasocket.check_answer(data, "while waiting for publish response")
  File "C:\Users\Lukas Kaupp\AppData\Local\Programs\Python\Python38-32\lib\site-packages\opcua\client\ua_client.py", line 93, in check_answer
    hdr.ServiceResult.check()
  File "C:\Users\Lukas Kaupp\AppData\Local\Programs\Python\Python38-32\lib\site-packages\opcua\ua\uatypes.py", line 218, in check
    raise UaStatusCodeError(self.value)
opcua.ua.uaerrors._auto.BadSessionClosed: "The session was closed by the client."(BadSessionClosed)

Found the mistake in the library: in unsubscribe you wrap the list twice:

        handles = [handle] if type(handle) is int else handle
        if not handles:
            return
        params = ua.DeleteMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.MonitoredItemIds = [handle]  # second, unnecessary wrapping, so you get [[...]], a list of lists
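
The double wrapping explains the `struct.error` in the first traceback. The snippet below reproduces it with the standard library only; `pack_uint32_array` is a hypothetical helper modeled on the `pack_array` line shown in the traceback, and the handle values are made up.

```python
import struct

handles = [101, 102, 103]  # illustrative handles, as returned for a list of nodes


def pack_uint32_array(data):
    # Modeled on the traceback line: one unsigned int per list element.
    return struct.pack("<{}I".format(len(data)), *data)


packed_ok = pack_uint32_array(handles)  # flat list of ints packs fine

try:
    pack_uint32_array([handles])  # wrapped again: [[101, 102, 103]]
    nested_failed = False
except struct.error as exc:
    # The single element is a list, not an integer, so packing fails.
    nested_failed = True
    print("struct.error:", exc)
```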
AndreasHeine commented 4 years ago

How many nodes do you subscribe to?

Maybe have a look at the Subscription class: https://github.com/FreeOpcUa/python-opcua/blob/bb748c2605dada459f983f492818dbe9a2d114e0/opcua/common/subscription.py#L74

It could also be that the unsubscribe response from the server does not come back fast enough before the client disconnects!?

    opcua_subscription.unsubscribe(opcua_handle)
finally:
    time.sleep(2)
    client.disconnect()
swamper123 commented 4 years ago

Well, subscribing to a list of items is possible (it works properly), so why is it not possible to unsubscribe the complete list again?

Which version of the repo do you have? I made a PR a week ago to make unsubscribing a list of nodes possible.

lkaupp commented 4 years ago

@swamper123 Version: 0.98.12. @AndreasHeine I added another 30-second sleep in between; nothing has changed. (It takes 5 seconds to subscribe.) And I subscribe to 33k nodes at once.

AndreasHeine commented 4 years ago

Wireshark would help to show what happens, at which time the session gets closed and, most importantly, whether by the server or the client!

Edit: 33k with a 100 ms interval is a lot of data ;)
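
A rough upper bound on that load, assuming (for illustration only) that every one of the 33k nodes changes in every 100 ms interval:

```python
node_count = 33_000   # nodes mentioned in the thread
interval_ms = 100     # interval passed to create_subscription

# Upper bound: each node reports one change per interval.
updates_per_node_per_second = 1000 / interval_ms
worst_case_notifications_per_second = int(node_count * updates_per_node_per_second)

print(worst_case_notifications_per_second)  # 330000
```

Real traffic is lower when most values are static, since data-change monitoring only reports changes.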

lkaupp commented 4 years ago

@AndreasHeine you have to do what you have to do. @swamper123 Which version should I use? This is the latest I could get, or should I really use master for this?

AndreasHeine commented 4 years ago

@AndreasHeine you have to do what you have to do

guess there is a general misunderstanding...

swamper123 commented 4 years ago

@lkaupp Your version already contains my PR, and you even found a bug I hadn't noticed. °shame on me° Will fix that in time. But 33k nodes at 100 ms with a Python OPC server is a big "ufff". This sounds pretty heavy, if not impossible at the moment, I would say.

lkaupp commented 4 years ago

Well, as I said, your lib works very well (if you know all the undocumented stuff...). It's a shame that you cannot read a DataValue's Variant type. The only solution (as currently implemented) is to ask the server for the node's variant type. I also did a quick fix so that I can retrieve the variant type without the extra server call...

Nevertheless, I looked into the unsubscribe method, and its return value says [33k times] Status Good, successfully unsubscribed (server response). But the error in the client on disconnect still persists, which makes no sense. Maybe it has to do with the impractical check at the end of the unsubscribe function:

        with self._lock:
            for k, v in self._monitoreditems_map.items():
                if v.server_handle == handle:  # compares a full array of Status Good with a server_handle; I am not that deep into Python, but this could be a mistake
                    del(self._monitoreditems_map[k])
                    return
AndreasHeine commented 4 years ago

We are still developing and there is still a lot to do. The focus at the moment is on the asyncio version of python-opcua -> https://github.com/FreeOpcUa/opcua-asyncio

python-opcua (https://github.com/FreeOpcUa/python-opcua) is in maintenance mode; new features will be available first in asyncua!

Feel free to give feedback for improvements.

AndreasHeine commented 4 years ago

v.server_handle == handle

compares all items in the lists => v.server_handle[0] == handle[0], v.server_handle[1] == handle[1] ...

lkaupp commented 4 years ago

I would say that's not true, because my debugger does not enter it a single time. Maybe the comparison is wrong, I don't know.

AndreasHeine commented 4 years ago
list1 = [1,2,3,4,5]
list2 = [1,2,3,4,6] #<---

if list1 == list2:
    print("==")
else:
    print("!=")

list1 = [1,2,3,4,5]
list2 = [1,2,3,4,5]

if list1 == list2:
    print("==")
else:
    print("!=")
C:\Users\andre>python c:/Users/andre/Desktop/test.py
!=
==

C:\Users\andre>
lkaupp commented 4 years ago

The first operand is a single element and you compare it with an array... in this case you are not comparing two arrays. Edit:// Nevertheless, my debugger does not enter the if even once. So basically the monitored items will never be deleted.
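
The distinction can be checked in plain Python. This sketch assumes, as the quoted loop suggests, that `server_handle` is a single integer while `handle` may be a list; the values are made up.

```python
server_handle = 7        # one monitored item's handle (illustrative value)
handle = [5, 6, 7]       # list passed to unsubscribe (illustrative values)

# An int never equals a list, so the branch is never taken.
equal_to_list = (server_handle == handle)

# Normalizing to a list and testing membership does match.
handles = [handle] if isinstance(handle, int) else handle
found_in_list = server_handle in handles

print(equal_to_list, found_in_list)  # False True
```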

AndreasHeine commented 4 years ago

Could you call opcua_subscription.delete() before disconnect?

lkaupp commented 4 years ago

OK, interesting. As far as I know you do a delete on disconnect, but I will give it a try. My solution for the monitored item problem:

        with self._lock:
            monitored = set([v.server_handle for i, v in self._monitoreditems_map.items()])
            handles_set = set(handles)
            intersection = handles_set.intersection(monitored)
            if len(intersection) > 0:
                self._monitoreditems_map = list(monitored - handles_set)
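
Note that the snippet above replaces the map with a list of server handles. A variant that keeps the mapping intact might look like the sketch below. It assumes the map is a dict whose values expose a `server_handle` attribute (as the library loop quoted earlier implies); `remove_handles` and the sample data are hypothetical.

```python
from types import SimpleNamespace

# Hypothetical stand-in for the subscription's internal map.
monitoreditems_map = {
    201: SimpleNamespace(server_handle=11),
    202: SimpleNamespace(server_handle=12),
    203: SimpleNamespace(server_handle=13),
}


def remove_handles(item_map, handles):
    """Return a new dict without the entries whose server_handle was unsubscribed."""
    doomed = set(handles)  # set lookup keeps this fast for tens of thousands of handles
    return {k: v for k, v in item_map.items() if v.server_handle not in doomed}


remaining = remove_handles(monitoreditems_map, [11, 13])
print(sorted(remaining))  # [202]
```

Building a new dict also avoids deleting entries while iterating over the map.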

Edit:// @AndreasHeine thanks for your genius advice with sub.delete(); no problems encountered so far!
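
The shutdown order that worked here (unsubscribe, delete the subscription, then disconnect) can be wrapped in a small helper. This is a sketch: `close_cleanly` and `FakeEndpoint` are hypothetical names; the helper only relies on the `unsubscribe`, `delete` and `disconnect` methods mentioned in this thread, and passing a list to `unsubscribe` assumes a version that includes the bracket fix.

```python
def close_cleanly(client, subscription, handles):
    """Unsubscribe, delete the subscription, then disconnect, in that order."""
    try:
        if handles:
            subscription.unsubscribe(handles)
        subscription.delete()
    finally:
        # Always disconnect, even if unsubscribe or delete raised.
        client.disconnect()


class FakeEndpoint:
    """Test double that records the order of calls instead of talking to a server."""

    def __init__(self, log):
        self.log = log

    def unsubscribe(self, handles):
        self.log.append("unsubscribe")

    def delete(self):
        self.log.append("delete")

    def disconnect(self):
        self.log.append("disconnect")


calls = []
fake = FakeEndpoint(calls)
close_cleanly(client=fake, subscription=fake, handles=[1, 2, 3])
print(calls)
```

With a real client, the same helper would be called with the `Client` and the object returned by `create_subscription`.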

AndreasHeine commented 4 years ago

With the brackets removed as mentioned by @swamper123 (https://github.com/FreeOpcUa/python-opcua/pull/1097/files) and the try/finally removed, on opcua-0.98.12:

from opcua import Client, ua, Node
import time

class SubHandler(object):

    """
    Subscription Handler. To receive events from server for a subscription
    data_change and event methods are called directly from receiving thread.
    Do not do expensive, slow or network operation there. Create another 
    thread if you need to do such a thing
    """

    def datachange_notification(self, node, val, data):
        print("Python: New data change event", node, val)

    def event_notification(self, event):
        print("Python: New event", event)

client = Client("opc.tcp://127.0.0.1:4840")

client.connect()
client.load_type_definitions()
nodes = [Node(client, "ns=0;i=2267"), Node(client, "ns=0;i=2260"), Node(client, "ns=0;i=2258"), Node(client, "ns=0;i=2992")]

handler = SubHandler()
opcua_subscription = client.create_subscription(100, handler)
print("subscribe")
start = time.time()
print(time.asctime(time.localtime(time.time())))

opcua_handle = opcua_subscription.subscribe_data_change(nodes)
end = time.time()
print(end - start)
print("handle")
time.sleep(10)

print("unsubscribe")
opcua_subscription.unsubscribe(opcua_handle)

client.disconnect()
C:\Users\andre>python c:/Users/andre/Desktop/test.py
subscribe
Thu Jul  9 15:30:06 2020
0.001978158950805664
handle
Python: New data change event i=2267 250
Python: New data change event i=2260 BuildInfo(ProductUri:{self.ProductUri}, ManufacturerName:{self.ManufacturerName}, ProductName:{self.ProductName}, SoftwareVersion:{self.SoftwareVersion}, BuildNumber:{self.BuildNumber}, BuildDate:{self.BuildDate})
Python: New data change event i=2258 2020-07-09 13:30:05.978124
Python: New data change event i=2992 None
Python: New data change event i=2258 2020-07-09 13:30:06.973201
Python: New data change event i=2258 2020-07-09 13:30:07.976931
Python: New data change event i=2258 2020-07-09 13:30:08.974688
Python: New data change event i=2258 2020-07-09 13:30:09.974666
Python: New data change event i=2258 2020-07-09 13:30:10.979563
Python: New data change event i=2258 2020-07-09 13:30:11.981204
Python: New data change event i=2258 2020-07-09 13:30:12.983751
Python: New data change event i=2258 2020-07-09 13:30:13.995217
Python: New data change event i=2258 2020-07-09 13:30:14.994345
Python: New data change event i=2258 2020-07-09 13:30:15.999066
unsubscribe

C:\Users\andre>

Edit: for client applications I would recommend asyncua; it has better performance!

lkaupp commented 4 years ago

@AndreasHeine I will take a look into it. What I need is performance :>. Anyone who has contributed to this library has my gratitude!