FreeOpcUa / opcua-asyncio

OPC UA library for python >= 3.7
GNU Lesser General Public License v3.0
1.11k stars 358 forks source link

asyncua.sync.ThreadLoopNotRunning: could not post <coroutine object Node.read_data_value at 0x7f9ae953a7a0> #1219

Closed LostInDarkMath closed 1 year ago

LostInDarkMath commented 1 year ago

Describe the bug
I get a weird error in some scenarios an and do not have any idea where those errors come from.

To Reproduce
Steps to reproduce the behavior incl code:

File server.py:

import threading
import time
from typing import Type, Dict, Any

from asyncua.sync import Server, SyncNode
from asyncua.ua import ObjectIds, TwoByteNodeId

class OpcuaServer:
    _server: Server
    _scenario_name: str
    _idx: int
    _activity_thread: threading.Thread
    _machine_variables: dict = {}
    _activity_running: bool = True
    _shutdown_requested: bool = False

    def __init__(self, endpoint: str, scenarios: Dict[str, Type]) -> None:
        self._server = Server()
        self._server.set_endpoint(endpoint)
        self._server.set_server_name("Packaging Sealing Demo Control OpcUa Server")
        self._scenario_name = 'Running'
        self._scenarios = scenarios
        self._idx = self._server.register_namespace("http://dummyMachine.device.io")

        self._packaging_machine: SyncNode = self._server.get_node(
            TwoByteNodeId(ObjectIds.ObjectsFolder)).add_object(self._idx, "PackagingMachine")

        for name, value in self._scenarios[self._scenario_name].__dict__.items():
            self._machine_variables[name] = self.add_variable(name, value)
            self._machine_variables[name].set_writable(True)

        self._activity_node = self.add_variable('movement', 1)

    def add_variable(self, name: str, value: Any) -> SyncNode:
        return self._packaging_machine.add_variable(f"ns={self._idx};s={name}", name, value)

    def start(self):
        self._server.start()
        self._activity_thread = threading.Thread(
            target=self.machine_activity,
            daemon=True,
        )
        self._activity_thread.start()

    def stop(self) -> None:
        self._shutdown_requested = True
        self._activity_thread.join()
        self._server.stop()

    def get_node_info_for_all_nodes(self):
        return [self._machine_variables[node].read_data_value(raise_on_bad_status=False).StatusCode for node in self._machine_variables.keys()]

    def machine_activity(self):
        while not self._shutdown_requested:
            if self._activity_running:
                current_value = self._activity_node.get_value()
                self._activity_node.set_value(1 - current_value)

            time.sleep(1)

file test_server.py

from dataclasses import dataclass

import pytest

from server import OpcuaServer

URL = 'opc.tcp://127.0.0.1:34411'

@pytest.fixture()
def demo_opcua_server() -> OpcuaServer:
    @dataclass
    class Scenario:
        sensor_one: int
        sensor_twoo: float  # it works, if this name is changed to "sensor_two"

    scenarios_1 = {
        'Running': Scenario(
            sensor_one=1,
            sensor_twoo=130.0,  # also here
        ),
    }

    server = OpcuaServer(endpoint=URL, scenarios=scenarios_1)
    server.start()
    yield server
    server.stop()

def test_node_creation_from_scenarios(demo_opcua_server):
    demo_opcua_server.get_node_info_for_all_nodes()

def test_adding_a_new_node_to_config_does_not_change_node_ids():
    @dataclass
    class Scenario:
        sensor_one: int
        sensor_two: float

    scenarios_1 = {
        'Running': Scenario(
            sensor_one=1,
            sensor_two=130.0,
        ),
    }

    server_1 = OpcuaServer(endpoint=URL, scenarios=scenarios_1)
    server_1.start()
    machine_variables_1 = server_1.get_node_info_for_all_nodes()
    server_1.stop()

    @dataclass
    class Scenario2:
        sensor_new: bool
        sensor_one: int
        sensor_two: float

    scenarios_2 = {
        'Running': Scenario2(
            sensor_new=True,
            sensor_one=1,
            sensor_two=130.0,
        ),
    }

    server_2 = OpcuaServer(endpoint=URL, scenarios=scenarios_2)
    server_2.start()
    machine_variables_2 = server_2.get_node_info_for_all_nodes()
    server_2.stop()

If you execute the tests, you might notice some strange behavior:

  1. If you execute each tests alone, everything is fine.
  2. If you execute both tests at one, you will get this error:
FAILED [100%]
test_server.py:33 (test_adding_a_new_node_to_config_does_not_change_node_ids)
def test_adding_a_new_node_to_config_does_not_change_node_ids():
        @dataclass
        class Scenario:
            sensor_one: int
            sensor_two: float

        scenarios_1 = {
            'Running': Scenario(
                sensor_one=1,
                sensor_two=130.0,
            ),
        }

        server_1 = OpcuaServer(endpoint=URL, scenarios=scenarios_1)
        server_1.start()
>       machine_variables_1 = server_1.get_node_info_for_all_nodes()

test_server.py:49: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
server.py:52: in get_node_info_for_all_nodes
    return [self._machine_variables[node].read_data_value(raise_on_bad_status=False).StatusCode for node in self._machine_variables.keys()]
server.py:52: in <listcomp>
    return [self._machine_variables[node].read_data_value(raise_on_bad_status=False).StatusCode for node in self._machine_variables.keys()]
venv/lib/python3.11/site-packages/asyncua/sync.py:94: in wrapper
    result = self.tloop.post(aio_func(*args, **kwargs))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ThreadLoop(Thread-1, stopped 140659069720128)>
coro = <coroutine object Node.read_data_value at 0x7fedbde8a7a0>

    def post(self, coro):
        if not self.loop or not self.loop.is_running():
>           raise ThreadLoopNotRunning(f"could not post {coro}")
E           asyncua.sync.ThreadLoopNotRunning: could not post <coroutine object Node.read_data_value at 0x7fedbde8a7a0>

venv/lib/python3.11/site-packages/asyncua/sync.py:50: ThreadLoopNotRunning
  1. If you rename sensor_twoo to sensor_two, everything works fine.

It seems like your library does have some sort of global state which creates this weird behavior. It's super strange.

Expected behavior
Two green tests, even if I execute both test at the same run.

Version
Python-Version: Python 3.11.2 opcua-asyncio Version: 1.0.1

pip freeze:

aiofiles==23.1.0
aiosqlite==0.18.0
asyncua==1.0.1
attrs==22.2.0
cffi==1.15.1
cryptography==39.0.1
iniconfig==2.0.0
packaging==23.0
pluggy==1.0.0
pycparser==2.21
pytest==7.2.1
python-dateutil==2.8.2
pytz==2022.7.1
six==1.16.0
sortedcontainers==2.4.0

FYI @saknarf @DasCapschen

oroulet commented 1 year ago

Usually that kind of messages pop up when something is closed in a bad way

schroeder- commented 1 year ago

Setting the logger to debug and posting the log or at least the end of the log, would be helpfull for diagnostics.

LostInDarkMath commented 1 year ago

@schroeder- Here is the complete with log level DEBUG:


-------------------------------- live log setup --------------------------------
DEBUG    asyncio:selector_events.py:54 Using selector: EpollSelector
DEBUG    asyncua.sync:sync.py:35 Threadloop: <_UnixSelectorEventLoop running=False closed=False debug=False>
INFO     root:internal_server.py:65 No user manager specified. Using default permissive manager instead.
INFO     asyncua.server.internal_session:internal_session.py:46 Created internal session Internal
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=11715, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=15958, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=15959, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=15960, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=15961, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=15962, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=15963, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=15964, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=16134, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=16135, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=16136, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
WARNING  asyncua.server.server:server.py:348 Endpoints other than open requested but private key and certificate are not set.
INFO     asyncua.server.internal_server:internal_server.py:172 starting internal server
INFO     asyncua.server.binary_server_asyncio:binary_server_asyncio.py:148 Listening on 127.0.0.1:34411
DEBUG    asyncua.server.server:server.py:447 OPC UA Server(opc.tcp://127.0.0.1:34411) server started
PASSED
------------------------------ live log teardown -------------------------------
INFO     asyncua.server.binary_server_asyncio:binary_server_asyncio.py:152 Closing asyncio socket server
INFO     asyncua.server.internal_server:internal_server.py:183 stopping internal server
INFO     asyncua.server.internal_session:internal_session.py:72 close session Internal
INFO     asyncua.server.subscription_service:subscription_service.py:59 delete subscriptions: []
DEBUG    asyncua.server.server:server.py:465 OPC UA Server(opc.tcp://127.0.0.1:34411) Internal server stopped, everything closed

test_server.py::test_adding_a_new_node_to_config_does_not_change_node_ids 
-------------------------------- live log call ---------------------------------
DEBUG    asyncio:selector_events.py:54 Using selector: EpollSelector
DEBUG    asyncua.sync:sync.py:35 Threadloop: <_UnixSelectorEventLoop running=False closed=False debug=False>
INFO     root:internal_server.py:65 No user manager specified. Using default permissive manager instead.
INFO     asyncua.server.internal_session:internal_session.py:46 Created internal session Internal
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=11715, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=15958, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=15959, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=15960, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=15961, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=15962, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=15963, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=15964, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=16134, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=16135, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
INFO     asyncua.server.address_space:address_space.py:289 add_node: while adding node NumericNodeId(Identifier=16136, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>), requested parent node NumericNodeId(Identifier=15957, NamespaceIndex=0, NodeIdType=<NodeIdType.Numeric: 2>) does not exists
WARNING  asyncua.server.server:server.py:348 Endpoints other than open requested but private key and certificate are not set.
INFO     asyncua.server.internal_server:internal_server.py:172 starting internal server
INFO     asyncua.server.binary_server_asyncio:binary_server_asyncio.py:148 Listening on 127.0.0.1:34411
DEBUG    asyncua.server.server:server.py:447 OPC UA Server(opc.tcp://127.0.0.1:34411) server started
FAILED
test_server.py:33 (test_adding_a_new_node_to_config_does_not_change_node_ids)
def test_adding_a_new_node_to_config_does_not_change_node_ids():
        @dataclass
        class Scenario:
            sensor_one: int
            sensor_two: float

        scenarios_1 = {
            'Running': Scenario(
                sensor_one=1,
                sensor_two=130.0,
            ),
        }

        server_1 = OpcuaServer(endpoint=URL, scenarios=scenarios_1)
        server_1.start()
>       machine_variables_1 = server_1.get_node_info_for_all_nodes()

test_server.py:49: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
server.py:52: in get_node_info_for_all_nodes
    return [self._machine_variables[node].read_data_value(raise_on_bad_status=False).StatusCode for node in self._machine_variables.keys()]
server.py:52: in <listcomp>
    return [self._machine_variables[node].read_data_value(raise_on_bad_status=False).StatusCode for node in self._machine_variables.keys()]
venv/lib/python3.11/site-packages/asyncua/sync.py:94: in wrapper
    result = self.tloop.post(aio_func(*args, **kwargs))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ThreadLoop(Thread-1, stopped 139819296659008)>
coro = <coroutine object Node.read_data_value at 0x7f2a3787ece0>

    def post(self, coro):
        if not self.loop or not self.loop.is_running():
>           raise ThreadLoopNotRunning(f"could not post {coro}")
E           asyncua.sync.ThreadLoopNotRunning: could not post <coroutine object Node.read_data_value at 0x7f2a3787ece0>

venv/lib/python3.11/site-packages/asyncua/sync.py:50: ThreadLoopNotRunning

========================= 1 failed, 1 passed in 5.47s ==========================

You can also execute the code on your machine, just copy paste and run :)

Usually that kind of messages pop up when something is closed in a bad way

@oroulet Can please tell my where in the code example above something is closed in a bad way? Would be really helpful :)

And why does the name of the dataclass property influence how if resources are closed in a bad or in a good way?

saknarf commented 1 year ago

It seems like starting the server used in the first test (demo_opcua_server) in the second test (although it is not used there) somehow avoids the bug.

To reproduce File server.py remains the same

File test_server.py:

from dataclasses import dataclass

import pytest

from server import OpcuaServer

URL = 'opc.tcp://127.0.0.1:34411'
URL_2 = 'opc.tcp://127.0.0.1:34412'

@pytest.fixture()
def demo_opcua_server() -> OpcuaServer:
    @dataclass
    class Scenario:
        sensor_one: int
        sensor_twoo: float  # it works, if this name is changed to "sensor_two"

    scenarios_1 = {
        'Running': Scenario(
            sensor_one=1,
            sensor_twoo=130.0,  # also here
        ),
    }

    server = OpcuaServer(endpoint=URL, scenarios=scenarios_1)
    server.start()
    yield server
    server.stop()

def test_node_creation_from_scenarios(demo_opcua_server):
    demo_opcua_server.get_node_info_for_all_nodes()

def test_adding_a_new_node_to_config_does_not_change_node_ids(demo_opcua_server):
    @dataclass
    class Scenario:
        sensor_one: int
        sensor_two: float

    scenarios_1 = {
        'Running': Scenario(
            sensor_one=1,
            sensor_two=130.0,
        ),
    }

    server_1 = OpcuaServer(endpoint=URL_2, scenarios=scenarios_1)
    server_1.start()
    machine_variables_1 = server_1.get_node_info_for_all_nodes()
    server_1.stop()

    @dataclass
    class Scenario2:
        sensor_new: bool
        sensor_one: int
        sensor_two: float

    scenarios_2 = {
        'Running': Scenario2(
            sensor_new=True,
            sensor_one=1,
            sensor_two=130.0,
        ),
    }

    server_2 = OpcuaServer(endpoint=URL_2, scenarios=scenarios_2)
    server_2.start()
    machine_variables_2 = server_2.get_node_info_for_all_nodes()
    server_2.stop()

What did I change:

Maybe that helps to locate the bug :)

schroeder- commented 1 year ago

The bug is in the code: You are using a class variable _machine_variables. This variable is valid for every OpcuaServer instance. So if you start one instance with {'abc': 1} .... every instance will have this in the machine_variables dict!

class OpcuaServer:
    _server: Server
    _scenario_name: str
    _idx: int
    _activity_thread: threading.Thread
    _machine_variables: dict = {}   <-- class variable!!!!

So you access a node of disconnected server instance!

LostInDarkMath commented 1 year ago

OMFG sorry! I've completly overlooked this. Thank you!