FreeOpcUa / python-opcua

LGPL Pure Python OPC-UA Client and Server
http://freeopcua.github.io/
GNU Lesser General Public License v3.0
1.36k stars 658 forks source link

exception in Subscription._call_datachange() stops calling BinaryClient.publish() #46

Closed jk987 closed 8 years ago

jk987 commented 9 years ago

In my client app I connect to Kepware OPC server, then I create one subscription and dozens of "monitored items". I don't know why, but some calls of subscribe_data_change() end with TimeoutError exception (in Wireshark seems everything good), but never mind. Worse is, that shortly after this error I get following exception:

Traceback (most recent call last):
  File "C:\Python27\lib\site-packages\opcua\binary_client.py", line 331, in _call_publish_callback
    self._publishcallbacks[response.Parameters.SubscriptionId](response.Parameters)
  File "C:\Python27\lib\site-packages\opcua\subscription.py", line 61, in publish_callback
    self._call_datachange(datachange)
  File "C:\Python27\lib\site-packages\opcua\subscription.py", line 82, in _call_datachange
    data = self._monitoreditems_map[item.ClientHandle]
KeyError: 205

And after second occurence of this exception I stop getting new data change callbacks.

My amateur explanation is following: In Subscription.__init__() is twice called BinaryClient.publish() which after receiving callback calls _call_publish_callback() and this calls Subscription.publish_callback(). Exception in Subscription._call_datachange() stops correct finnishing of Subscription.publish_callback(), which means that next BinaryClient.publish() isn't called. And when this happens twice, then there is nobody sending PublishRequests.

And my amateur fix of this issue is to wrap part of code in try/except:

    def publish_callback(self, publishresult):
        self.logger.info("Publish callback called with result: %s", publishresult)
        while self.subscription_id is None:
            time.sleep(0.01)

        try:
            for notif in publishresult.NotificationMessage.NotificationData:
                if notif.TypeId == ua.FourByteNodeId(ua.ObjectIds.DataChangeNotification_Encoding_DefaultBinary):
                    datachange = ua.DataChangeNotification.from_binary(io.BytesIO(notif.to_binary()))
                    self._call_datachange(datachange)
                elif notif.TypeId == ua.FourByteNodeId(ua.ObjectIds.EventNotificationList_Encoding_DefaultBinary):
                    eventlist = ua.EventNotificationList.from_binary(io.BytesIO(notif.to_binary()))
                    self._call_event(eventlist)
                elif notif.TypeId == ua.FourByteNodeId(ua.ObjectIds.StatusChangeNotification_Encoding_DefaultBinary):
                    statuschange = ua.StatusChangeNotification.from_binary(io.BytesIO(notif.to_binary()))
                    self._call_status(statuschange)
                else:
                    self.logger.warning("Notification type not supported yet for notification %s", notif)

            ack = ua.SubscriptionAcknowledgement()
            ack.SubscriptionId = self.subscription_id
            ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
            self.server.publish([ack])
        except:
            self.logger.exception("Exception while processing publishresult.")
            self.server.publish()

But I'm sure you will find out better fix because I'm just a programming pig.

BTW, why is publish() called twice in Subscription.__init__()?

oroulet commented 9 years ago

Please send minimal sample code showing issue and the terminal output

On Sat, Oct 24, 2015, 14:45 jk987 notifications@github.com wrote:

In my client app I connect to Kepware OPC server, then I create one subscription and dozens of "monitored items". I don't know why, but some calls of subscribe_data_change() end with TimeoutError exception (in Wireshark seems everything good), but never mind. Worse is, that shortly after this error I get following exception:

Traceback (most recent call last): File "C:\Python27\lib\site-packages\opcua\binary_client.py", line 331, in _call_publish_callback self._publishcallbacksresponse.Parameters.SubscriptionId File "C:\Python27\lib\site-packages\opcua\subscription.py", line 61, in publish_callback self._call_datachange(datachange) File "C:\Python27\lib\site-packages\opcua\subscription.py", line 82, in _call_datachange data = self._monitoreditems_map[item.ClientHandle] KeyError: 205

And after second occurence of this exception I stop getting new data change callbacks.

My amateur explanation is following: In Subscription.init() is twice called BinaryClient.publish() which after receiving callback calls _call_publish_callback() and this calls Subscription.publish_callback(). Exception in Subscription._call_datachange() stops correct finnishing of Subscription.publish_callback(), which means that next BinaryClient.publish() isn't called. And when this happens twice, then there is nobody sending PublishRequests.

And my amateur fix of this issue is to wrap part of code in try/except:

def publish_callback(self, publishresult):
    self.logger.info("Publish callback called with result: %s", publishresult)
    while self.subscription_id is None:
        time.sleep(0.01)

    try:
        for notif in publishresult.NotificationMessage.NotificationData:
            if notif.TypeId == ua.FourByteNodeId(ua.ObjectIds.DataChangeNotification_Encoding_DefaultBinary):
                datachange = ua.DataChangeNotification.from_binary(io.BytesIO(notif.to_binary()))
                self._call_datachange(datachange)
            elif notif.TypeId == ua.FourByteNodeId(ua.ObjectIds.EventNotificationList_Encoding_DefaultBinary):
                eventlist = ua.EventNotificationList.from_binary(io.BytesIO(notif.to_binary()))
                self._call_event(eventlist)
            elif notif.TypeId == ua.FourByteNodeId(ua.ObjectIds.StatusChangeNotification_Encoding_DefaultBinary):
                statuschange = ua.StatusChangeNotification.from_binary(io.BytesIO(notif.to_binary()))
                self._call_status(statuschange)
            else:
                self.logger.warning("Notification type not supported yet for notification %s", notif)

        ack = ua.SubscriptionAcknowledgement()
        ack.SubscriptionId = self.subscription_id
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
        self.server.publish([ack])
    except:
        self.logger.exception("Exception while processing publishresult.")
        self.server.publish()

But I'm sure you will find out better fix because I'm just a programming pig.

BTW, why is publish() called twice in Subscription.init()?

— Reply to this email directly or view it on GitHub https://github.com/FreeOpcUa/python-opcua/issues/46.

jk987 commented 9 years ago

Sample code maybe later, now only the base idea:

nodes = {}
handles = {}
values = {}
result = []

for tag in many_tags:
  if tag not in nodes:
    nodes[tag] = get_node(tag)
  node = nodes[tag]
  cnt = 3
  while cnt > 0:
    try:
      if node in values:
        result.append(values[node])
        break
      else:
        if not tag in handles:
          handles[tag] = subscribe...(node)
        val = node.get_value()
        values[node] = val
        result.append(val)
        break
    except:
      cnt -= 1
  raise...

And in data change handler I have something like:

values[node] = val

By the way I'm investigating the KeyError exception (mentioned in my first post) which comes usually after TimeoutError exeption in subscribe_data_change() and it seems to me that lock is set too early in Subscription._subscribe(). The following seems to work better for me:

    def _subscribe(self, node, attr, mfilter=None, queuesize=0):
        rv = ua.ReadValueId()
        rv.NodeId = node.nodeid
        rv.AttributeId = attr
        # rv.IndexRange //We leave it null, then the entire array is returned
        mparams = ua.MonitoringParameters()
        self._client_handle += 1
        mparams.ClientHandle = self._client_handle
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
        mparams.QueueSize = queuesize
        mparams.DiscardOldest = True
        if mfilter:
            mparams.Filter = mfilter

        mir = ua.MonitoredItemCreateRequest()
        mir.ItemToMonitor = rv
        mir.MonitoringMode = ua.MonitoringMode.Reporting
        mir.RequestedParameters = mparams

        params = ua.CreateMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToCreate.append(mir)
        params.TimestampsToReturn = ua.TimestampsToReturn.Neither

        results = self.server.create_monitored_items(params)
        result = results[0]
        result.StatusCode.check()

        self.logger.info("subscribe waiting for lock...")
        with self._lock:
            self.logger.info("subscribe locked.")
            data = SubscriptionItemData()
            data.client_handle = mparams.ClientHandle
            data.node = node
            data.attribute = attr
            data.server_handle = result.MonitoredItemId
            data.mfilter = ua.downcast_extobject(result.FilterResult)
            self._monitoreditems_map[mparams.ClientHandle] = data
        self.logger.info("subscribe unlocked.")

        return result.MonitoredItemId

I was observing something like lock collision when subscribing new item and getting data change event at the same time. Something like I'm waiting for CreateMonitoredItemsResponse but first I got PublishResponse and I can't process it because I'm waitng for that damned CreateMonitoredItemsResponse...

oroulet commented 9 years ago

the key error happens because the createMonitoredItem operation hangs so when we receive an a notification we do not have a monitored_item created. To solve the issue we need to find out why the createMonitoredItem operation hangs.... In most cases in programming, the first exception/error is the one to look at and you can ignore the others. That is why I want more information about the first exception. What does the terminal says?

jk987 commented 9 years ago

http://ulozto.net/xa6T1ttc/test-ua-zip Password: ua1234

You can find there:

Like I said before, CreateMonitoredItem hangs because it sets the lock but there is a PublishResponse waiting for being processed but it can't be processed because Subscribtion._call_datachange can't set the lock until Subsccirption._subscribe gives up. That's the first bug.

Anyway any exception in Subscription.publish_callback (an lower) should be handled correctly otherwise there will be no following publish callbacks. That's the second bug. Due to some TCP/IP delays and comm errors it might happen that on the OPC server side the monitored item has been created but OPC client didn't get the response. And then OPC server would send the PublishResponse with this item and OPC client would get into troubles.

jk987 commented 8 years ago

I'm looking on the old issues and I've found issue #20 and fix 8725a8c which modifies BinaryClient._subscribe() exactly opposite way than I suggest. :)

oroulet commented 8 years ago

Can you try the dev branch of python-opcua and subscrive to all nodes at once? I modified the subscribe'-data_change method so you can supply a list of nodes instead of one node. I had this on my todo for a long time. Let me know if it changes something. It is not very well tested so you might find bugs....

oroulet commented 8 years ago

I think I understand what is happening in you case, you send a lot of subscribe request and overload the server. I added a timeout argument to Client. Can you try to set it to o higher value? 10 seconds for example? and let me know how it goes

jk987 commented 8 years ago

Subscribing more nodes at once is a good feature, but I'm not sure I'll be able to test it tonight.

Setting longer timeout I tried before (I could do it because Python doesn't recognize public/private).

I still think it's more like a deadlock than server overload. You can notice in my example that each node I first subscribe and then I get value of this node. After this I subscribe next node. In Wireshark capture you could see that OPC server answers quite quickly, much faster than in one second. Why does the Subscription class need to stay locked all the time while the BinaryClient is sending CreateMonitoredItemRequest and waiting for CreateMonitoredItemResponse? If you recieve during this long time PublishResponse, you can't handle it.

oroulet commented 8 years ago

we need to lock until we save monitoreditem data. otherwise we may start process notification for monitoreditems which are not in our db and crash exactly as your code does. but since you call a lot of clients at the same time we may well have a threading issue somewhere

On Mon, 26 Oct 2015 at 15:17 jk987 notifications@github.com wrote:

Subscribing more nodes at once is a good feature, but I'm not sure I'll be able to test it tonight.

Setting longer timeout I tried before (I could do it because Python doesn't recognize public/private).

I still think it's more like a deadlock than server overload. You can notice in my example that each node I first subscribe and then I get value of this node. After this I subscribe next node. In Wireshark capture you could see that OPC server answers quite quickly, much faster than in one second. Why does the Subscription class need to stay locked all the time while the BinaryClient is sending CreateMonitoredItemRequest and waiting for CreateMonitoredItemResponse? If you recieve during this long time PublishResponse, you can't handle it.

— Reply to this email directly or view it on GitHub https://github.com/FreeOpcUa/python-opcua/issues/46#issuecomment-151150246 .

jk987 commented 8 years ago

Example in this issue has only one client. I'm softly trying to make you move the lock in Subscription._subscribe() almost where it was before the commit 8725a8c. :)

Pleas look into previously attached ua_ng.log: (Be prepared there are weirdly mixed stdout (me) and stderr (opcua).)

At 2015-10-24 20:41:02,023 zero time for Wireshark (?) At 2015-10-24 20:41:03,877 (+1.854) MSG arrived (ReadResponse X007) At 2015-10-24 20:41:03,897 (+1.874) _subscribe locks (it wants to subscribe Channel1mc.Eva.IO.X010, 1st try). At 2015-10-24 20:41:03,944 (+1.921) MSG arrived (PublishResponse?) At 2015-10-24 20:41:03,961 (+1.938) _call_datachange would like to lock At 2015-10-24 20:41:04,901 (+2.878) _call_datachange can finally lock because subscription ended with TimeoutError (it is almost exactly one second after the _subscribe lock) At 2015-10-24 20:41:04,904 (+2.881) seems like some PublishRequest after _call_datachange At 2015-10-24 20:41:04,911 (+2.888) MSG arrived (CreatemonitoredItemResponse? But in wireshark it was much sooner!) At 2015-10-24 20:41:04,920 (+2.897) seems like some PublishResponse At 2015-10-24 20:41:04.931 TimeoutError of the 1st try At 2015-10-24 20:41:04,938 (+2.911) _subscribe locks (X010, 2nd try), _call_datachange would like to lock. At 2015-10-24 20:41:05.943 TimeoutError of the 2nd try At 2015-10-24 20:41:05,944 _call_datachange finally locks At 2015-10-24 20:41:05,948 1st KeyError At 2015-10-24 20:41:05,963 (+3.940) _subscribe locks (X010, 3rd try) At 2015-10-24 20:41:05,996 _call_datachange would like to lock At 2015-10-24 20:41:06.991 TimeoutError of the 3rd try At 2015-10-24 20:41:06,992 _call_datachange finally locks At 2015-10-24 20:41:06,994 2nd KeyError

If you look into "ua_ng 20151024.pcapng": at time 1.855 ReadResponse (tag X007) at time 1.877 CreateMonitoredItemsRequest (tag X010, 1st try) at time 1.918 PublishResponse (why no PublishRequest soon after this? locked?) at time 1.918 CreateMonitoredItemsResponse at time 2.101 PublishResponse (why no PublishRequest soon after this? locked?) at time 2.886 PublishRequest (why such a gap before this? because _call_datachange had to wait) at time 2.919 CreateMonitoredItemsRequest (tag X010, 2nd try) at time 2.946 CreateMonitoredItemsResponse at time 3.118 PublishResponse (the last one, no other PublishRequest follows because this ended with exception) at time 3.966 CreateMonitoredItemsRequest (tag X010, 3rd try) at time 3.992 CreateMonitoredItemsResponse at time 4.984 CreateMonitoredItemsRequest (tag X011) ... and no PublishRequest/Response at all.

I really don't think that setting longer timeouts solves this problem.

oroulet commented 8 years ago

Just implemented a solution where we lock as litle as possible. It is in dev branch. Let me know if it helps.

jk987 commented 8 years ago

Today I'll let run my app with my modifications. But I promise I'll test your modifications later. Multiple nodes subscription really interests me.

Beginner's off-topic questions: Do I have to call subscription.unsubscribe() for all nodes before calling subscription.delete(). I'm doing it, but I'm not sure it is necessary. And do I have to call subscription.delete() before calling client.disconnect()?

oroulet commented 8 years ago

No you do not need ( for both questions)

On Tue, Oct 27, 2015, 09:20 jk987 notifications@github.com wrote:

Today I'll let it run with my modifications. But I promise I'll test your modifications later. Multiple nodes subscription really interests me.

Beginner's off-topic questions: Do I have to call subscription.unsubscribe() for all nodes before calling subscription.delete(). I'm doing it, but I'm not sure it is necessary. And do I have to call subscription.delete() before calling client.disconnect()?

— Reply to this email directly or view it on GitHub https://github.com/FreeOpcUa/python-opcua/issues/46#issuecomment-151411264 .

jk987 commented 8 years ago

So I'm testing this multiple subscribe and it seems to work. I only made two modifications in your subscription.py from dev branch:

Both changes aren't necessary to be done. I only had feeling it doesn't need to be there and that without this the code is bit cleaner.

The multiple subscription is nicely fast. Thank you for this.

I even tried to use your new version the old way - to subscribe a lot of nodes one by one. And I didn't get into any deadlock. That's good.

There is only one weird thing I would like to point out: Previously I used monitored items these way:

class Master():
    def __init__(self):
        self.nodes = {}  # Cache to minimize calling client.get_node(). Key is the tag_name.
        self.values = {}  # Cache for read values. Key is the node.
        ...
        self.subscription = client.create_subscription(200, Handler(self))
        ...
        for ...
            ...
            self.nodes[tag_name] = node = client.get_node(ua.NodeId(tag_name, namespace_index))
            self.subscription.subscribe_data_change(node)

    ...

    def read(self, tag_name):
        node = self.nodes[tag_name]
        if node in self.values:  # Value in cache.
            return self.values[node]
        else:  # Not in cache yet.
            self.values[node] = val = node.get_value()
            return val

...

class Handler(object):
    def __init__(self, master):
        self.master = master

    def data_change(self, handle, node, val, attr):
        print datetime.now(), "Handler: data change", handle, node, val, attr
        self.master.values[node] = val

The trouble is that with new version of subscription.py the node in Handler.data_change() is different from node in Master.__init__(). So in the first call of Master.read() the value is obtained via node.get_value() and stored into Master.values. But it is never refreshed by Handler.data_change(). I simply solved this problem by using node.nodeid as a key for Master.values. But I think you should know about this. I'm not able to solve it another way because it's too much Python for me. (Like I said before I'm not a good programmer.)

oroulet commented 8 years ago

I commented out the second publish() in Subscription.init(). every call to publish sends a notificationRequest to server. it is often a good practice to send to request at startup so the server has a small buffer to send notification when it needs. I think all clients I tested do this. I commented out the if block with time.sleep() in Subscription._call_datachange(). I agree it is not nice, but when we create a monitored item we 1) send request, 2)wait for response, 3) parse data and save it to be able to handle notifications. The problem is that the first notification may happen before we are able to process the data. so in case we are not finished to process, we need to wait until it is done.. The trouble is that with new version of subscription.py the node in Handler.data_change() is different from node in Master.init(). Can you print the nodes you compare? I had a bug with a NodeId sent instead of Node.

jk987 commented 8 years ago

Twice PublishRequest: I understand the explanation with server's small buffer the this way:

Seems to me that in this case buffers of TCP/IP are used instead of OPC server buffers. Well, I don't mind, it only makes debugging bit more difficult. But you're right that UaExplorer is doing it this way too.

Sleep in callback: I hope that well behaved OPC server after getting CreateMonitoredItemsRequest sends CreateMonitoredItemsResponse as the first and PublishResponse as the second. And I also have a feeling that your module also processes it in this way. But again, I give up myself.

Node: When I print hex(id(node)) then it is really different. (?!) hex(id(node.nodeid)) is OK. (When i print node then is equal because it is influenced by Node.__str__().)

oroulet commented 8 years ago

the sleep method has been removed, we may miss a notification at startup in some strange conditions but it make code cleaner and safer. Do you still have an issue, can we close this?

jk987 commented 8 years ago

Hi, I'm not sure which version I'm using. Tomorrow or during weekend I'll look at it and I'll let you know.

jk987 commented 8 years ago

So, last week I compiled my app with latest freeopcua downloaded via "pip" (I guess it is 0.9.12) and since then it seems to run well at the customer.

jk987 commented 7 years ago

(Only remark: "Twice publish" discussed also in issue #264)