microsoft / psi

Platform for Situated Intelligence
https://github.com/microsoft/psi/wiki

[Question] Subpipeline stop without stopping parent pipeline #291

Closed AuMilliat closed 7 months ago

AuMilliat commented 1 year ago

Hello, I'm trying to figure out whether there is a way to stop a subpipeline without stopping the parent pipeline. My goal is to start and stop subpipelines from the Rendezvous system (e.g., a subpipeline handling an Azure Kinect).

My understanding is that the parent pipeline needs to remove the subpipeline from its private ConcurrentQueue<PipelineElement> components field.

I've made a quick addition to the Subpipeline class:

        /// <summary>
        /// Stops the subpipeline and removes it from its parent pipeline.
        /// </summary>
        public override void Dispose()
        {
            this.Stop(this.parentPipeline.GetCurrentTime(), true);
            this.DeactivateComponents();
            this.DisposeComponents();
            this.parentPipeline.RemoveSubpipline(this);
        }

In the Pipeline class, the DeactivateComponents and DisposeComponents methods change from private to protected, and the following method is added:

       internal bool RemoveSubpipline(Subpipeline subpipeline)
       {
           PipelineElement node = this.components.FirstOrDefault(c => c.StateObject == subpipeline);
           if (node == null)
           {
               return false;
           }

           if (!subpipeline.IsCompleted)
           {
               throw new InvalidOperationException("Subpipeline is still running, it can't be removed from parent pipeline.");
           }

           SynchronizationLock locker = new SynchronizationLock(this, true);
           this.scheduler.Freeze(locker);
           this.components = new ConcurrentQueue<PipelineElement>(this.components.Where(x => x != node));
           this.DiagnosticsCollector?.PipelineElementDisposed(this, node);
           locker.Release();
           return true;
       }

It probably needs some optimization and handling of edge cases, but it works with the following test program:

 static void Main(string[] args)
 {
     // Create the \psi pipeline
     Pipeline pipeline = Pipeline.Create("Subpipline Removal");
     var timer = Timers.Timer(pipeline, TimeSpan.FromSeconds(1));
     timer.Out.Do(t =>
     {
         Console.WriteLine($"\tPipeline timer.");
     });
     // Start the pipeline running
     pipeline.RunAsync();
     Console.WriteLine("Pipeline Run Asynch.");

     Subpipeline sub1 = new Subpipeline(pipeline); 
     var subtimer1 = Timers.Timer(sub1, TimeSpan.FromSeconds(1));
     subtimer1.Out.Do(t =>
     {
         Console.WriteLine($"\tSubpipeline 1 timer.");
     });
     RemoteExporter exporter1 = new RemoteExporter(sub1, 11511, TransportKind.Tcp);
     exporter1.Exporter.Write(subtimer1.Out, "SubExporter1");
     sub1.RunAsync();
     Console.WriteLine("Subpipeline 1 Run Asynch.");
     Thread.Sleep(3000);
     Subpipeline sub2 = new Subpipeline(pipeline);
     var subtimer2 = Timers.Timer(sub2, TimeSpan.FromSeconds(1));
     subtimer2.Out.Do(t =>
     {
         Console.WriteLine($"\tSubpipeline 2 timer.");
     });
     RemoteExporter exporter2 = new RemoteExporter(sub2, 11512, TransportKind.Tcp);
     exporter2.Exporter.Write(subtimer2.Out, "SubExporter2");
     sub2.RunAsync();
     Console.WriteLine("Subpipeline 2 Run Asynch.");
     Thread.Sleep(5000);
     Console.WriteLine("Dispose Subpipeline 1.");
     sub1.Dispose();
     Thread.Sleep(5000);
     Console.WriteLine("Dispose Subpipeline 2.");
     sub2.Dispose();
     Thread.Sleep(5000);
     Subpipeline sub3 = new Subpipeline(pipeline);
     var subtimer3 = Timers.Timer(sub3, TimeSpan.FromSeconds(1));
     subtimer3.Out.Do(t =>
     {
         Console.WriteLine($"\tSubpipeline 3 timer.");
     });
     RemoteExporter exporter3 = new RemoteExporter(sub3, 11513, TransportKind.Tcp);
     exporter3.Exporter.Write(subtimer3.Out, "SubExporter3");
     sub3.RunAsync();
     Console.WriteLine("Subpipeline 3 Run Asynch.");
     // Wait for the user to press Enter (Console.ReadLine blocks until a full line is entered).
     Console.WriteLine("Press Enter to stop the application.");
     Console.ReadLine();
     // Stop the parent pipeline cleanly.
     pipeline.Dispose();
 }

AuMilliat commented 1 year ago

A few modifications to get it working in a real use case:

In Subpipeline.cs:

 public override void Dispose()
 {
     this.Stop(this.parentPipeline.GetCurrentTime(), true);
     this.DisposeComponents();
     this.parentPipeline.RemoveSubpipline(this);
     this.DiagnosticsCollector?.PipelineDisposed(this);
 }

In Pipeline.cs:

 internal bool RemoveSubpipline(Subpipeline subpipeline)
 {
     PipelineElement node = this.components.FirstOrDefault(c => c.StateObject == subpipeline);
     if (node == null)
     {
         return false;
     }

     if (!subpipeline.IsCompleted)
     {
         throw new InvalidOperationException("Subpipeline is still running, it can't be removed from parent pipeline.");
     }

     List<PipelineElement> list = subpipeline.Components.ToList();
     list.Add(node);
     SynchronizationLock locker = new SynchronizationLock(this, true);
     this.scheduler.Freeze(locker);
     this.components = new ConcurrentQueue<PipelineElement>(this.components.Where(x => !list.Contains(x)));
     this.DiagnosticsCollector?.PipelineElementDisposed(this, node);
     locker.Release();
     return true;
 }

It probably still needs some improvements (notably in the DiagnosticsCollector handling).

sandrist commented 1 year ago

Hello, can you explain a bit more about your use case which would benefit from stopping and resuming subpipelines? Pausing, stopping, and resuming (sub)pipelines mid-process is a bit tricky given the way the system has been designed so far. Take a look at the long discussion in issue #256.

Another idea is to look into the use of the Parallel Operator. This operator already has the ability to dynamically construct and tear down subpipelines according to the data flowing into it, and it's pretty extensible. It was carefully designed to do this in the right way. I wonder if you could use Parallel to achieve the goals in your scenario?

AuMilliat commented 1 year ago

Thanks for the answers and the information.

What we want is to be able to remove a subpipeline without disposing the parent pipeline. At the moment, our setup has a dedicated computer handling an Azure Kinect camera. We have built a WPF application that streams the data through the Rendezvous system. Our next step is to be able to start and stop the Kinect streaming with a command from a remote computer (as the configuration might change). What we are trying to build is a pipeline that owns a RemoteClockImporter and a RemoteExporter for diagnostics, with a subpipeline for each Rendezvous.Process that can be created and disposed while the pipeline is still running.

We were looking at independent pipelines with a Connector for the clock and the diagnostics. But maybe the Parallel operator is the way to go, thanks!

AuMilliat commented 1 year ago

I've checked the Parallel operator; it does not seem to work in my case, as I have different types of streams. I may have found a better solution for my use case by adding a method to the Pipeline class:

        /// <summary>
        /// Creates a new pipeline whose time offset and diagnostics collector are taken from the given pipeline.
        /// </summary>
        /// <param name="pipeline">Pipeline from which to retrieve the time offset and diagnostics configuration.</param>
        /// <param name="name">Pipeline name.</param>
        /// <param name="threadCount">Number of threads.</param>
        /// <param name="allowSchedulingOnExternalThreads">Whether to allow scheduling on external threads.</param>
        /// <param name="enableDiagnostics">Indicates whether to enable collecting and publishing diagnostics information on the Pipeline.Diagnostics stream.</param>
        /// <returns>Created pipeline.</returns>
        public static Pipeline CreateSynchedPipeline(
            Pipeline pipeline,
            string name = null,
            int threadCount = 0,
            bool allowSchedulingOnExternalThreads = false,
            bool enableDiagnostics = false)
        {
            Pipeline newPipeline = new Pipeline(name ?? $"Synched|{pipeline.Name}", pipeline.defaultDeliveryPolicy, threadCount, allowSchedulingOnExternalThreads);
            newPipeline.VirtualTimeOffset = pipeline.VirtualTimeOffset;
            newPipeline.DiagnosticsCollector = enableDiagnostics ? pipeline.DiagnosticsCollector : null;
            newPipeline.DiagnosticsConfiguration = pipeline.DiagnosticsConfiguration;
            return newPipeline;
        }
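
For readers following along, here is a minimal usage sketch of the method above. It assumes a patched build of \psi that contains CreateSynchedPipeline (it is not part of the official API), and the class name and pipeline names are hypothetical. The idea is that the second pipeline is independent of the first, so it can be disposed on its own while the first keeps running.

```csharp
// Sketch only: requires a \psi build patched with the CreateSynchedPipeline
// method proposed above. Names such as "Parent" and "Child" are illustrative.
using System;
using System.Threading;
using Microsoft.Psi;

public static class SynchedPipelineExample
{
    public static void Main()
    {
        // Long-lived pipeline that keeps running for the whole session.
        Pipeline parent = Pipeline.Create("Parent");
        var parentTimer = Timers.Timer(parent, TimeSpan.FromSeconds(1));
        parentTimer.Out.Do(t => Console.WriteLine("\tParent timer."));
        parent.RunAsync();

        // Separate pipeline that copies the parent's time offset and
        // diagnostics configuration (per the proposed method).
        Pipeline child = Pipeline.CreateSynchedPipeline(parent, "Child");
        var childTimer = Timers.Timer(child, TimeSpan.FromSeconds(1));
        childTimer.Out.Do(t => Console.WriteLine("\tChild timer."));
        child.RunAsync();

        Thread.Sleep(5000);

        // Disposing the child does not touch the parent, since the two
        // are separate Pipeline instances rather than parent/subpipeline.
        child.Dispose();

        Thread.Sleep(5000);
        parent.Dispose();
    }
}
```

Because the two pipelines are not structurally linked, any data shared between them would still have to be bridged explicitly (for instance with the Connector approach mentioned earlier in the thread).
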
AuMilliat commented 7 months ago

I'm closing this issue; the implementation is available at https://github.com/SaacPSI/psi/tree/Pipline%2BExporter.