I have created an example about what it takes to implement a condition OffNormalAlarmType with the current state op python-opcua (master branch). Don't mind the quality of the code it is only for illustration :-)
If anyone is interested maybe this helps to discuss how the python-opcua API can be extended to make the use of alarms easier.
'''
poc-server-alarm.py
Proof of concept for implementing a Condition Alarm with python-opcua.
This POC isn't production code or an attempt to extend the functionality of python-opcua.
Just a tryout to see what it takes to create an alarm with python-opcua and investigate
where possible issues are.
To get the poc runnning it is not required to change other files in the framework
As client UaExpert is used in the Event View.
Use:
- Start UaExpert, connect to this server
- Open the Event View
- Subscribe to the event by dragging the MyObjectWithAlarm object to the Event View
configuration area
- In the interactive prompt of this server type
- Make the alaram active: wrap.setStateValue(0)
- Solve the alaram active: wrap.setStateValue(1)
Found issues:
TODO: The event fields implementation lacks support for (nested) HasComponent relations.
Currently only properties are supported.
TODO: Instead of automatic generated event fields (use of get_event_obj_from_type_node.CustomEvent)
a way to dynamic register new classes based on event_objects.BaseEvent.
TODO: In the Address space the methods of the OffNormalAlarmType are not conform the specification.
For example Acknowlegde lacks the correct method arguments (5.7.3). Due this the buildin in
alarm function of UAExpert like Acknowkledge and Refresh can't be used.
TODO: Add shorthand for creating references
TODO: Add helper for easy binding of OPC Method to a Python object
'''
from opcua import ua, uamethod, Server, common # , Event, common
from opcua.ua.uatypes import NodeId
try:
from IPython import embed
except ImportError:
import code
def embed():
vars = globals()
vars.update(locals())
shell = code.InteractiveConsole(vars)
shell.interact()
class OffNormalAlarmEvent(common.event_objects.BaseEvent):
"""
OffNormalAlarmEvent implements OffNormalAlarmType from which inherit all other OffNormalAlarmType events.
"""
def __init__(self, sourcenode=None, message=None, severity=1):
super(OffNormalAlarmEvent, self).__init__(sourcenode, message, severity)
self.EventType = ua.NodeId(ua.ObjectIds.OffNormalAlarmType)
# NodeId is special one see Part 9 5.5.2 and is used for the ConditionId
# Client request this event field as qname []
self.add_property('NodeId', ua.NodeId(0, 0), ua.VariantType.NodeId)
# The client requests qname ["0:AckedState"] and ["0:AckedState"]["0:Id"], don't know to handle this
# self.add_property('AckedState', ua.LocalizedText('x'), ua.VariantType.LocalizedText)
# self.add_property('AckedState/Id', True, ua.VariantType.Boolean)
self.add_property('AckedState', True, ua.VariantType.Boolean)
# The client requests ["0:ActiveState"] and ["0:ActiveState"]["0:Id"], don't know to handle this
# self.add_property('ActiveState', ua.LocalizedText('y'), ua.VariantType.LocalizedText)
# self.add_property('ActiveState/Id', False, ua.VariantType.Boolean)
self.add_property('ActiveState', False, ua.VariantType.Boolean)
# Not required to get things working
# self.add_property('BranchId', ua.NodeId(0, 0), ua.VariantType.NodeId)
# self.add_property('ConditionClassId', ua.NodeId(0, 0), ua.VariantType.NodeId)
# self.add_property('ConditionClassName', ua.LocalizedText(''), ua.VariantType.LocalizedText)
self.add_property('ConditionName', '', ua.VariantType.String)
self.add_property('Retain', False, ua.VariantType.Boolean)
def register_event(etype, event):
# Ok IMPLEMENTED_EVENTS is ment readonly, but for a proof of concept I don't want to change other files
common.event_objects.IMPLEMENTED_EVENTS[etype] = event
register_event(ua.ObjectIds.OffNormalAlarmType, OffNormalAlarmEvent)
def create_reference(server, node, referencetype, nodeid, nodeClass=ua.NodeClass.ObjectType):
refs = []
ref = ua.AddReferencesItem()
ref.IsForward = True
ref.ReferenceTypeId = ua.NodeId(referencetype)
ref.SourceNodeId = node.nodeid
ref.TargetNodeClass = nodeClass
ref.TargetNodeId = nodeid
refs.append(ref)
results = node.server.add_references(refs)
class OPCMethodBinder(object):
def bind_methods(self, node):
methods = node.get_methods()
for method in methods:
name = method.get_browse_name().Name
if hasattr(self, name) and callable(getattr(self, name)):
self.node.server.add_method_callback(method.nodeid, getattr(self, name))
class AlarmConditionWrapper(OPCMethodBinder):
ACKEDSTATES = {True: 'Acknowledged', False: 'Unacknowledged'}
ACTIVESTATES = {True: 'Active', False: 'Inactive'}
def __init__(self, server, node, state, condition):
self.server = server
self.node = node
self.state = state
self.condition = condition
self._stateValue = 1
self._enable = True
def init(self):
''' Bind the condition methods'''
self.bind_methods(self.condition)
''' Initial Condition Values'''
self.condition.get_child(["0:AckedState"]).set_value(ua.LocalizedText(AlarmConditionWrapper.ACKEDSTATES[True]))
self.condition.get_child(["0:AckedState", "0:Id"]).set_value(True)
self.condition.get_child(["0:ActiveState"]).set_value(ua.LocalizedText(AlarmConditionWrapper.ACTIVESTATES[False]))
self.condition.get_child(["0:ActiveState", "0:Id"]).set_value(False)
# self.condition.get_child(["0:ConditionClassName"]).set_value(ua.LocalizedText(ua.ObjectIdNames[ua.ObjectIds.ProcessConditionClassType]))
self.condition.get_child(["0:ConditionClassName"]).set_value(ua.LocalizedText('ProcessConditionClassType'))
self.condition.get_child(["0:ConditionClassId"]).set_value(ua.NodeId(ua.ObjectIds.ProcessConditionClassType))
self.condition.get_child(["0:ConditionName"]).set_value(self.condition.get_browse_name().Name)
self.condition.get_child(["0:EnabledState"]).set_value(ua.LocalizedText('Enabled'))
self.condition.get_child(["0:EnabledState", "0:Id"]).set_value(True)
self.condition.get_child(["0:EventType"]).set_value(ua.NodeId(ua.ObjectIds.OffNormalAlarmType))
self.condition.get_child(["0:Retain"]).set_value(False)
self.condition.get_child(["0:Severity"]).set_value(500)
self.condition.get_child(["0:SourceName"]).set_value(self.node.get_browse_name().Name)
self.condition.get_child(["0:SourceNode"]).set_value(self.node.nodeid)
self.condition.get_child(["0:SuppressedOrShelved"]).set_value(False)
create_reference(self.server, self.state, ua.ObjectIds.HasCondition, self.condition.nodeid)
create_reference(self.server, self.node, ua.ObjectIds.HasEventSource, self.state.nodeid)
''' Create event generator and init the fixed event data'''
offnormal_etype = self.server.get_node(ua.ObjectIds.OffNormalAlarmType)
self.evgen = self.server.get_event_generator(offnormal_etype, self.node)
# Property fields of OffNormalAlarmType condition event:
# NodeId aka ConditionId fixed
# AckedState
# AckedState.Id
# ActiveState
# ActiveState.Id
# ConditionName fixed
# EventId per event
# EventType fixed
# Message
# Retain
# Severity fixed
# SourceName fixed
self.evgen.event.NodeId = self.condition.nodeid
self.evgen.event.ConditionName = self.condition.get_browse_name().Name
self.evgen.event.Severity = 500
def setStateValue(self, state):
if self._stateValue != state:
self._stateValue = state
acked = state != 0
active = state == 0
retain = state == 0
self.condition.get_child(["0:AckedState"]).set_value(ua.LocalizedText(AlarmConditionWrapper.ACKEDSTATES[acked]))
self.condition.get_child(["0:AckedState", "0:Id"]).set_value(acked)
self.condition.get_child(["0:ActiveState"]).set_value(ua.LocalizedText(AlarmConditionWrapper.ACTIVESTATES[active]))
self.condition.get_child(["0:ActiveState", "0:Id"]).set_value(active)
self.condition.get_child(["0:Message"]).set_value(ua.LocalizedText('State changed to armed and dangerous'))
self.condition.get_child(["0:Retain"]).set_value(retain)
self.state.set_value(state)
self.evgen.event.AckedState = acked
self.evgen.event.ActiveState = active
self.evgen.event.Message = ua.LocalizedText("Temperature Low X")
self.evgen.event.Retain = retain
self.fire()
def fire(self):
if self._enable == True:
self.evgen.trigger()
self.condition.get_child(["0:EventId"]).set_value(self.evgen.event.EventId)
print('fired!')
# FIX: InputArguments of Acknowledge method is empty the address space ?
# FIX: InputArguments are not provided to the client
@uamethod
def Acknowledge(self, parent):
if self.evgen.event.ActiveState == True and self.evgen.event.AckedState == False:
print('Acknowledge')
self.condition.get_child(["0:AckedState"]).set_value(ua.LocalizedText(AlarmConditionWrapper.ACKEDSTATES[True]))
self.condition.get_child(["0:AckedState", "0:Id"]).set_value(True)
self.evgen.event.AckedState = True
self.fire()
@uamethod
def AddComment(self, parent):
print('AddComment')
# FIX: refresh arguments problem with namespace
@uamethod
def ConditionRefresh(self, parent):
print('ConditionRefresh')
# TODO: fire start refresh event
# TODO: if alaram is active send the event
# TODO: fire end refresh event
@uamethod
def Disable(self, parent):
print('Disable')
self._enable == False
self.condition.get_child(["0:EnabledState"]).set_value(ua.LocalizedText('Disabled'))
self.condition.get_child(["0:EnabledState", "0:Id"]).set_value(False)
@uamethod
def Enable(self, parent):
print('Enable')
self._enable == True
self.condition.get_child(["0:EnabledState"]).set_value(ua.LocalizedText('Enabled'))
self.condition.get_child(["0:EnabledState", "0:Id"]).set_value(True)
# TODO: what todo with optional Confirm method
# @uamethod
# def Confirm(self, parent):
# print('Enable')
if __name__ == "__main__":
# setup our server
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
# setup our own namespace, not really necessary but should as spec
uri = "http://examples.freeopcua.github.io"
idx = server.register_namespace(uri)
# get Objects node, this is where we should put our custom stuff
objects = server.get_objects_node()
# populating our address space
myobj = objects.add_object(idx, "MyObjectWithAlarm")
state = myobj.add_variable(idx, 'State', 1)
cond = myobj.add_object(idx, 'StateCondition', ua.ObjectIds.OffNormalAlarmType)
cond.set_attribute(ua.AttributeIds.DisplayName, ua.DataValue(ua.LocalizedText('StateCondition')))
wrap = AlarmConditionWrapper(server, myobj, state, cond)
wrap.init()
server.start()
try:
embed()
finally:
server.stop()
I have created an example about what it takes to implement a condition OffNormalAlarmType with the current state op python-opcua (master branch). Don't mind the quality of the code it is only for illustration :-)
If anyone is interested maybe this helps to discuss how the python-opcua API can be extended to make the use of alarms easier.