FreeOpcUa / opcua-asyncio

OPC UA library for python >= 3.7
GNU Lesser General Public License v3.0
1.15k stars 368 forks source link

Two problems with subscription #980

Open f3lixTRS opened 2 years ago

f3lixTRS commented 2 years ago

Hello wise people,

This is my first interaction with github and asyncua, so if a make some mistakes, be patient with me.

I'm tying to make a subscription to an opcua server (in fact it works) but I found two problems along the way.

(i) Sometimes, when a change happends in the server (e.g. one sensor starts to comunicate for first time) I receive an error and the connection closes:

WARNING:asyncua.client.ua_client.UASocketProtocol:ServiceFault (BadSessionIdInvalid, diagnostics: DiagnosticInfo(SymbolicId=None, NamespaceURI=None, Locale=None, LocalizedText=None, AdditionalInfo=None, InnerStatusCode=None, InnerDiagnosticInfo=None)) from server received in response to PublishRequest ERROR:asyncua.client.client:Error while renewing session Traceback (most recent call last): File "C:\Users\user1\Python\Python39\lib\site-packages\asyncua\client\client.py", line 413, in _renew_channel_loop await self.open_secure_channel(renew=True) File "C:\Users\user1\Python\Python39\lib\site-packages\asyncua\client\client.py", line 306, in open_secure_channel result = await self.uaclient.open_secure_channel(params) File "C:\Users\user1\Python\Python39\lib\site-packages\asyncua\client\ua_client.py", line 280, in open_secure_channel return await self.protocol.open_secure_channel(params) File "C:\Users\user1\Python\Python39\lib\site-packages\asyncua\client\ua_client.py", line 213, in open_secure_channel await asyncio.wait_for(self._send_request(request, message_type=ua.MessageType.SecureOpen), self.timeout) File "C:\Users\user1\Python\Python39\lib\site-packages\asyncua\client\ua_client.py", line 134, in _send_request self.transport.write(msg) AttributeError: 'NoneType' object has no attribute 'write'

(ii) When this error happends, the sesion doesn't close well, and in the server, the connection keeps (I only have a limited allowed connections).

My code is: ---------------------------------------- CODE START

import sys
sys.path.insert(0, "..")
import os
os.environ['PYOPCUA_NO_TYPO_CHECK'] = 'True'

import asyncio
import logging
from datetime import datetime
from dateutil import tz
import time

import colorama
from colorama import Fore

queue = asyncio.Queue()

from asyncua import Client, Node, ua

logging.basicConfig(level=logging.WARNING)
_logger = logging.getLogger('asyncua')

class SubscriptionHandler:
    def datachange_notification(self, node: Node, val, data):
        _logger.info('datachange_notification %r %s', node, val)

        timestamp_ns = data.monitored_item.Value.SourceTimestamp.timestamp()
        quality = data.monitored_item.Value.StatusCode

        tagname = node.nodeid.Identifier #Resultado: "DL04.NI_0073.Value"
        if (tagname !=2258 ):
            queue.put_nowait({'timestamp': timestamp_ns, 'var': tagname, 'value': val, 'statuscode': quality})
        else:
            print ("...keep alive... server UTC timestamp: " + str(val))

async def main():

    ip = "172.16.1.55"
    port = "4840"
    client = Client(url='opc.tcp://' + ip + ':' + port)
    client.session_timeout = 600000
    client.set_user('MyUSer')
    client.set_password('MyPassword')

    dataloggers_array = ["DL01", "DL02", "DL03", "DL04", "DL05", "DL06", "DL07", "DL08", "DL09", "DL10", "DL11", "DL12", "DL13", "DL14", "DL15", "DL16"]
    sensors_array =  ["LI_0007", "LI_0008", "LI_0072", "NI_0039", "NI_0040", "NI_0041", "NI_0044", "NI_0045", "NI_0046", "NI_0047", "NI_0048", "NI_0056", "NI_0057", "NI_0064", "NI_0065", "NI_0068", "NI_0070", "NI_0073", "NI_0078"]

    colorama.init()

    async with client:
        idx = await client.get_namespace_index('http://opcfoundation.org/UA/')
        _logger.info('Namespace index: %s', idx)

        var_array = []

        for x in range(0, len(dataloggers_array)):
            for y in range(0, len(sensors_array)):
                    try:
                        var_array.append(await client.nodes.root.get_child(["0:Objects", "2:" + dataloggers_array [x], "2:" + sensors_array[y], "2:Value"]))
                    except Exception as e1:
                        s1 = str(e1)
                        print ("Exception: " + Fore.YELLOW + dataloggers_array [x]+ "." + sensors_array[y] + " " + s1 + Fore.WHITE)    

        var_array.append(client.get_node(ua.ObjectIds.Server_ServerStatus_CurrentTime))

        handler = SubscriptionHandler()

        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = 500
        params.RequestedLifetimeCount = 7200
        params.RequestedMaxKeepAliveCount = 900

        today = datetime.now()  
        f = open ('C:\\Users\\user1\\workplace\\opcua_sub_all_log.csv','a')
        f.write("---SUBSCRIPTION---;---START---;---" + str(today) + "---\n")
        f.close()

        # We create a Client Subscription.
        subscription = await client.create_subscription(params, handler)
        print ("INFO: Subscription RUNNING with id:", subscription.subscription_id)

        # We subscribe to data changes for two nodes (variables).
        await subscription.subscribe_data_change(var_array)

        # do things
        while True:
            data = await queue.get()

            # Auto-detect local hour zone 
            to_zone = tz.tzlocal()

            timestamp = data.get('timestamp')
            dt_timestamp = datetime.fromtimestamp(timestamp)
            dt_timestamp_local = dt_timestamp.astimezone(to_zone)
            print ("Event time: " +  str(dt_timestamp_local))

            f = open ('C:\\Users\\user1\\workplace\\opcua_sub_all_log.csv','a')
            toSaveInFile = (str(dt_timestamp_local) + ";" + str(data.get('var')) + ";" + str(data.get('value')) + "\n")
            f.write(toSaveInFile)
            f.close()

            queue.task_done()       

if __name__ == "__main__":
    loop = asyncio.get_event_loop()  
    loop.run_until_complete(main())
    loop.close()

---------------------------------------- CODE END

How I can treat this problem? If this problem is not avoidable, How can I close the session and the subscription in a proper way?

Thanks in advance guys.

schroeder- commented 2 years ago

Maybe you can try out #980?

starturtle commented 2 years ago

I'm confused. You are talking about the server reporting for the first time on first change. "Change" implies that you're talking about monitoring a Variable Value. "Reporting" implies that that is the mode you've set said data change subscription to. In that case, shouldn't you receive an initial update on successful subscribe? At any rate Spec part 4 indicates that reporting is disabled otherwise. This has been discussed elsewhere already (the logic referenced there can be found here. Therefore, I'd expect that the first actual change report actually triggers the second event invocation. Or, that the MonitoringMode isn't such that any events whatsoever are to be expected, in which case it may be that this creates a loophole where the session just times out? (Sounds weird, but would it be consistent with the framework's assumptions about what a subscription would do, given that it's sensible to expect the only subscription to be Reporting?)

ZuZuD commented 2 years ago

WARNING:asyncua.client.ua_client.UASocketProtocol:ServiceFault (BadSessionIdInvalid, diagnostics: DiagnosticInfo(SymbolicId=None, NamespaceURI=None, Locale=None, LocalizedText=None, AdditionalInfo=None, InnerStatusCode=None, InnerDiagnosticInfo=None)) from server received in response to PublishRequest ERROR:asyncua.client.client:Error while renewing session Traceback (most recent call last): File "C:\Users\user1\Python\Python39\lib\site-packages\asyncua\client\client.py", line 413, in _renew_channel_loop await self.open_secure_channel(renew=True) File "C:\Users\user1\Python\Python39\lib\site-packages\asyncua\client\client.py", line 306, in open_secure_channel result = await self.uaclient.open_secure_channel(params) File "C:\Users\user1\Python\Python39\lib\site-packages\asyncua\client\ua_client.py", line 280, in open_secure_channel return await self.protocol.open_secure_channel(params) File "C:\Users\user1\Python\Python39\lib\site-packages\asyncua\client\ua_client.py", line 213, in open_secure_channel await asyncio.wait_for(self._send_request(request, message_type=ua.MessageType.SecureOpen), self.timeout) File "C:\Users\user1\Python\Python39\lib\site-packages\asyncua\client\ua_client.py", line 134, in _send_request self.transport.write(msg) AttributeError: 'NoneType' object has no attribute 'write'

My understanding of this stack trace is that the TCP socket on the client side is getting closed (i.e: because self.transport is now None). Therefore, the next attempt of the client to communicate with the server fails and raises an Exception (here, it happens when the client renews the secure_channel, which isn't surprising because this background call is fired frequently).

This usually happens because there's a problem with the underlying TCP connection. In order to make progress troubleshooting this, I would recommend you to start with a wireshark/tcpdump capture to find what causes the TCP socket to close unexpectedly (timeout, RESET sent by the server, etc..).