FreeOpcUa / python-opcua

LGPL Pure Python OPC-UA Client and Server
http://freeopcua.github.io/
GNU Lesser General Public License v3.0

opc ua client: get value very slow #604

Closed badsmoke closed 5 years ago

badsmoke commented 6 years ago

Hey guys,

I am currently building a small program with which I use the opcua client to retrieve data from a controller and write it to a database.

The data retrieval is very slow; can you please tell me where my error is? The entire for-loop takes almost 2 seconds.

thank you very much

import sys
sys.path.insert(0, "..")

# import opcua and mysql libs
from opcua import Client
from opcua import ua
from opcua import *
import mysql.connector
#import Time libs
import datetime
import time
#import multiprocesses
import multiprocessing
from threading import Timer

import logging
logging.basicConfig()
#logging.basicConfig(level=logging.DEBUG)

#def worker():
#    """worker function"""
#    print('Worker')

def sec():
    """worker function"""
    sekunde = 0

    clientSec = Client("opc.tcp://"+login+ip+":"+port,timeout=10)
    try:
        clientSec.connect()
        connectionSec = mysql.connector.connect(host=host,database=db,port=dbPort,user=userdb,password=pwdb)
        cursorSec = connectionSec.cursor()
        # Get UA nodes in root
        root = clientSec.get_root_node()
        print("Client Sec connected")

        while 1:
            #time.sleep(1)
            t=time.time()
            while 1:
                if time.time()-t > 1:
                    print(time.time())
                    print('sec')
                    sekunde = sekunde +1
                    print(sekunde)
                    for P in range(PWU):
                        for S in range(STR):
                            for M in range(MDL):

                                var = clientSec.get_node("ns=6;s=::AsGlobalPV:BCS.PWU["+str(P)+"].STR["+str(S)+"].MDL["+str(M)+"].iVoltTotal")
                                #Time = var.get_data_value().SourceTimestamp
                                iVoltTotal = round(var.get_value(),6)

                                var = clientSec.get_node("ns=6;s=::AsGlobalPV:BCS.PWU["+str(P)+"].STR["+str(S)+"].MDL["+str(M)+"].iCurrTotal")
                                #Time = var.get_data_value().SourceTimestamp
                                iCurrTotal = round(var.get_value(),6)

                                var = clientSec.get_node("ns=6;s=::AsGlobalPV:BCS.PWU["+str(P)+"].STR["+str(S)+"].MDL["+str(M)+"].iSoc")
                                #Time = var.get_data_value().SourceTimestamp
                                iSoc = round(var.get_value(),3)

                                Datum_Zeit = datetime.datetime.now()

                                #cursorSec = connectionSec.cursor()
                                cursorSec.execute("INSERT INTO Pwu"+str(P)+"Strang"+str(S)+"Modul"+str(M)+" (Zeit,iVoltTotal,iCurrTotal,iSoc) VALUES (%s,%s,%s,%s)",(Datum_Zeit,iVoltTotal,iCurrTotal,iSoc))
                                #cursorSec.close()
                                connectionSec.commit()
                    t=time.time()                
    finally:
        clientSec.disconnect()
        connectionSec.close()

def min1():
    """worker function"""
    while 1:
        time.sleep(60)
        print('min1')

def min5():
    """worker function"""

    while 1:
        time.sleep(60*5)
        print('min5')

def sub():
    """worker function"""

    while 1:
        time.sleep(60)
        print('sub')

# main framework
if __name__ == '__main__':
    jobs = []
    x=0
    # credentials

    #OpcUA
    ip      =   "10.96.7.10"
    port    =   "4840"
    login   =   "*****:****@"
    #MariaDB
    host    =   '10.96.7.245'
    db      =   'OpcUA'
    dbPort  =   '3307'
    userdb  =   '*****'
    pwdb    =   '*****'

    #Parameter
    PWU = 1   
    STR = 7
    MDL = 1
    # create variables
    P = 0
    S = 0
    M = 0
    # MariaDB connect
    connection = mysql.connector.connect(host=host,database=db,port=dbPort,user=userdb,password=pwdb)
    print("Tabellen erzeugen")
    for P in range(PWU):
        for S in range (STR):
            for M in range (MDL):
                cursor = connection.cursor()
                cursor.execute("CREATE TABLE IF NOT EXISTS Pwu"+str(P)+"Strang"+str(S)+"Modul"+str(M)+"(id INT UNSIGNED PRIMARY KEY AUTO_INCREMENT,Zeit varchar(23),iTempMin0 float(4),iTempMin1 float(4),iTempMax0 float(4),iTempMax1 float(4),iTempCellAvg float(4),iVoltTotal float(6),iCurrTotal float(6),iSoc int(3))")
                # delete all tables:
                #cursor.execute("DROP TABLE Pwu"+str(P)+"Strang"+str(S)+"Modul"+str(M))
                cursor.close()
                print(P,S,M)

    # establish the OPC UA connection
    client = Client("opc.tcp://"+login+ip+":"+port,timeout=10)
    #client = Client("opc.tcp://admin@192.168.2.1:4840") #connect using a user
    try:
        client.connect()

        # Get UA nodes in root
        root = client.get_root_node()

        print("client connected")
        OPCtrue = client.get_node("ns=6;s=::AsGlobalPV:OPCtrue")

        # start the individual processes
        for x in range(1):
            # seconds process
            secjob = multiprocessing.Process(target=sec)
            jobs.append(secjob)
            secjob.start()
            # 1-minute process
            min1job = multiprocessing.Process(target=min1)
            jobs.append(min1job)
            min1job.start()
            # 5-minute process
            min5job = multiprocessing.Process(target=min5)
            jobs.append(min5job)
            min5job.start()
            # subscription process
            subjob = multiprocessing.Process(target=sub)
            jobs.append(subjob)
            subjob.start()
    finally:
        client.disconnect()
        connection.close()
        print("ENDE")    
`
zerox1212 commented 6 years ago

Your code looks very inefficient. Why are you getting the node object every second? You can get that once at the start and save it. Then you are doing get_data_value and get_value when the DataValue object already has the value.

Also you are creating a DB cursor every second which you don't need to do. You can create the cursor once and keep reusing it.

You might want to change your design to use subscriptions instead. Then python-opcua will keep the data you want to record in SQL up to date and all you need to trigger every second is an INSERT of the values.

I really don't understand your code. For example, you create two clients for some reason (I suppose because you are using multiprocessing), but I don't see a reason to use multiprocessing here either...
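
For illustration, the "resolve the nodes once, reuse the cursor" idea could look roughly like this inside your sec() loop. This is an untested sketch; node_map is just a helper dict here, everything else reuses the names from your snippet:

node_map = {}
for P in range(PWU):
    for S in range(STR):
        for M in range(MDL):
            base = "ns=6;s=::AsGlobalPV:BCS.PWU["+str(P)+"].STR["+str(S)+"].MDL["+str(M)+"]."
            node_map[(P, S, M)] = (clientSec.get_node(base + "iVoltTotal"),
                                   clientSec.get_node(base + "iCurrTotal"),
                                   clientSec.get_node(base + "iSoc"))

cursorSec = connectionSec.cursor()  # create the cursor once and reuse it
while True:
    for (P, S, M), (n_volt, n_curr, n_soc) in node_map.items():
        Datum_Zeit = datetime.datetime.now()
        cursorSec.execute("INSERT INTO Pwu"+str(P)+"Strang"+str(S)+"Modul"+str(M)+" (Zeit,iVoltTotal,iCurrTotal,iSoc) VALUES (%s,%s,%s,%s)",
                          (Datum_Zeit, round(n_volt.get_value(), 6), round(n_curr.get_value(), 6), round(n_soc.get_value(), 3)))
    connectionSec.commit()
    time.sleep(1)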

badsmoke commented 6 years ago

Thanks for your feedback.

I have to say it's my first python program and I haven't had much to do with OpcUA yet.

So you mean that get_node takes that long?

So I should assign the nodes to variables at the beginning and then only get the data via get_value?

The DB cursor is only created once; the other one was commented out.

Is it really that efficient to subscribe to all the data? (For the number of data points, see below.)

This is only a small example; later, about 60k variables should be fetched at different intervals (every second, every 10 seconds, every minute, every 5 minutes, subscriptions, ...). Right from the start I wanted to run everything in different threads or processes to distribute the load.

Thank you very much

EDIT: I've tested that now: get_node(...) is only executed once at the beginning, and in the for loop only get_value is used.

No improvement, it still takes about 2 seconds

zerox1212 commented 6 years ago

Are you certain get_value is what is taking so long? What server is your client connected to, and is it on a fast network?

Why doesn't your server record this data? Generally OPC UA servers should support history.

As a test you should write a smaller program with only one client recording every second. Remove all the multiprocessing stuff. I don't know if anyone has used python-opcua with multiprocessing. Perhaps there is a problem that makes it slow.
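
Something as small as this would do (only a sketch; the endpoint and node id are placeholders taken from your snippet, without the login part):

import time
from opcua import Client

client = Client("opc.tcp://10.96.7.10:4840", timeout=10)
client.connect()
try:
    node = client.get_node("ns=6;s=::AsGlobalPV:BCS.PWU[0].STR[0].MDL[0].iVoltTotal")
    while True:
        start = time.time()
        value = node.get_value()  # one read per second, nothing else
        print("read took %.1f ms, value = %s" % ((time.time() - start) * 1000.0, value))
        time.sleep(1)
finally:
    client.disconnect()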

badsmoke commented 6 years ago

I don't know why it takes so long; this is a local network, a mixture of 100 Mbit and 1 Gbit. Ping time is approx. 1 ms.

A B&R controller (PLC) serves as an OpcUA server. Unfortunately, this does not have the possibility to write to a database.

I have now written a minimal client with only one data point: it takes about 10 ms. Is that linear? As I wrote before, I have to fetch several thousand data points later.

zerox1212 commented 6 years ago

I don't know if that is typical, but it will likely be linear: if you call get_value 1000 times in a loop it will take 1000 × 10 ms, or 10 seconds. How are you timing it? Did you use the timeit library?
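
For example, with timeit and a node object that has already been resolved (untested sketch; the node variable is assumed to exist):

import timeit

elapsed = timeit.timeit(node.get_value, number=100)  # time 100 reads
print("average get_value: %.1f ms" % (elapsed / 100 * 1000))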

OPC UA is a heavy protocol so requesting single values from individual nodes will probably not have great performance. If your server can put all your data into a single array node you only have to call get_value once. This would be much faster.
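
Hypothetically, if the PLC exposed such an array node (the node id below is invented), the whole batch would be a single round trip:

array_node = client.get_node("ns=6;s=::AsGlobalPV:BCS.AllValues")  # assumed array node, adjust to your server
values = array_node.get_value()  # one read returns the whole list of elements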

oroulet commented 6 years ago

It is also possible to read many nodes at once, but it looks like you have many things to improve before that.

uprightcarrion commented 6 years ago

@badsmoke I did a project a few months ago collecting quite a bit of data from a Bosch controller; I used subscriptions and pushed the data to a SQL DB. It worked OK for my needs. I'm far from experienced, so it is not the best, but it might help you. If I get back to the project I will check whether the controller supports history, so if your controller supports it, that is probably the best option.

https://github.com/tdesroches/Dataminer

uprightcarrion commented 6 years ago

Also, I used a regex to get my data from the subscription, which is probably not the best way. @zerox1212 helped me in thread #502 to make a callback for the DataValues, but aside from writing a little test program with the subscription callback I haven't used it yet.

badsmoke commented 6 years ago

@zerox1212

my timing looks like this right now:

t = time.time()
while 1:
    if time.time() - t > 1:
        .
        .....
        t = time.time()

is there a better way?

@oroulet

What possibilities are there to fetch many nodes at once? I can't change the variable structure, which is given as: ::AsGlobalPV:BCS.PWU["+str(P)+"].STR["+str(S)+"].MDL["+str(M)+"]."variable name".

The number of P, S, M changes per project. A typical setup is e.g. P = 5, S = 16, M = 2, with approx. 50 data points each (i.e. 5 × 16 × 2 = 160 modules, so roughly 8000 data points in total).

Hence the idea of using multiprocessing, perhaps one process per "P".

@uprightcarrion

How many data points was your project handling? My problem is that there are relatively many data points that have to be fetched every second, and it must not take longer than that. Unfortunately, my controller will never support history for data points.

Thanks to all of you for your support

uprightcarrion commented 6 years ago

I had roughly 50-60 variables in my subscription at around 100ms interval.

zerox1212 commented 6 years ago

Like I said before: use subscriptions. That way your data points are updated continuously while your main thread is sleeping for the one second. Right now your 1-second wait does nothing, and right after it you try to read everything from OPC UA and copy it to the DB. If you really want performance, you should change your design.

If you want to read a batch of nodes at one time you have to write a custom OPC UA call. This library focuses on having easy-to-use methods only at the node level. To make a custom call you need to understand the OPC UA specification very well. I recommend trying subscriptions first.

badsmoke commented 6 years ago

ok now I tried to get some variables by subscription. It's definitely faster.

But can you give me a hand with how I can use that in my structure? As already mentioned, the arrangement of the data points is predefined:

class SubHandler(object):

    def datachange_notification(self, node, val, data):
        print("Python: New data change event", node, val)

for P in range(PWU):
    for S in range(STR):
        for M in range(MDL):
            iVoltTotalNode[P][S][M] = clientSec.get_node("ns=6;s=::AsGlobalPV:BCS.PWU["+str(P)+"].STR["+str(S)+"].MDL["+str(M)+"].iVoltTotal")
.
.
.
.
---------------------------
.
.
.

for P in range(PWU):
    for S in range(STR):
        for M in range(MDL):
            cursorSec.execute("INSERT INTO Pwu"+str(P)+"Strang"+str(S)+"Modul"+str(M)+" (Zeit,prcVoltTotal,prcCurrTotal,prcSoc) VALUES (%s,%s,%s,%s)",(Datum_Zeit,prcVoltTotal[P][S][M],prcCurrTotal[P][S][M],prcSoc[P][S][M]))

How can I pull all the variables out of the SubHandler and write them away in a for loop?

zerox1212 commented 6 years ago

Do something like this.

class SubHandler(object):
    """
    Subscription Handler. To receive events from server for a subscription.
    The handler forwards updates to its referenced python object
    """

    def __init__(self, obj):
        self.obj = obj

    def datachange_notification(self, node, val, data):
        # print("Python: New data change event", node, val, data)
        self.obj.value = data.monitored_item.Value.Value.Value #this sets the value in your python object

class IVoltValue(object):

    def __init__(self, opcua_client, ua_node, P_arg, S_arg, M_arg):
        self.ua_node = ua_node
        self.P =  P_arg
        self.S =  S_arg
        self.M  = M_arg
        self.value = 0

        # subscribe to ua node of this python object (add 'self' if you want to keep track of these objects)
        handler = SubHandler(self)
        sub = opcua_client.create_subscription(500, handler)
        handle = sub.subscribe_data_change(self.ua_node)

    def insert_sql(self, datum_zeit, prc_curr_total, prc_soc):  # pass in your prcSoc and other values
        cursorSec.execute("INSERT INTO Pwu"+str(self.P)+"Strang"+str(self.S)+"Modul"+str(self.M)+" (Zeit,prcVoltTotal,prcCurrTotal,prcSoc) VALUES (%s,%s,%s,%s)",(datum_zeit, self.value, prc_curr_total, prc_soc))

my_list = []

for P in range(PWU):
    for S in range(STR):
        for M in range(MDL):
            my_node = clientSec.get_node("ns=6;s=::AsGlobalPV:BCS.PWU["+str(P)+"].STR["+str(S)+"].MDL["+str(M)+"].iVoltTotal")
            my_list.append(IVoltValue(clientSec, my_node, P, S, M))

while True:
    for i in my_list:
        i.insert_sql(date_whatever, prcCurrTotal, prcSoc)
    time.sleep(1)

I didn't test this, you have to make it work.

burkovae commented 6 years ago

@oroulet you say there is a way to get values of many nodes at once. Can you provide a snippet please?

oroulet commented 6 years ago

Look at the code of get_value(); it uses get_attribute. Copy that code and modify it to append many ReadValueIds instead of only one: https://github.com/FreeOpcUa/python-opcua/blob/251ed63dd381bfbb903ac5b15fd4bcd6ccc0175e/opcua/common/node.py#L256

burkovae commented 6 years ago

@oroulet thank you for the very fast reply!

burkovae commented 6 years ago

@oroulet thank you again for the hint. I got this working for myself

def get_data_values(nodes, server = None):
    """Read values from multiple nodes.

    Returns
    -------
    list
        DataValue(s) for each node

    """
    if server is None:
        server = nodes[0].server
    params = ua.ReadParameters()
    for n in nodes:
        rv = ua.ReadValueId()
        rv.NodeId = n.nodeid
        rv.AttributeId = ua.AttributeIds.Value
        params.NodesToRead.append(rv)

    results = server.read(params)
    return results

def retrieve_values(data_values):
    return [d.Value.Value for d in data_values]
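
Called, for example, like this (the node ids are just placeholders borrowed from earlier in this thread):

nodes = [client.get_node("ns=6;s=::AsGlobalPV:BCS.PWU[0].STR[0].MDL[0].iVoltTotal"),
         client.get_node("ns=6;s=::AsGlobalPV:BCS.PWU[0].STR[0].MDL[0].iCurrTotal")]
values = retrieve_values(get_data_values(nodes))  # one Read service call for all nodes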

And I got it about 33% faster.

Before that I needed 0.9-1 seconds. Is there still any conceptual improvement that can be made?

oroulet commented 6 years ago

Use subscriptions. A subscription sends much smaller packets over the network and does not require you to query the server for every value.

burkovae commented 6 years ago

That is nice, thank you @oroulet . Using subscriptions indeed yields very fast results.

pinkynrg commented 5 years ago

@burkovae would you mind sharing some code example using subscriptions?

burkovae commented 5 years ago

Something along these lines, @pinkynrg:

#!python
from opcua import Client
import opcua.common as oc
import sys
import time

client = Client("opc.tcp://<ip>:<port>")
client.connect()

root = client.get_root_node()

# resolve the browse paths of the nodes you want to watch into Node objects
data_nodes = [root.get_child(["0:Objects", "...", "..."]), root.get_child(["0:Objects", "...", "..."]), ...]

class ExampleSub(oc.subscription.SubHandler):

    def __init__(self):
        self.history = dict()

    def datachange_notification(self, node, val, data):
        id = node.nodeid.Identifier
        sensor_history = self.history.get(id)
        if sensor_history is None:
            sensor_history = list()
            self.history.update({id: sensor_history})
        sensor_history.append((val, data, time.localtime()))

example_sub = ExampleSub()
subscription = client.create_subscription(10, example_sub)
subscription.subscribe_data_change(data_nodes)

if __name__ == '__main__':
    cond = True
    try:
        while cond:
            current_values = [(k, v[-1]) for k, v in example_sub.history.items()]
            <print current values>
    finally:
        client.disconnect()

nmgeek commented 5 years ago

I find both of the multi-threaded subscription examples above troubling. The first one has a potential 1-second delay receiving the data from the subscription in the main loop, and the second probably burns CPU without any delay. And both have two threads accessing non-atomic data simultaneously. Certainly you can tune the delay for a nice tradeoff between latency and CPU use. However, I don't understand Python threading well enough to know whether it is safe to read a dictionary from one thread while another is simultaneously writing to it.

The test cases in this repository seem to offer a cleaner solution. They create concurrent.futures.Future objects and pass the values through the Future. In that case there is zero delay and you don't burn much, if any, CPU while waiting. And the data is safely passed from one thread to the other.
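
As far as I can tell the pattern looks roughly like this (the handler name and the some_node variable are mine, purely illustrative):

from concurrent.futures import Future

class FutureHandler(object):
    def __init__(self):
        self.future = Future()

    def datachange_notification(self, node, val, data):
        # hand the value over to the waiting thread
        if not self.future.done():
            self.future.set_result(val)

handler = FutureHandler()
sub = client.create_subscription(100, handler)  # client is a connected opcua Client
sub.subscribe_data_change(some_node)
value = handler.future.result(timeout=10)  # blocks without busy-waiting

Of course a Future delivers only a single result, so for a continuous stream you would need a fresh Future (or a queue) for every update.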

However, the documentation on concurrent.futures.Future says, "Future instances are created by Executor.submit() and should not be created directly except for testing." And the Executor class is intended for running some function as a separate thread, so it is not compatible with the subscription callback model of these OPC UA client examples.

This leaves me wanting an example without unnecessary delays and that is thread safe. Where should I look for such an example?

oroulet commented 5 years ago

@nmgeek

oroulet commented 5 years ago

For reference, there are now Client.get_values and Client.set_values methods to read and write many nodes at once.
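
Roughly like this (untested; the node ids and the new_volt/new_curr values are placeholders):

nodes = [client.get_node("ns=6;s=::AsGlobalPV:BCS.PWU[0].STR[0].MDL[0].iVoltTotal"),
         client.get_node("ns=6;s=::AsGlobalPV:BCS.PWU[0].STR[0].MDL[0].iCurrTotal")]
values = client.get_values(nodes)                # one Read request for all nodes
client.set_values(nodes, [new_volt, new_curr])   # one Write request for all nodes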

pinkynrg commented 5 years ago

thanks for letting us know

yenicelik commented 4 years ago

Sorry for pinging this, but does the get_values function also exist for history-read operations?