vogler75 / automation-gateway

An OPC UA gateway which gives you access to your OPC UA values via MQTT or GraphQL (HTTP). If you have an OPC UA server in your PLC, or a SCADA system with an OPC UA server, you can query data from there via MQTT and GraphQL (HTTP). In addition, the gateway can log value changes from OPC UA nodes to InfluxDB, IoTDB, Kafka, and others.
GNU General Public License v3.0

OPCUA startup subscribe #10

Closed manprinsen closed 1 month ago

manprinsen commented 1 month ago

Hi :)

I have done some more testing and I am experiencing a subscription issue on startup. I have approximately 1000 tags and I get the same error, Bad_TooManyOperations, as in issue 8.

[2024-07-08 23:57:56][FINEST ][OpcUaDriver/milo              ] Find ns=4;s=|var|CODESYS Control for Linux SL.Application.IoConfig_Globals.pIoConfigTaskMap | + (1) | ns=4;s=|var|CODESYS Control for Linux SL.Application/GlobalVars/IoConfig_Globals/pIoConfigTaskMap
[2024-07-08 23:57:56][FINEST ][OpcUaDriver/milo              ] - Nodes:
[2024-07-08 23:57:56][FINEST ][OpcUaDriver/milo              ] - Found:
[2024-07-08 23:57:56][WARNING][OpcUaDriver/milo              ] Browsing childs took long time: [ns=4;s=|var|CODESYS Control for Linux SL.Application/+] took [2.1039807]s
[2024-07-08 23:57:56][FINE   ][OpcUaDriver/milo              ] Browse path result size [1055]
[2024-07-08 23:57:57][FINE   ][OpcUaDriver/milo              ] Subscribe nodes [1055] sampling interval [100.0]
[2024-07-08 23:57:57][WARNING][OpcUaDriver/milo              ] Service Fault: ServiceFault(responseHeader=ResponseHeader(timestamp=DateTime{utcTime=133649494758210000, javaDate=Mon Jul 08 23:57:55 CEST 2024}, requestHandle=1090, serviceResult=StatusCode{name=Bad_TooManyOperations, value=0x80100000, quality=bad}, serviceDiagnostics=null, stringTable=null, additionalHeader=ExtensionObject{encoded=ByteString{bytes=null}, encodingId=NodeId{ns=0, id=0}})) 
<============-> 98% EXECUTING [1m 53s]

I did some troubleshooting and came up with this code in automation-gateway\source\lib-core\src\main\kotlin\at\rocworks\gateway\core\opcua\OpcUaDriver.kt. It creates the monitored items in batches of 100 instead of sending all of them in one request. After changing to this code, the application works as expected :)

private fun subscribeNodes(topics: List<Topic>): Future<Boolean> {
    val ret = Promise.promise<Boolean>()
    if (topics.isEmpty()) {
        ret.complete(true)
    } else {
        logger.fine { "Subscribe nodes [${topics.size}] sampling interval [${monitoringParametersSamplingInterval}]" }

        val nodeIds = topics.map { NodeId.parseOrNull(it.topicNode) }.toList()

        // Check if nodeIds contain null values
        val invalidNodeIds = nodeIds.filter { it == null }
        if (invalidNodeIds.isNotEmpty()) {
            logger.severe("One or more nodeIds could not be parsed from topics: ${topics.map { it.topicNode }}")
            ret.fail("Failed to parse one or more nodeIds")
            return ret.future()
        }

        val dataChangeFilter = ExtensionObject.encode(client!!.staticSerializationContext, DataChangeFilter(
            dataChangeTrigger,
            uint(DeadbandType.None.value),
            0.0
        ))

        val requests = nodeIds.map { nodeId ->
            MonitoredItemCreateRequest(
                ReadValueId(nodeId!!, AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE),
                MonitoringMode.Reporting,
                MonitoringParameters(
                    subscription!!.nextClientHandle(),
                    monitoringParametersSamplingInterval,
                    dataChangeFilter,
                    monitoringParametersBufferSize,
                    monitoringParametersDiscardOldest
                )
            )
        }

        // Batch size based on server capabilities or testing
        val batchSize = 100 // Example batch size, adjust as necessary
        val batches = requests.chunked(batchSize)

        // Process batches sequentially
        processBatches(batches, topics, 0, ret)

    }
    return ret.future()
}

private fun processBatches(batches: List<List<MonitoredItemCreateRequest>>, topics: List<Topic>, batchIndex: Int, ret: Promise<Boolean>) {
    if (batchIndex >= batches.size) {
        ret.complete(true)
        return
    }

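    val batch = batches[batchIndex]
    // Start of this batch within the full topic list. Summing the sizes of the preceding
    // batches stays correct when the last batch is shorter than the others.
    val offset = batches.take(batchIndex).sumOf { it.size }
    val onItemCreated = UaSubscription.ItemCreationCallback { item: UaMonitoredItem, nr: Int ->
        val topic = topics[offset + nr]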
        if (item.statusCode.isGood) {
            registry.addMonitoredItem(OpcUaMonitoredItem(item), topic)
            logger.finest { "Monitored item created for topic: ${topic.topicName}" }
        } else {
            logger.warning("Failed to create monitored item for topic: ${topic.topicName} (status=${item.statusCode})")
        }
        item.setValueConsumer { data: DataValue ->
            valueConsumer(topic, data)
        }
    }

    subscription!!.createMonitoredItems(TimestampsToReturn.Both, batch, onItemCreated)
        .thenAccept { monitoredItems: List<UaMonitoredItem> ->
            try {
                for (item in monitoredItems) {
                    if (item.statusCode.isGood) {
                        logger.finest { "Monitored item created for nodeId ${item.readValueId.nodeId}" }
                    } else {
                        logger.warning("Failed to create item for nodeId ${item.readValueId.nodeId} (status=${item.statusCode})")
                    }
                }
                // Recursively process the next batch
                processBatches(batches, topics, batchIndex + 1, ret)
            } catch (e: Exception) {
                e.printStackTrace()
                ret.fail(e)
            }
        }
        .exceptionally { ex ->
            logger.severe("Exception while creating monitored items: ${ex.message}")
            ex.printStackTrace()
            ret.fail(ex)
            null
        }
}
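
A note on the index arithmetic: the callback index is local to the batch, so mapping it back to the full `topics` list needs the batch's start offset. Because `chunked` can return a shorter final batch, that offset should come from the configured batch size (or the sizes of the earlier batches), not from the size of the current batch. Below is a minimal, standalone sketch of that mapping; `chunkedWithOffsets` and `globalIndex` are hypothetical helpers for illustration, not part of automation-gateway.

```kotlin
// Hypothetical helper, not part of automation-gateway: pairs each batch with the
// index of its first element in the original list.
fun <T> chunkedWithOffsets(items: List<T>, batchSize: Int): List<Pair<Int, List<T>>> {
    require(batchSize > 0) { "batchSize must be positive" }
    return items.chunked(batchSize).mapIndexed { batchIndex, batch ->
        // Every batch before this one is full, so it starts at batchIndex * batchSize.
        (batchIndex * batchSize) to batch
    }
}

// Maps a batch-local callback index back to an index into the full list.
fun globalIndex(offset: Int, localIndex: Int): Int = offset + localIndex
```

With 1055 items and a batch size of 100, the last batch holds 55 items and starts at offset 1000. Multiplying the batch index by the size of the current batch would instead give 10 * 55 = 550 for that last batch.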

vogler75 commented 1 month ago

Thanks. I will make the batch size configurable, and maybe replace the recursion with promises. Is it really necessary to create onItemCreated inside each batch? Can you check whether it still works if we create it once, outside the batch?
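
One possible shape for the non-recursive version is sketched below. It assumes the per-batch call returns a `CompletableFuture`, as `createMonitoredItems` does in the code above, and chains the batches with `thenCompose` so that each batch is sent only after the previous one has completed. `processSequentially` and its `call` parameter are hypothetical names; `call` stands in for the real subscription call and is not the gateway's API.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical sketch: runs `call` on one batch at a time, in order, and collects all results.
// `call` receives the batch's start offset in `items` so that callers can map batch-local
// indexes back to the full list.
fun <T, R> processSequentially(
    items: List<T>,
    batchSize: Int,
    call: (offset: Int, batch: List<T>) -> CompletableFuture<List<R>>
): CompletableFuture<List<R>> {
    require(batchSize > 0) { "batchSize must be positive" }
    var chain: CompletableFuture<List<R>> = CompletableFuture.completedFuture(emptyList())
    items.chunked(batchSize).forEachIndexed { index, batch ->
        val offset = index * batchSize
        // thenCompose runs only after the previous stage completes normally, so a failed
        // batch fails the whole chain and no later batch is sent.
        chain = chain.thenCompose { done: List<R> ->
            call(offset, batch).thenApply { results: List<R> -> done + results }
        }
    }
    return chain
}
```

The returned future could then complete or fail the Vert.x promise once, for example via `whenComplete`, and `batchSize` could be read from the driver configuration instead of being hard-coded.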