Hugovdberg / PIconnect

A python connector to the OSISoft PI and PI-AF databases
MIT License
91 stars 40 forks source link

Feature request: Subscribe for updates from Data Pipe #795

Open aabrodskiy opened 1 week ago

aabrodskiy commented 1 week ago

Feature request

Abstract

PI AF SDK support subscription-based reading of data instead of polling, which is the most efficient way of continuously streaming data from PI/AF to consumers

Motivation and summary

Polling data for large amount of tags and/or high frequency puts significant load both on the PI Server and on the polling client. It would be great to support in python the same way the AF SDK supports callbacks on new messages coming from the Data Pipe, ideally from all three types of it in AF SDK.

Suggested solution

Implement messaging system with subscription maintenance and async callbacks. Ping connection to ensure it is active and doesn't drop, re-connect if the connection drops by the server and resubscribe for the subscribed tags / AF Attributes.

Rejected options

No alternatives available for Linux-based python as far as I'm aware

miguetronic commented 1 week ago

Please, make it real!

Hugovdberg commented 1 week ago

First of all, I'm surprised by the amount of upvotes in this short timespan.. not that it's a bad request, but apparently people are actively monitoring this repo :-)

Regarding the feature request, it would be nice to have this indeed. However, it might be tricky to get this to work nicely. The problem is that async is just syntactic sugar, not only in python, but in .NET as well. That means that async interaction between python and .NET is tricky because they both run a separate event loop, which might cause deadlocks, and I'm not well versed enough in the async world to be able to determine whether this is a serious risk in this case. That said, I did experiment with this some time ago without causing any deadlocks. I think that in the PIconnect/SDK interactions there is little reason for the SDK to ever wait on feedback from the python side, which probably helps.

Is there any example code of how the .NET subscription model works?

aabrodskiy commented 5 days ago

Thank you for a quick reply and attention. Wow, surprisingly this sounds like a hot topic for many. The most basic example implementation is described by Rick Davin here: AVEVA github I'll try to carve out a more elaborate example from our subscriber/receiver and post here.

aabrodskiy commented 5 days ago

This is an example of AF Scubscriber:

using Newtonsoft.Json;
using OSIsoft.AF.Asset;
using OSIsoft.AF.Data;
using OSIsoft.AF.PI;
using Serilog;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

    public sealed class AFDataSubscriber : IDisposable
    {
        #region Private Variables
        private static AFDataSubscriber _instance = null;
        private static readonly object _mutex = new Object();
        private Dictionary<Guid, SubscribedAFAttribute> _subscribedAttributes = new Dictionary<Guid, SubscribedAFAttribute>();
        private readonly AFDataPipe _afDataPipe = new AFDataPipe();
        private Timer _timer;
        private CancellationToken _ct;
        #endregion

        #region Constructor
        private AFDataSubscriber(CancellationToken cancellationToken)
        {
            _ct = cancellationToken;
            // Private Singleton constructor
            _afDataPipe.Subscribe(new AFDataReceiver(_ct));
        }
        #endregion

        #region Private Methods
        private void CheckForData(object o)
        {
            bool hasMoreEvents;
            do
            {
                _afDataPipe.GetObserverEvents(out hasMoreEvents);
            }
            while (hasMoreEvents);
        }

        private void AddSignup(IList<AFAttribute> attributes)
        {
            List<AFAttribute> attributesToSignup = new List<AFAttribute>();
            foreach (AFAttribute attribute in attributes)
            {
                if (_subscribedAttributes.ContainsKey(attribute.ID))
                {
                    SubscribedAFAttribute subscribedAttribute = _subscribedAttributes[attribute.ID];
                    if (!subscribedAttribute.IsSubscribed)
                    {
                        subscribedAttribute.IsSubscribed = true;
                        attributesToSignup.Add(subscribedAttribute.AttributeLookup.Attribute);
                    }
                }
            }
            if (attributesToSignup.Count > 0)
            {
                _afDataPipe.AddSignups(attributesToSignup);
            }
        }

        #endregion

        #region Public Methods
        // Singleton entry
        public static AFDataSubscriber GetInstance(CancellationToken cancellationToken)
        {
            if (_instance == null)
            {
                lock (_mutex)
                {
                    if (_instance == null)
                    {
                        _instance = new AFDataSubscriber(cancellationToken);
                    }
                }
            }
            return _instance;
        }

        public void StartListening(TimeSpan checkIntervall)
        {
            if (_timer == null)
                _timer = new Timer(CheckForData, null, 0, (int)checkIntervall.TotalMilliseconds);
        }

        public void StopListening()
        {
            if (_timer != null)
                _timer.Dispose();
        }

        public void Listen()
        {
            if (_subscribedAttributes.Count > 0)
            {
                StartListening(TimeSpan.FromSeconds(5));
            }
        }

        public void Dispose()
        {
            StopListening();
            _afDataPipe.Dispose();
        }

        public static void CloseInstance()
        {
            if (_instance != null)
            {
                _instance.Dispose();
                _instance = null;
                Log.Information("Monitoring AF Events Stopped");
            }
        }
        #endregion
    }
}
aabrodskiy commented 4 days ago

And here is the receiver of the events from the pipe

using OSIsoft.AF.Asset;
using OSIsoft.AF.Data;
using Serilog;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

    public class AFDataReceiver : IObserver<AFDataPipeEvent>
    {
        private ConcurrentDictionary<string, ConcurrentDictionary<string, TValue>> _batchDataDict = new ConcurrentDictionary<string, ConcurrentDictionary<string, TValue>>();
        private readonly Timer _batchTimer;
        private readonly object _timerLock = new object(); // Create a lock object for the timer
        private readonly object _dictLockObj = new object();
        private readonly CancellationToken _cancellationToken; // Add a field to store the cancellation token

        public AFDataReceiver(CancellationToken cancellationToken)
        {
            _cancellationToken = cancellationToken; // Store the cancellation token
        }

        /// <summary>  
        /// Provides the observer with new data.  
        /// </summary>  
        /// <param name="value"></param>  
        public void OnNext(AFDataPipeEvent value)
        {
            /// logic to process data goes here, we put it in a dictionary to batch out output
        }

        /// <summary>  
        /// An error has occured  
        /// </summary>  
        /// <param name="error"></param>  
        public void OnError(Exception error)
        {
            Log.Error("Provider (AF) has sent an error");
            Log.Error(error.Message);
            Log.Error(error.StackTrace);
        }

        /// <summary>  
        /// Notifies the observer that the provider has finished sending push-based notifications.  
        /// </summary>  
        public void OnCompleted()
        {
            Log.Information("Provider (AF) has terminated sending data");
        }
    }