OPCFoundation / UA-.NETStandard

OPC Unified Architecture .NET Standard
Other
1.97k stars 950 forks source link

Monitored items notifications not received. #2841

Open ionut-gheorghe opened 1 week ago

ionut-gheorghe commented 1 week ago

Type of issue

Current Behavior

I refactored my lib for the opc client but with the update I'm not receving the notifications from the monitored items. I currently use asp.net core 8 and the service (OpcUaClient2) is intantiated as a singleton.

The currious thing is that while I'm steping through the subscribing code, the breakpoint on the notifications method is hit, but after that nothing. If I debug without stepping through, the breakpoint is never hit. Notification on another thread?

The class for the UaClient is from UA-.NETStandard/Applications/ConsoleReferenceClient/UAClient.cs and to intantiate it a variation of Program.cs from the same route. Tested with both configuration from code and xml file and againt Kepware server. Value of the item is changing, verified with UaExpert and with the old version my lib.

Surrely I'm missing something and I dont know what it is.

public class UAClient : IUAClient, IDisposable
{
    private readonly object _lock = new();

    private readonly ILogger _logger;

    private readonly Action<IList, IList> _validateResponse;

    private readonly ApplicationConfiguration _configuration;

    private bool _disposed = false;

    private SessionReconnectHandler _reconnectHandler;

    private readonly ReverseConnectManager? _reverseConnectManager;

    private ISession _session;

    /// <summary>
    /// Initializes a new instance of the UAClient class.
    /// </summary>
    public UAClient(ApplicationConfiguration configuration, ILogger logger, Action<IList, IList> validateResponse)
    {
        _validateResponse = validateResponse;
        _logger = logger;
        _configuration = configuration;
        _configuration.CertificateValidator.CertificateValidation += CertificateValidation;
        _reverseConnectManager = null;
    }

    /// <summary>
    /// Initializes a new instance of the UAClient class for reverse connections.
    /// </summary>
    public UAClient(ApplicationConfiguration configuration, ReverseConnectManager reverseConnectManager, ILogger logger, Action<IList, IList> validateResponse)
    {
        _validateResponse = validateResponse;
        _logger = logger;
        _configuration = configuration;
        _configuration.CertificateValidator.CertificateValidation += CertificateValidation;
        _reverseConnectManager = reverseConnectManager;
    }

    /// <summary>
    /// Auto accept untrusted certificates.
    /// </summary>
    public bool AutoAccept { get; set; } = false;

    /// <summary>
    /// The session keepalive interval to be used in ms.
    /// </summary>
    public int KeepAliveInterval { get; set; } = 5000;

    /// <summary>
    /// The file to use for log output.
    /// </summary>
    public string LogFile { get; set; }

    /// <summary>
    /// The reconnect period to be used in ms.
    /// </summary>
    public int ReconnectPeriod { get; set; } = 1000;

    /// <summary>
    /// The reconnect period exponential backoff to be used in ms.
    /// </summary>
    public int ReconnectPeriodExponentialBackoff { get; set; } = 15000;

    /// <summary>
    /// Gets the client session.
    /// </summary>
    public ISession Session => _session;

    /// <summary>
    /// The session lifetime.
    /// </summary>
    public uint SessionLifeTime { get; set; } = 60 * 1000;

    /// <summary>
    /// The user identity to use to connect to the server.
    /// </summary>
    public IUserIdentity UserIdentity { get; set; } = new UserIdentity();

    /// <summary>
    /// Action used
    /// </summary>
    Action<IList, IList> ValidateResponse => _validateResponse;

    /// <summary>
    /// Creates a session with the UA server
    /// </summary>
    public async Task<bool> ConnectAsync(string serverUrl, bool useSecurity = true, CancellationToken ct = default)
    {
        if (_disposed)
            throw new ObjectDisposedException(nameof(UAClient));
        if (serverUrl == null)
            throw new ArgumentNullException(nameof(serverUrl));

        try
        {
            if (_session != null && _session.Connected == true)
            {
                _logger.LogInformation("Session already connected!");
            }
            else
            {
                ITransportWaitingConnection connection = null;
                EndpointDescription endpointDescription = null;
                if (_reverseConnectManager != null)
                {
                    _logger.LogInformation("Waiting for reverse connection to.... {0}", serverUrl);
                    do
                    {
                        using var cts = new CancellationTokenSource(30_000);
                        using var linkedCTS = CancellationTokenSource.CreateLinkedTokenSource(ct, cts.Token);

                        connection = await _reverseConnectManager.WaitForConnection(new Uri(serverUrl), null, linkedCTS.Token).ConfigureAwait(false);
                        if (connection == null)
                            throw new ServiceResultException(StatusCodes.BadTimeout, "Waiting for a reverse connection timed out.");
                        if (endpointDescription == null)
                        {
                            _logger.LogInformation("Discover reverse connection endpoints....");
                            endpointDescription = CoreClientUtils.SelectEndpoint(_configuration, connection, useSecurity);
                            connection = null;
                        }
                    } while (connection == null);
                }
                else
                {
                    _logger.LogInformation("Connecting to... {0}", serverUrl);
                    endpointDescription = CoreClientUtils.SelectEndpoint(_configuration, serverUrl, useSecurity);
                }

                // Get the endpoint by connecting to server's discovery endpoint.
                // Try to find the first endopint with security.
                EndpointConfiguration endpointConfiguration = EndpointConfiguration.Create(_configuration);
                ConfiguredEndpoint endpoint = new ConfiguredEndpoint(null, endpointDescription, endpointConfiguration);

                var sessionFactory = TraceableSessionFactory.Instance;

                // Create the session
                var session = await sessionFactory.CreateAsync(
                    _configuration,
                    connection,
                    endpoint,
                    connection == null,
                    false,
                    _configuration.ApplicationName,
                    SessionLifeTime,
                    UserIdentity,
                    null,
                    ct
                ).ConfigureAwait(false);

                // Assign the created session
                if (session != null && session.Connected)
                {
                    _session = session;

                    // override keep alive interval
                    _session.KeepAliveInterval = KeepAliveInterval;

                    // support transfer
                    _session.DeleteSubscriptionsOnClose = false;
                    _session.TransferSubscriptionsOnReconnect = true;

                    // set up keep alive callback.
                    _session.KeepAlive += Session_KeepAlive;

                    // prepare a reconnect handler
                    _reconnectHandler = new SessionReconnectHandler(true, ReconnectPeriodExponentialBackoff);

                    // Session created successfully.
                    _logger.LogInformation("New Session Created with SessionName = {0}", _session.SessionName);

                    return true;
                }
            }

            return true;
        }
        catch (Exception ex)
        {
            // Log Error
            _logger.LogError("Create Session Error : {0}", ex.Message);
            return false;
        }
    }

    /// <summary>
    /// Disconnects the session.
    /// </summary>
    /// <param name="leaveChannelOpen">Leaves the channel open.</param>
    public void Disconnect(bool leaveChannelOpen = false)
    {
        try
        {
            if (_session != null)
            {
                _logger.LogInformation("Disconnecting...");

                lock (_lock)
                {
                    _session.KeepAlive -= Session_KeepAlive;
                    _reconnectHandler?.Dispose();
                    _reconnectHandler = null;
                }

                _session.Close(!leaveChannelOpen);
                if (leaveChannelOpen)
                {
                    // detach the channel, so it doesn't get closed when the session is disposed.
                    _session.DetachChannel();
                }
                _session.Dispose();
                _session = null;

                // Log Session Disconnected event
                _logger.LogInformation("Session Disconnected.");
            }
            else
            {
                _logger.LogInformation("Session not created!");
            }
        }
        catch (Exception ex)
        {
            // Log Error
            _logger.LogError($"Disconnect Error : {ex.Message}");
        }
    }

    /// <summary>
    /// Dispose objects.
    /// </summary>
    public void Dispose()
    {
        _disposed = true;
        Utils.SilentDispose(_session);
        _configuration.CertificateValidator.CertificateValidation -= CertificateValidation;
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Handles the certificate validation event.
    /// This event is triggered every time an untrusted certificate is received from the server.
    /// </summary>
    protected virtual void CertificateValidation(CertificateValidator sender, CertificateValidationEventArgs e)
    {
        bool certificateAccepted = false;

        // ****
        // Implement a custom logic to decide if the certificate should be
        // accepted or not and set certificateAccepted flag accordingly.
        // The certificate can be retrieved from the e.Certificate field
        // ***

        ServiceResult error = e.Error;
        _logger.LogWarning(error.ToString(), error);
        if (error.StatusCode == StatusCodes.BadCertificateUntrusted && AutoAccept)
        {
            certificateAccepted = true;
        }

        if (certificateAccepted)
        {
            _logger.LogWarning("Untrusted Certificate accepted. Subject = {0}", e.Certificate.Subject);
            e.Accept = true;
        }
        else
        {
            _logger.LogError("Untrusted Certificate rejected. Subject = {0}", e.Certificate.Subject);
        }
    }

    /// <summary>
    /// Called when the reconnect attempt was successful.
    /// </summary>
    private void Client_ReconnectComplete(object sender, EventArgs e)
    {
        // ignore callbacks from discarded objects.
        if (!ReferenceEquals(sender, _reconnectHandler))
        {
            return;
        }

        lock (_lock)
        {
            // if session recovered, Session property is null
            if (_reconnectHandler.Session != null)
            {
                // ensure only a new instance is disposed
                // after reactivate, the same session instance may be returned
                if (!ReferenceEquals(_session, _reconnectHandler.Session))
                {
                    _logger.LogInformation("--- RECONNECTED TO NEW SESSION --- {0}", _reconnectHandler.Session.SessionId);
                    var session = _session;
                    _session = _reconnectHandler.Session;
                    Utils.SilentDispose(session);
                }
                else
                {
                    _logger.LogInformation("--- REACTIVATED SESSION --- {0}", _reconnectHandler.Session.SessionId);
                }
            }
            else
            {
                _logger.LogInformation("--- RECONNECT KeepAlive recovered ---");
            }
        }
    }

    /// <summary>
    /// Handles a keep alive event from a session and triggers a reconnect if necessary.
    /// </summary>
    private void Session_KeepAlive(ISession session, KeepAliveEventArgs e)
    {
        try
        {
            // check for events from discarded sessions.
            if (!_session.Equals(session))
            {
                return;
            }

            // start reconnect sequence on communication error.
            if (ServiceResult.IsBad(e.Status))
            {
                if (ReconnectPeriod <= 0)
                {
                    Utils.LogWarning("KeepAlive status {0}, but reconnect is disabled.", e.Status);
                    return;
                }

                var state = _reconnectHandler.BeginReconnect(_session, _reverseConnectManager, ReconnectPeriod, Client_ReconnectComplete);
                if (state == SessionReconnectHandler.ReconnectState.Triggered)
                {
                    Utils.LogInfo("KeepAlive status {0}, reconnect status {1}, reconnect period {2}ms.", e.Status, state, ReconnectPeriod);
                }
                else
                {
                    Utils.LogInfo("KeepAlive status {0}, reconnect status {1}.", e.Status, state);
                }

                // cancel sending a new keep alive request, because reconnect is triggered.
                e.CancelKeepAlive = true;

                return;
            }
        }
        catch (Exception exception)
        {
            Utils.LogError(exception, "Error in OnKeepAlive.");
        }
    }
}
internal class OpcUaClient2 : IOpcUaClient2
{
    private readonly OpcUaClientOptions options;
    private readonly ILogger logger;
    private UAClient uaClient;
    private bool connected;

    public OpcUaClient2(OpcUaClientOptions options, ILogger logger)
    {
        this.options = options;
        this.logger = logger;

        try
        {
            Initialize().Wait();
        }
        catch (Exception ex)
        {

            throw;
        }
    }

    public DataChangedDelegate DataChangedHandler { get; set; }

    private async Task Initialize()
    {
        // Createa instance
        ApplicationInstance application = await CreateApplicationInstance().ConfigureAwait(false);

        // Check the application certificate.
        try
        {
            bool haveAppCertificate = await application.CheckApplicationInstanceCertificate(false, minimumKeySize: 0, lifeTimeInMonths: 120)
                .ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            await application.DeleteApplicationInstanceCertificate().ConfigureAwait(false);
            throw new Exception("Application instance certificate invalid!. Restart the application!", ex);
        }

        using var uaClient = new UAClient(application.ApplicationConfiguration, logger, ClientBase.ValidateResponse)
        {
            AutoAccept = options.AutoAcceptServerCertificate,
            UserIdentity = new UserIdentity(options.Username, options.Password),
            SessionLifeTime = 60_000
        };

        // Connect to server
        connected = false;
        while (connected == false)
        {
            connected = await uaClient.ConnectAsync(options.ServerUrl, options.UseSecurity).ConfigureAwait(false);
            if (connected == true)
                break;
            else
            {
                logger.LogError("Application not connected! Retrying connection in 5s...");
                await Task.Delay(5000);
            }
        }

        // Enable subscription transfer
        uaClient.ReconnectPeriod = 1000;
        uaClient.ReconnectPeriodExponentialBackoff = 10000;
        //uaClient.Session.MinPublishRequestCount = 3;
        uaClient.Session.TransferSubscriptionsOnReconnect = true;

        // create avaible subscriptions
        foreach (var subs in options.Subscriptions)
        {
            Subscription subscription = new Subscription(uaClient.Session.DefaultSubscription)
            {
                DisplayName = options.ApplicationName + " - " + subs.DisplayName,
                PublishingEnabled = true,
                PublishingInterval = subs.PublishingInterval,
                //LifetimeCount = 10,
                SequentialPublishing = true,
                RepublishAfterTransfer = true,
                //DisableMonitoredItemCache = true,
                MaxNotificationsPerPublish = 1000,
                MinLifetimeInterval = (uint)uaClient.Session.SessionTimeout * 3,
            };

            uaClient.Session.AddSubscription(subscription);

            // Create the subscription on Server side
            await subscription.CreateAsync().ConfigureAwait(false);

            foreach (var nId in subs.NodeIds)
            {
                var item = new MonitoredItem(subscription.DefaultItem);
                item.StartNodeId = new NodeId(nId.NodeId);
                item.AttributeId = Attributes.Value;
                item.DisplayName = nId.DisplayName;
                item.SamplingInterval = nId.SamplingInterval;
                item.QueueSize = 1;
                item.DiscardOldest = true;
                item.Notification += Item_Notification;
                subscription.AddItem(item);
            }

            await subscription.ApplyChangesAsync().ConfigureAwait(false);
            logger.LogInformation($"New Subscription '{subscription.DisplayName}' created with SubscriptionId = {subscription.Id}.");
        }
    }

    private void Item_Notification(MonitoredItem monitoredItem, MonitoredItemNotificationEventArgs e)
    { // breakpoint here

        if (e.NotificationValue is MonitoredItemNotification notification)
        {
            DataChangedHandler?.Invoke(new OpcUaClientValue
            {
                MonitoredItem = monitoredItem,
                Notification = notification
            });
        }
    }

    private async Task<ApplicationInstance> CreateApplicationInstance()
    {
        CertificatePasswordProvider PasswordProvider = new(options.CertificatePassword);
        ApplicationInstance application = new()
        {
            ApplicationName = options.ApplicationName,
            ApplicationType = ApplicationType.Client,
            CertificatePasswordProvider = PasswordProvider
        };

        application.ApplicationConfiguration = await CreateConfiguration().ConfigureAwait(false);

        return application;
    }

    private async Task<ApplicationConfiguration> CreateConfiguration()
    {
        ApplicationConfiguration configuration = new();

        configuration.ApplicationName = options.ApplicationName;
        configuration.ApplicationUri = $"urn:{Utils.GetHostName()}:{options.ApplicationName}";
        configuration.ProductUri = $"uri:opcfoundation.org:{options.ApplicationName}";
        configuration.ApplicationType = ApplicationType.Client;

        configuration.SecurityConfiguration = new SecurityConfiguration
        {
            ApplicationCertificate = new CertificateIdentifier
            {
                // https://stackoverflow.com/a/75947695
                StoreType = CertificateStoreType.Directory,
                StorePath = @"%LocalApplicationData%/OPC Foundation/pki/own",
                SubjectName = $"CN={options.ApplicationName}, C=ES, S=Almeria, O=Cosentino, DC=localhost"
            },
            TrustedIssuerCertificates = new CertificateTrustList
            {
                StoreType = CertificateStoreType.Directory,
                StorePath = @"%LocalApplicationData%/OPC Foundation/pki/issuer",
            },
            TrustedPeerCertificates = new CertificateTrustList
            {
                StoreType = CertificateStoreType.Directory,
                StorePath = @"%LocalApplicationData%/OPC Foundation/pki/trusted",
            },
            RejectedCertificateStore = new CertificateTrustList
            {
                StoreType = CertificateStoreType.Directory,
                StorePath = @"%LocalApplicationData%/OPC Foundation/pki/rejected",
            },
            AutoAcceptUntrustedCertificates = false,
            RejectSHA1SignedCertificates = true,
            RejectUnknownRevocationStatus = true,
            MinimumCertificateKeySize = 2048,
            AddAppCertToTrustedStore = false,
            SendCertificateChain = true,
            TrustedUserCertificates = new CertificateTrustList
            {
                StoreType = CertificateStoreType.Directory,
                StorePath = @"%LocalApplicationData%/OPC Foundation/pki/trustedUser"
            }
        };

        configuration.TransportConfigurations = [];

        configuration.TransportQuotas = new TransportQuotas
        {
            OperationTimeout = 120000,
            MaxStringLength = 4194304,
            MaxByteStringLength = 4194304,
            MaxArrayLength = 65535,
            MaxMessageSize = 4194304,
            MaxBufferSize = 65535,
            ChannelLifetime = 300000,
            SecurityTokenLifetime = 3600000
        };

        configuration.ClientConfiguration = new ClientConfiguration
        {
            DefaultSessionTimeout = 60_000,
            WellKnownDiscoveryUrls =
            [
                "opc.tcp://{0}:4840",
                "http://{0}:52601/UADiscovery",
                "http://{0}/UADiscovery/Default.svc"
            ],
            MinSubscriptionLifetime = 1000,
            OperationLimits = new OperationLimits
            {
                MaxNodesPerRead = 2500,
                MaxNodesPerHistoryReadData = 1000,
                MaxNodesPerHistoryReadEvents = 1000,
                MaxNodesPerWrite = 2500,
                MaxNodesPerHistoryUpdateData = 1000,
                MaxNodesPerHistoryUpdateEvents = 1000,
                MaxNodesPerMethodCall = 2500,
                MaxNodesPerBrowse = 2500,
                MaxNodesPerRegisterNodes = 2500,
                MaxNodesPerTranslateBrowsePathsToNodeIds = 2500,
                MaxNodesPerNodeManagement = 2500,
                MaxMonitoredItemsPerCall = 2500,
            }
        };

        configuration.TraceConfiguration = new TraceConfiguration()
        {
            OutputFilePath = $@"%LocalApplicationData%/OPC Foundation/Logs/{options.ApplicationName}.log.txt",
            DeleteOnLoad = true,
            //    <!-- Show Only Errors -->
            //    <!-- <TraceMasks>1</TraceMasks> -->
            //    <!-- Show Only Security and Errors -->
            //    <!-- <TraceMasks>513</TraceMasks> -->
            //    <!-- Show Only Security, Errors and Trace -->
            //    <!-- <TraceMasks>515</TraceMasks> -->
            //    <!-- Show Only Security, COM Calls, Errors and Trace -->
            //    <!-- <TraceMasks>771</TraceMasks> -->
            //    <!-- Show Only Security, Service Calls, Errors and Trace -->
            //    <!-- <TraceMasks>523</TraceMasks> -->
            //    <!-- Show Only Security, ServiceResultExceptions, Errors and Trace -->
            //    <!-- <TraceMasks>519</TraceMasks> -->
        };

        await configuration.Validate(ApplicationType.Client).ConfigureAwait(false);

        return configuration;
    }

    public async Task<ReadResponse?> ReadAsync(ReadValueIdCollection nodesToRead, CancellationToken cancellationToken = default)
    {
        if (!connected)
            return null;

        ReadResponse result = await uaClient.Session.ReadAsync(null, 0, TimestampsToReturn.Both, nodesToRead, cancellationToken).ConfigureAwait(false);
        return result;
    }

    public async Task<WriteResponse?> WriteAsync(WriteValueCollection nodesToWrite, CancellationToken cancellationToken = default)
    {
        if (!connected)
            return null;

        WriteResponse result = await uaClient.Session.WriteAsync(null, nodesToWrite, cancellationToken).ConfigureAwait(false);
        return result;
    }
}

Expected Behavior

To receive notifications

Steps To Reproduce

No response

Environment

- OS: Windows/Linux
- Environment:
- Runtime: .net core 8
- Nuget Version:
- Component:
- Server:
- Client: 1.5.374.126

Anything else?

No response