FreeOpcUa / opcua-asyncio

OPC UA library for python >= 3.7
GNU Lesser General Public License v3.0

Future for request id {request_id} is already done #53

Closed. Ilyazyk closed this issue 4 years ago.

Ilyazyk commented 5 years ago

I created a thread (from threading import Thread) for the asyncio event loop so that I can run asyncio alongside Qt. I make a lot of asyncio.run_coroutine_threadsafe(node.set_value(), loop) calls (about 1000 per second). In some circumstances I get the error below. Maybe it's something obvious to you; what could be wrong?
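
The setup described above (an asyncio loop running in its own thread so the Qt main thread stays free) can be sketched with the standard library alone. This is a minimal sketch, not the poster's code: `fake_set_value`, `start_background_loop` and `write_many` are hypothetical names, and the sleep stands in for a network round trip to the server. Keeping the `concurrent.futures.Future` objects returned by `asyncio.run_coroutine_threadsafe` lets the calling thread check every result instead of dropping it.

```python
import asyncio
import threading


async def fake_set_value(value):
    # Hypothetical stand-in for node.set_value(value): pretend to do a round trip.
    await asyncio.sleep(0.001)
    return value


def start_background_loop():
    # Create a loop and run it forever in a daemon thread (e.g. next to a Qt GUI).
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


def write_many(loop, values):
    # Submit coroutines from a thread that is NOT running the loop; each call
    # returns a concurrent.futures.Future usable from this thread.
    futures = [asyncio.run_coroutine_threadsafe(fake_set_value(v), loop) for v in values]
    # Block until every write has finished, re-raising any exception it raised.
    return [f.result(timeout=5) for f in futures]


if __name__ == "__main__":
    loop, thread = start_background_loop()
    print(write_many(loop, range(5)))
    # Ask the loop to stop from its own thread, then clean up.
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
```

Waiting on the futures (or at least attaching a callback to them) is a design choice: it surfaces exceptions that would otherwise be silently lost when writes are fired and forgotten.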

ERROR:asyncua.client.ua_client.UASocketProtocol:Exception raised while parsing message from server
Traceback (most recent call last):
  File "C:\Users\Labuser\AppData\Local\Programs\Python\Python37-32\lib\site-packages\asyncua\client\ua_client.py", line 152, in _call_callback
    self._callbackmap[request_id].set_result(body)
asyncio.base_futures.InvalidStateError: invalid state

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\Labuser\AppData\Local\Programs\Python\Python37-32\lib\site-packages\asyncua\client\ua_client.py", line 79, in _process_received_data
    self._process_received_message(msg)
  File "C:\Users\Labuser\AppData\Local\Programs\Python\Python37-32\lib\site-packages\asyncua\client\ua_client.py", line 93, in _process_received_message
    self._call_callback(msg.request_id(), msg.body())
  File "C:\Users\Labuser\AppData\Local\Programs\Python\Python37-32\lib\site-packages\asyncua\client\ua_client.py", line 158, in _call_callback
    raise ua.UaError(f"Future for request id {request_id} is already done")
asyncua.ua.uaerrors._base.UaError: Future for request id 2458 is already done

oroulet commented 5 years ago

If you have a Qt application, it might make sense to use the sync API in asyncua.sync. Look at the open PR.

oroulet commented 5 years ago

Invalid state sounds like a threading error. Maybe you have two loops running?
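
The traceback fails at `self._callbackmap[request_id].set_result(body)`: an asyncio future can be resolved only once, and calling `set_result()` on a future that is already done (resolved or cancelled) raises `asyncio.InvalidStateError`. One way this can happen, offered as an assumption rather than a diagnosis of this report, is that the caller stopped waiting (for example a timeout cancelled the future) before the server's response arrived. The sketch below is a simplified, hypothetical model of a request-id-to-future map (`RequestTracker`, `send` and `on_response` are illustrative names, not asyncua's implementation); it pops each entry after use and ignores late responses instead of raising.

```python
import asyncio


class RequestTracker:
    """Hypothetical, simplified model of a client's request_id -> Future map."""

    def __init__(self):
        self._callbacks = {}
        self._next_id = 0

    def send(self, loop):
        # Register a future for a new request id; a real client would also
        # serialize the request and write it to the socket here.
        self._next_id += 1
        fut = loop.create_future()
        self._callbacks[self._next_id] = fut
        return self._next_id, fut

    def on_response(self, request_id, body):
        # pop() removes the entry after use so the map cannot grow forever.
        fut = self._callbacks.pop(request_id, None)
        if fut is None or fut.done():
            # Unknown or late response (e.g. the caller already cancelled the
            # future): calling set_result() now would raise InvalidStateError.
            return False
        fut.set_result(body)
        return True


async def demo():
    loop = asyncio.get_running_loop()
    tracker = RequestTracker()

    rid, fut = tracker.send(loop)
    assert tracker.on_response(rid, "ok") is True
    assert await fut == "ok"

    # Simulate a request whose caller gave up before the response arrived.
    rid, fut = tracker.send(loop)
    fut.cancel()
    try:
        fut.set_result("late")  # what an unguarded handler would do
    except asyncio.InvalidStateError:
        print("InvalidStateError: the future is already done")
    assert tracker.on_response(rid, "late") is False


if __name__ == "__main__":
    asyncio.run(demo())
```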

Ilyazyk commented 5 years ago

It looks like something becomes overloaded. When I call node.set_value(), I don't wait for the result; I just drop it into the loop running in the parallel thread, and there are probably too many of them. I've tested the asyncua.sync version (thanks for this!) and found that the maximum throughput is 300 nodes/sec, while my 4 CPUs are loaded at less than 40%. I got the same values with the async version. What could be the bottleneck? How can I achieve a write speed of 2000 nodes/sec (ideally 10,000)?

oroulet commented 5 years ago

2000 seems optimistic. OPC UA write is not optimized for speed, and do not forget that this is a network operation. What you can do is look at the set_attribute code and write your own code that reuses all objects. You can also do a write without waiting for the future (if you are not doing that already). But anyway, I am 99% sure the network IS the bottleneck.

oroulet commented 5 years ago

OK, performance is fun, so I did some small tests on my PC. On the server side, I can write a value at around 100,000 values/s using server.set_attribute. On the client side, with both server and client on my PC, I can write around 850 nodes/s. Using optimized code instead of set_value does not seem to change anything, so the limit is either the network/parsing/packing or maybe the server.

import sys
import time
import asyncio
import logging

sys.path.insert(0, "..")  # use the asyncua checkout one directory up (examples/ layout)
from asyncua import Client, ua

logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('asyncua')


async def main():
    url = 'opc.tcp://localhost:4840/freeopcua/server/'
    async with Client(url=url) as client:
        uri = 'http://examples.freeopcua.github.io'
        idx = await client.get_namespace_index(uri)
        var = await client.nodes.root.get_child(["0:Objects", f"{idx}:MyObject", f"{idx}:MyVariable"])

        nb = 4000
        start = time.time()
        # Build the write request objects once and reuse them for every write,
        # instead of letting var.set_value() allocate new ones each time.
        attr = ua.WriteValue()
        attr.NodeId = var.nodeid
        attr.AttributeId = ua.AttributeIds.Value
        attr.Value = ua.DataValue(ua.Variant(1.0, ua.VariantType.Float))
        params = ua.WriteParameters()
        params.NodesToWrite = [attr]
        for i in range(nb):
            # Only the payload changes between iterations.
            params.NodesToWrite[0].Value.Value.Value = float(i)
            result = await client.uaclient.write(params)
            # result[0].check()        # optional: raise if the StatusCode is bad
            # await var.set_value(i)   # unoptimized equivalent
        # Stop the clock before disconnecting so teardown is not counted.
        elapsed = time.time() - start
    print("\n Write frequency: \n", nb / elapsed)


if __name__ == '__main__':
    # Optional faster event loop: import uvloop; uvloop.install()
    asyncio.run(main())
    # To profile instead: import cProfile, then
    # cProfile.run('asyncio.run(main(), debug=True)', filename="perf.cprof")

Ilyazyk commented 5 years ago

Thank you, @oroulet. It sounds like 'network/parsing/packing' has to be optimised.

Ilyazyk commented 5 years ago

I've just realized: if 850 writes/sec is the write speed for one node, that is a fantastic speed for industrial automation! It means a period of about 1.2 ms. What I meant was the write speed across many different nodes. Can we use the power of asyncio and write 10,000 nodes in parallel? What would the time be then? Something like:

futures = [node.set_value(value) for node, value in zip(nodes, values)]
for future in asyncio.as_completed(futures):
    result = await future

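
Why concurrency can help here can be shown with a small, self-contained simulation. This is a sketch under stated assumptions: `fake_write` is a hypothetical stand-in for `node.set_value()`, and `LATENCY` is an assumed fixed round-trip time, so it models only the client waiting on the network, not how fast a real server processes the requests. Awaiting writes one by one costs roughly `len(values) * LATENCY`, while `asyncio.gather` keeps all requests in flight at once.

```python
import asyncio
import time

LATENCY = 0.01  # hypothetical round-trip time in seconds


async def fake_write(value):
    # Hypothetical stand-in for node.set_value(value): one request/response.
    await asyncio.sleep(LATENCY)
    return value


async def one_by_one(values):
    # Each write waits for the previous one: total time ~ len(values) * LATENCY.
    return [await fake_write(v) for v in values]


async def all_at_once(values):
    # All writes in flight together; gather returns results in input order.
    return await asyncio.gather(*(fake_write(v) for v in values))


async def main():
    values = list(range(50))
    for fn in (one_by_one, all_at_once):
        start = time.perf_counter()
        results = await fn(values)
        print(f"{fn.__name__}: {time.perf_counter() - start:.3f} s")
        assert results == values


if __name__ == "__main__":
    asyncio.run(main())
```

`asyncio.gather` keeps results in input order, whereas `asyncio.as_completed` (used in the comment above) yields them in completion order. With a real server the gain is bounded by how fast the server answers each request.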
oroulet commented 5 years ago

@Ilyazyk, a method to do that is not exposed right now, but it makes sense when checking the result of each write operation is not necessary (although in practice you should still check the connection state, or check one result in every X). What needs to be done is the following:

Ilyazyk commented 5 years ago

Gents, I've tested the method and the result is better, but still disappointing: 7 s versus 11 s for writing 10,000 nodes. Could you please check and advise what can be improved? The test code is attached: async_write_test.zip

And, as I don't know how to create a pull request, I am placing my changes here.

node.py

    # needs "from datetime import datetime" at the top of node.py
    def set_value_async(self, value, varianttype=None):
        """
        Like set_value(), but return the pending write instead of awaiting it.
        """
        if isinstance(value, ua.DataValue):
            datavalue = value
        elif isinstance(value, ua.Variant):
            datavalue = ua.DataValue(value)
            datavalue.SourceTimestamp = datetime.utcnow()
        else:
            datavalue = ua.DataValue(ua.Variant(value, varianttype))
            datavalue.SourceTimestamp = datetime.utcnow()

        return self.set_attribute_async(ua.AttributeIds.Value, datavalue)

    def set_attribute_async(self, attributeid, datavalue):
        """
        Set an attribute of a node without waiting for the server's response.
        attributeid is a member of ua.AttributeIds
        datavalue is a ua.DataValue object
        """
        attr = ua.WriteValue()
        attr.NodeId = self.nodeid
        attr.AttributeId = attributeid
        attr.Value = datavalue
        params = ua.WriteParameters()
        params.NodesToWrite = [attr]
        return self.server.write_async(params)

ua_client.py class UaClient

    def write_async(self, params):
        self.logger.debug("write")
        request = ua.WriteRequest()
        request.Parameters = params
        return self.protocol.send_request_async(request)

class UASocketProtocol

    def send_request_async(self, request, timeout=10, message_type=ua.MessageType.SecureMessage):
        """
        Send a request to the server without waiting for the response.
        Timeout is the timeout written in the UA header.
        Returns whatever _send_request() returns (the pending request), not the response object.
        """
        return self._send_request(request, timeout, message_type)

Ilyazyk commented 5 years ago

The maximum write speed I could achieve when writing different nodes was 1500 nodes/sec. It doesn't depend on the number of nodes (for more than 100 nodes). Can it be better?

oroulet commented 5 years ago

Please learn how to make a PR: it makes it easier for people to understand your work, AND it is very useful for anybody writing code anyway. High-frequency support over UA is a bad idea; it is not designed for that. If you really want to do high-frequency writes, your server probably supports things like UDP, or even EtherCAT if it is a PLC.

Ilyazyk commented 5 years ago

@oroulet, I understand. And I am totally happy with a 100 ms update period for one node, which is not really high frequency (your measurement gave an 80 times better number). Here I am talking about system scalability, which the implementation may affect. It seems there is room for improvement. Could it be because of the GIL?

oroulet commented 5 years ago

We can't know anything before doing some profiling and reading the code carefully. I once optimized the code for writes on the server side, so there is an optimized method called Server.set_attribute_value(). I could not improve things further for writing from the client, but you never know, someone may come up with something. We do not use threads but asyncio in the server, so the GIL is not an issue unless you reimplement things using threads.

Ilyazyk commented 5 years ago

An observation: when the freeopcua server is running on Ubuntu (19.04), the async client can write 3000 different nodes/sec in both cases, whether it runs locally on Ubuntu or remotely on Windows 7.
When the freeopcua server is running on Windows 7, the speed is not higher than 1500 nodes/sec. With the Rockwell server (FactoryTalk Linx Gateway), the client can't write more than 400 nodes/sec (both async and sync versions) from either Ubuntu or Windows. According to the documentation, the Rockwell server can process 50,000 tags (nodes) at a 500 ms scan rate (values tested on a computer with a quad-core Intel Xeon E5-1607 v3 processor at 3.2 GHz and 8 GB of RAM).

Ilyazyk commented 5 years ago

This is becoming off-topic, as my questions are more about performance (which is the reason I got the error). The commit "memory leak: remove request_id from callback map after use in ua_client…" probably addressed this issue as well (I will check). My question: is a group write of nodes implemented? It seems that currently only one node can be written at a time, so the server has to reply to each write request, and with 10,000 waiting futures this takes a long time. According to the specification (OPC Unified Architecture Specification Part 4: Services, chapter 5.10.4.2), there is a parameter "nodesToWrite [] - List of Nodes and their Attributes to write", which promises a performance increase if we can process the server's response for all writes at once. Am I right?
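
The idea behind the `nodesToWrite[]` array can be illustrated with a toy model that counts round trips. Everything here is hypothetical: `WriteValue` is a plain stand-in for `ua.WriteValue` (NodeId and value only), `FakeServer` simulates a server with an assumed fixed per-request `LATENCY`, and the `"Good"` strings stand in for status codes. Writing N nodes one per request costs N round trips; packing them into one request costs one, and the results come back in the same order as the items sent.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, List

LATENCY = 0.005  # hypothetical per-request round-trip time in seconds


@dataclass
class WriteValue:
    # Hypothetical plain stand-in for ua.WriteValue (NodeId + Value only).
    node_id: str
    value: Any


@dataclass
class FakeServer:
    store: dict = field(default_factory=dict)
    requests: int = 0

    async def write(self, nodes_to_write: List[WriteValue]) -> List[str]:
        # One Write call costs one round trip, however many items it carries.
        self.requests += 1
        await asyncio.sleep(LATENCY)
        for wv in nodes_to_write:
            self.store[wv.node_id] = wv.value
        # One result per item, in the same order as nodes_to_write.
        return ["Good"] * len(nodes_to_write)


async def write_one_by_one(server, items):
    results = []
    for node_id, value in items:
        results += await server.write([WriteValue(node_id, value)])
    return results


async def write_batched(server, items):
    return await server.write([WriteValue(n, v) for n, v in items])


if __name__ == "__main__":
    items = [(f"ns=2;i={i}", i) for i in range(50)]
    for fn in (write_one_by_one, write_batched):
        server = FakeServer()
        asyncio.run(fn(server, items))
        print(f"{fn.__name__}: {server.requests} request(s)")
```

A real server also spends time per item, so the speed-up is smaller than the ratio of round trips; the model only shows where the per-request overhead goes.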

oroulet commented 5 years ago

You just use the lower-level API. I am sure I wrote an example somewhere, but I cannot find it. Just look at the code inside set_attribute(), create your own WriteRequest, and send it.

Ilyazyk commented 5 years ago

Gents, my Eclipse doesn't let me make a commit, so I am looking for your help. I've made a fix (attached). Could you commit it to 'opcua-asyncio\asyncua\client\' and make a pull request? The method set_values() is now added to the client, which lets you write multiple nodes with their values in one shot. This has given me a huge performance gain, about 20 times. I am able to write up to 10,000 nodes/sec. We can close the issue, then.

fix_53.zip

cbergmiller commented 5 years ago

First of all, thanks for figuring out a way to do this high-performance operation with asyncua! I created a patch from your changes and added it below for reference. We will have to think about this before accepting a pull request. Why didn't you use the existing UaClient.write() method and keep the code you added as part of your application? That way you would have more flexibility. In my opinion, your added API methods may be too specific.

Index: asyncua/client/ua_client.py
===================================================================
--- asyncua/client/ua_client.py (date 1561101722000)
+++ asyncua/client/ua_client.py (date 1561785391443)
@@ -3,6 +3,7 @@
 """
 import asyncio
 import logging
+from datetime import datetime
 from typing import Dict, List
 from asyncua import ua
 from typing import Optional
@@ -654,3 +655,25 @@
         response = struct_from_binary(ua.ReadResponse, data)
         response.ResponseHeader.ServiceResult.check()
         return response.Results
+
+    async def set_attribute(self, nodes_values, attr):
+        self.logger.info("set_attribute")
+        request = ua.WriteRequest()
+        for node, value, varianttype in nodes_values:
+            rv = ua.WriteValue()
+            rv.NodeId = node
+            rv.AttributeId = attr
+            if isinstance(value, ua.DataValue):
+                datavalue = value
+            elif isinstance(value, ua.Variant):
+                datavalue = ua.DataValue(value)
+                datavalue.SourceTimestamp = datetime.utcnow()
+            else:
+                datavalue = ua.DataValue(ua.Variant(value, varianttype))
+                datavalue.SourceTimestamp = datetime.utcnow()
+            rv.Value = datavalue
+            request.Parameters.NodesToWrite.append(rv)
+        data = await self.protocol.send_request(request)
+        response = struct_from_binary(ua.WriteResponse, data)
+        response.ResponseHeader.ServiceResult.check()
+        return response.Results
Index: asyncua/client/client.py
===================================================================
--- asyncua/client/client.py    (date 1561101722000)
+++ asyncua/client/client.py    (date 1561785029470)
@@ -585,3 +585,12 @@
         nodes = [node.nodeid for node in nodes]
         results = await self.uaclient.get_attribute(nodes, ua.AttributeIds.Value)
         return [result.Value.Value for result in results]
+
+    async def set_values(self, nodes_values):
+        """
+        Write the value of multiple nodes in one roundtrip.
+        nodes_values is a list of tuples: [(node, value, varianttype),]
+        """
+        nodes_values = [(node.nodeid, value, varianttype) for node, value, varianttype in nodes_values]
+        results = await self.uaclient.set_attribute(nodes_values, ua.AttributeIds.Value)
+        return [result for result in results]

oroulet commented 5 years ago

Gents, My Eclipse doesn't let me to make a commit

?!?! How is this possible? Open a terminal and run git by hand. Learning git will help both us and you.

Ilyazyk commented 5 years ago

@cbergmiller, I had two reasons not to use UaClient.write():

  1. It was not obvious how to do it using the low-level API. If you think the added method is too specific, we need an example of how to do it with UaClient.write().
  2. The client has a method get_values(), which reads values for multiple nodes. Why not have a similar method set_values() for writing?

The ability for the client to do an efficient group write is useful. In my application it is critical. Please rework it in a better way; I really need this function.

@oroulet, I will do my best to start committing soon :)

P.S. Just realised that return [result for result in results] looks stupid.

oroulet commented 5 years ago

@Ilyazyk

1) In theory you should be able to use the existing write method inside your new Client.set_values(), and thus not modify uaclient. But I have not checked closely.

2) just because nobody sent a PR ;-)

Ilyazyk commented 5 years ago

@cbergmiller, you are right. There is just one new method in the client; there is no need to change the low-level API:

    from datetime import datetime
    ....

    async def set_values(self, nodes_values):
        """
        Write the value of multiple nodes in one roundtrip.
        nodes_values is a list of tuples: [(node, value, varianttype),]
        """
        parameters = ua.WriteParameters()
        for node, value, varianttype in nodes_values:
            rv = ua.WriteValue()
            rv.NodeId = node.nodeid
            rv.AttributeId = ua.AttributeIds.Value
            datavalue = None
            if isinstance(value, ua.DataValue):
                datavalue = value
            elif isinstance(value, ua.Variant):
                datavalue = ua.DataValue(value)
                datavalue.SourceTimestamp = datetime.utcnow()
            else:
                datavalue = ua.DataValue(ua.Variant(value, varianttype))
                datavalue.SourceTimestamp = datetime.utcnow()
            rv.Value = datavalue
            parameters.NodesToWrite.append(rv)
        results = await self.uaclient.write(parameters)
        return results