FreeOpcUa / opcua-asyncio

OPC UA library for python >= 3.7
GNU Lesser General Public License v3.0

Test: long running subscriptions #1072

Closed jmarshall9120 closed 1 year ago

jmarshall9120 commented 2 years ago

I've been testing for a couple of days while upgrading my version of the 'asyncua' library. A lot seems to have changed, and it's still unclear from the documentation and examples how subscriptions should be used in a practical app. I would suggest that any practical use case requires very robust connectivity:

  1. A means of staying alive over long periods of time
  2. A means of making sure updates are not missed.

Below is an example of me testing against a live server. I'm happy to work on this and submit a PR. There are so many use case questions that come up:

  1. How to reconnect after an error.
  2. How to keep sessions consistent.
  3. How to prioritize the reconnection event.

I ran into all kinds of bugs yesterday, from exceeding max sessions to uncaught errors in other concurrent code blocks that were raised when I tried to asyncio.sleep. I've updated to the latest version, 0.9.98, and am testing again today to see what pops up.

import unittest
import asyncio
from asyncua import Client, Node, ua
import numpy as np
import config
from datetime import datetime, timedelta
from src.opcua_connection_engine import OpcuaConnectionEngine
import logging
_logger = logging.getLogger('Test')

def log_call(msg, backoff, i, start_time):
    # Next scheduled write time, or "NO" once the schedule is exhausted.
    n = backoff[i + 1] if i + 1 < len(backoff) else "NO"
    _logger.warning(f'{i} d={datetime.now() - start_time} n={n} s={backoff[0]} {msg}')

def calc_backoff(h: float, g: float):
    """Calculate a list of execution times that follow a linear backoff strategy.

    Args:
        h (float): maximum time span of the schedule, in hours.
        g (float): backoff increment, in minutes.

    Returns:
        list[datetime]: execution times following the backoff strategy.
    """
    n = datetime.now()
    end = timedelta(hours=h) + n  # renamed from `max` to avoid shadowing the builtin
    backoff = [n + timedelta(minutes=g)]
    while backoff[-1] < end:
        backoff.append(backoff[-1] + timedelta(minutes=(len(backoff) + 1) * g))
    return backoff

async def update_nodes(nodes_to_read, value):
    # Write through a second, short-lived client so the changes come from outside the subscribed session.
    # Logs directly: backoff, i and start_time are local to the test and not visible here.
    _logger.warning('Begin update')
    client2 = Client(config.OPCUA_URL_CODESYS)
    client2.set_user(config.OPCUA_USER_CODESYS)
    client2.set_password(config.OPCUA_PASSWORD_CODESYS)

    await client2.connect()
    try:
        _logger.warning(f'Value to write: {value}')
        values = [ua.DataValue(ua.Variant(value, ua.VariantType.Boolean))] * len(nodes_to_read)
        _logger.warning('Writing values')
        await client2.uaclient.write_attributes(
            nodeids=[n.nodeid for n in nodes_to_read],
            datavalues=values,
            attributeid=ua.AttributeIds.Value,
        )
        _logger.warning('Update complete')
    finally:
        # Always release the session; leaked sessions eventually hit the server's max-session limit.
        await client2.disconnect()

class TestConnectivity(unittest.IsolatedAsyncioTestCase):
    async def test_subscription(self):
        start_time = datetime.now()
        backoff = calc_backoff(2, 2)
        print([t.isoformat() for t in backoff])

        client = Client(config.OPCUA_URL_CODESYS)
        client.set_user(config.OPCUA_USER_CODESYS)
        client.set_password(config.OPCUA_PASSWORD_CODESYS)

        await client.connect()

        nodes_to_read = [
            'ns=4;s=|var|CODESYS Control Win V3 x64.Application.ioHMIControls.EStopHMI',
            'ns=4;s=|var|CODESYS Control Win V3 x64.Application.ioHMIControls.JogHMI',
            'ns=4;s=|var|CODESYS Control Win V3 x64.Application.ioHMIControls.RunHMI',
            'ns=4;s=|var|CODESYS Control Win V3 x64.Application.ioHMIControls.ThreadHMI',
        ]
        nodes_to_read = [Node(client, n) for n in nodes_to_read]
        i=0

        class SubHandler:
            async def datachange_notification(self, node: Node, val, data):
                log_call(f'result: {data}', backoff, i, start_time)
            def status_change_notification(self, status):
                log_call(f'result: {status}', backoff, i, start_time)

        subscription = await client.create_subscription(500, SubHandler())
        await subscription.subscribe_data_change(
            nodes=nodes_to_read,
            attr=ua.AttributeIds.Value, 
            queuesize=50, 
            monitoring=ua.MonitoringMode.Reporting,
        )

        await update_nodes(nodes_to_read, True)#, client)
        await asyncio.sleep(3)
        await update_nodes(nodes_to_read, False)#, client)
        await asyncio.sleep(3)
        await update_nodes(nodes_to_read, True)#, client)
        await asyncio.sleep(3)

        while datetime.now() < backoff[-1]:
            if datetime.now() > backoff[i]:
                await update_nodes(nodes_to_read, bool(i % 2))
                i += 1
            print('Sleeping')
            await asyncio.sleep(30)

        return
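
For reference, the gaps produced by the calc_backoff helper in the test above grow linearly: the first entry falls g minutes after the start and each later gap is one multiple of g longer than the previous one. A minimal sketch that reproduces the schedule as plain minute offsets; `backoff_offsets` is a hypothetical name used only for illustration and is not part of the test:

```python
def backoff_offsets(h: float, g: float) -> list:
    """Minute offsets from the start, mirroring the loop in calc_backoff."""
    limit = h * 60  # schedule horizon in minutes
    offsets = [g]
    while offsets[-1] < limit:
        # Each new gap is (number of entries so far + 1) * g minutes.
        offsets.append(offsets[-1] + (len(offsets) + 1) * g)
    return offsets


schedule = backoff_offsets(2, 2)
print(len(schedule), schedule)
```

With h=2 and g=2 the last entry lands past the two-hour horizon, because the loop appends one more time before the `while` condition is checked again.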

schroeder- commented 2 years ago

Automatic reconnecting is on my list for upcoming features.

  1. In the current state, this code is your best bet for handling connection loss:
    client = Client(...)
    while 1:
        connected = False
        try:
            async with client:
                connected = True
                print('Connected')
                ...
                subscription = await client.create_subscription(...)
                await subscription.subscribe_data_change(...)
                while 1:
                    await asyncio.sleep(1)
                    await client.check_connection()  # Raises an exception if the connection is lost
        except (ConnectionError, ua.UaError):
            if connected:
                print('Connection lost')
            else:
                print('Connection failed')
            print('Retry in 5 seconds')
            await asyncio.sleep(5)

    Maybe adding an example is a good idea, because these questions pop up from time to time.

  2. Keeping the session consistent doesn't work with most servers, because they do not allow a session to be continued. This would be a library feature; see: https://reference.opcfoundation.org/v104/Core/docs/Part4/6.7/
  3. def status_change_notification(self, status): is for logging purposes only; you can't reconnect in the handler!
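
The retry pattern from item 1 can be factored out so that it can be exercised without a server. This is a minimal sketch that assumes nothing beyond asyncio: `run_forever`, `session`, `retry_delay` and `max_attempts` are hypothetical names, not asyncua API, and the asyncua-specific work (connect, create the subscription, poll check_connection) is expected to live inside the `session` coroutine that is passed in.

```python
import asyncio
import logging

_logger = logging.getLogger("reconnect")


async def run_forever(session, retry_delay=5.0, errors=(ConnectionError,), max_attempts=None):
    """Run `session()` repeatedly, sleeping `retry_delay` seconds after each failure.

    `session` is a hypothetical zero-argument coroutine function that connects,
    subscribes, and then blocks until the connection drops by raising one of `errors`.
    Returns the number of attempts made.
    """
    attempts = 0
    while max_attempts is None or attempts < max_attempts:
        attempts += 1
        try:
            await session()
            return attempts  # the session ended cleanly, so stop retrying
        except errors as exc:
            _logger.warning("Attempt %d failed: %r; retrying in %s s", attempts, exc, retry_delay)
            await asyncio.sleep(retry_delay)
    return attempts
```

With asyncua, `errors` would be `(ConnectionError, ua.UaError)` as in the snippet above. Having `session` build a fresh Client on every call means each retry starts from a new session and a new subscription, which sidesteps the question of whether the server lets an old session continue.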

AndreasHeine commented 2 years ago

In the case of "long" running subscriptions, be aware that you might need to create a new subscription if the subscription's lifetime is reached and the server closes it. That can happen if you subscribe to relatively static data with too short a publishing interval.

One quick way to check whether that is the cause is to also subscribe to the server time, "i=2258" [Server_ServerStatus_CurrentTime], with queuesize 1, so that every publishing cycle carries a data change and you never get a bare keep-alive.
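
A minimal sketch of that check, assuming `client` is a connected asyncua Client and `subscription` was returned by its create_subscription call. `add_heartbeat_item` is a hypothetical helper name; it only relies on get_node and subscribe_data_change.

```python
SERVER_CURRENT_TIME = "i=2258"  # Server_ServerStatus_CurrentTime


async def add_heartbeat_item(client, subscription):
    """Monitor the server clock so that every publishing cycle carries a data change.

    Returns whatever subscribe_data_change returns (the monitored item handle).
    """
    node = client.get_node(SERVER_CURRENT_TIME)
    # queuesize=1 keeps only the most recent timestamp between publishes.
    return await subscription.subscribe_data_change(node, queuesize=1)
```

The subscription handler's datachange_notification will now also fire for the clock node, so it should ignore or merely count those updates. If the subscription stops being dropped with this item in place, the lifetime timeout is the likely cause.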

Reference: https://reference.opcfoundation.org/Core/Part4/v104/5.13.1/

Subscriptions have a lifetime counter that counts the number of consecutive publishing cycles in which there have been no Publish requests available to send a Publish response for the Subscription. Any Service call that uses the SubscriptionId or the processing of a Publish response resets the lifetime counter of this Subscription. When this counter reaches the value calculated for the lifetime of a Subscription based on the MaxKeepAliveCount parameter in the CreateSubscription Service (5.13.2), the Subscription is closed. Closing the Subscription causes its MonitoredItems to be deleted. In addition the Server shall issue a StatusChangeNotification notificationMessage with the status code Bad_Timeout. The StatusChangeNotification notificationMessage type is defined in 7.20.4.
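
To make the quoted rule concrete, the two timeouts can be derived from the subscription parameters. This is a sketch with illustrative numbers: `subscription_timing` is a hypothetical helper, and the values passed to it are examples, not asyncua defaults. The server may also revise whatever the client requests.

```python
def subscription_timing(publishing_interval_ms: float, max_keepalive_count: int, lifetime_count: int) -> dict:
    """Derive keep-alive and lifetime durations (in seconds) from subscription parameters."""
    # Part 4, 5.13.2: the lifetime count shall be at least three times the keep-alive count.
    effective_lifetime = max(lifetime_count, 3 * max_keepalive_count)
    return {
        # Time without data changes before the server sends a keep-alive message.
        "keepalive_s": publishing_interval_ms * max_keepalive_count / 1000,
        # Time without an available Publish request before the server closes the subscription.
        "lifetime_s": publishing_interval_ms * effective_lifetime / 1000,
    }


print(subscription_timing(500, 10, 30))
```

A short publishing interval shrinks both durations proportionally, which is why a subscription on static data with a small interval is the first to hit the lifetime limit when Publish requests stall.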

cedricWassenaar commented 2 years ago

I am experiencing the same problem with relatively static data. After a while the subscription is suddenly dropped. I believe this should not happen because, according to the OPC UA documentation, the lifetime should be reset by continuously making publish requests. I think this is also more logical than subscribing to a node that is not relevant. Should _publish_loop in ua_client.py not solve this problem? Perhaps there is a bug related to the publish request?

cedricWassenaar commented 2 years ago

Looking at the code, it seems that the problem I am experiencing occurs because only one publish task, and thus only one publish loop, can be created per client.

gchen001 commented 1 year ago

Hi @schroeder-, I tested your code snippet but it throws an error: the client object does not seem to have a check_connection function. Could you help me with this? Thanks a lot!

await client.check_connection() # Throws a exception if connection is lost
AttributeError: 'Client' object has no attribute 'check_connection'

oroulet commented 1 year ago

That error means you are using an old version. Upgrade.

oroulet commented 1 year ago

Also, yes: do as @schroeder- does. That is the correct way to do that kind of thing. Maybe recreate the client object too. Many users run that client for weeks, so it works. If you have an issue, you are doing something special somewhere.

gchen001 commented 1 year ago

Thank you very much, @oroulet!

gchen001 commented 1 year ago

Yes, I strongly recommend adding this code snippet as an example in master so other people don't need to search the issues to find possible solutions. :)

oroulet commented 1 year ago

Yes, that example was nice. Will do.

jmarshall9120 commented 1 year ago

@schroeder- Got back to playing with this today. I added your code snippet to my example. After about 1.5 hours I get this error:

Begin update : 154 - d: 1:32:49.773865 - s:2022-11-06 16:57:10.751636
Error while renewing session
Traceback (most recent call last):
  File "m:\source\bandz-padline-webserver\venv\lib\site-packages\asyncua\client\client.py", line 474, in _renew_channel_loop
    await self.open_secure_channel(renew=True)
  File "m:\source\bandz-padline-webserver\venv\lib\site-packages\asyncua\client\client.py", line 339, in open_secure_channel
    result = await self.uaclient.open_secure_channel(params)
  File "m:\source\bandz-padline-webserver\venv\lib\site-packages\asyncua\client\ua_client.py", line 307, in open_secure_channel
    return await self.protocol.open_secure_channel(params)
  File "m:\source\bandz-padline-webserver\venv\lib\site-packages\asyncua\client\ua_client.py", line 222, in open_secure_channel
    raise RuntimeError('Two Open Secure Channel requests can not happen too close to each other. ' 'The response must be processed and returned before the next request can be sent.')
RuntimeError: Two Open Secure Channel requests can not happen too close to each other. The response must be processed and returned before the next request can be sent.

I'm digging in now. Any ideas are appreciated.

oroulet commented 1 year ago

I made a PR with an example and tested it by killing the server a few times. It works. I suppose someone with more time can try playing with the different timeouts to see if it works in all cases.