microsoft / psi

Platform for Situated Intelligence
https://github.com/microsoft/psi/wiki
Other
529 stars 92 forks source link

Poor Serialization Performance for the Diagnostics's `TrackMessageSize` #290

Closed austinbhale closed 9 months ago

austinbhale commented 11 months ago

When enabling diagnostics for a pipeline with TrackMessageSize turned on, I found one of my receivers hanging for a very long time, for at least several minutes.

I attached a sample x64/ARM UWP script using StereoKit's Vertex struct, which wraps System.Numerics data types.

When computing the data size, is it expected for serialization to take (sometimes) several milliseconds per element in the array? Is it because it has to resolve the Systems.Numerics data type for each value? https://github.com/microsoft/psi/blob/d08fdd34f6957a92a6343d6e7978c2b8efc5f83a/Sources/Runtime/Microsoft.Psi/Serialization/ArraySerializer.cs#L45

I tested the array size with different values like so:

# elements Time (ms)
1000 134
10000 1335
20000 8416
30000 20528
namespace TestSample;

using System;
using System.Diagnostics;
using System.Threading.Tasks;
using MathNet.Spatial.Euclidean;
using Microsoft.Psi;
using Microsoft.Psi.Data;
using Microsoft.Psi.MixedReality.StereoKit;
using StereoKit;
using Windows.Storage;

/// <summary>
/// HoloLens samples.
/// </summary>
public static class Program
{
    /// <summary>
    /// Main entry point.
    /// </summary>
    public static void Main()
    {
        // Initialize StereoKit
        if (!SK.Initialize(
            new SKSettings
            {
                appName = "HoloLensSample",
                assetsFolder = "Assets",
            }))
        {
            throw new Exception("StereoKit failed to initialize.");
        }

        var demos = new (string Name, Func<Pipeline> Run)[]
        {
                ("Test Debug Error", TestHang),
        };

        Pipeline pipeline = null;
        var starting = false;
        var stopping = false;
        var demo = string.Empty;
        Exception exception = null;
        var windowCoordinateSystem = default(CoordinateSystem);
        var windowPose = default(Pose);

        while (SK.Step(() =>
        {
            try
            {
                // Position the window near the head at the start
                var headPose = Input.Head.ToCoordinateSystem();

                if (windowCoordinateSystem == null)
                {
                    // Project forward 0.7 meters from the initial head pose (in the XY plane).
                    var forwardDirection = headPose.XAxis.ProjectOn(new MathNet.Spatial.Euclidean.Plane(headPose.Origin, UnitVector3D.ZAxis)).Direction;
                    var windowOrigin = headPose.Origin + forwardDirection.ScaleBy(0.7);
                    windowCoordinateSystem = LookAtPoint(windowOrigin, headPose.Origin);
                }
                else
                {
                    // Update to point toward the head
                    windowCoordinateSystem = windowPose.ToCoordinateSystem();
                    windowCoordinateSystem = LookAtPoint(windowCoordinateSystem.Origin, headPose.Origin);
                }

                windowPose = windowCoordinateSystem.ToStereoKitPose();
                UI.WindowBegin("Psi Demos", ref windowPose, new Vec2(30 * U.cm, 0));

                if (exception != null)
                {
                    UI.Label($"Exception: {exception.Message}");
                }
                else if (starting)
                {
                    UI.Label($"Starting {demo}...");
                }
                else if (stopping)
                {
                    UI.Label($"Stopping {demo}...");
                }
                else
                {
                    if (pipeline == null)
                    {
                        UI.Label("Choose a demo to run");
                        foreach (var (name, run) in demos)
                        {
                            if (UI.Button(name))
                            {
                                demo = name;
                                starting = true;
                                Task.Run(() =>
                                {
                                    pipeline = run();
                                    pipeline.RunAsync();
                                    starting = false;
                                });
                            }

                            UI.SameLine();
                        }

                        UI.NextLine();
                    }
                    else
                    {
                        UI.Label($"Running {demo}");
                        if (UI.Button($"Stop"))
                        {
                            stopping = true;
                            Task.Run(() =>
                            {
                                pipeline.Dispose();
                                pipeline = null;
                                stopping = false;
                            });
                        }

                        UI.SameLine();
                    }
                }

                if (UI.Button("Exit"))
                {
                    SK.Quit();
                }

                UI.WindowEnd();
            }
            catch (Exception ex)
            {
                exception = ex;
            }
        }))
        {
        }

        SK.Shutdown();
    }

    class TstProducer : IConsumer<int>, IProducer<Shared<Vertex[]>>
    {
        public Receiver<int> In { get; }
        public Emitter<Shared<Vertex[]>> Out { get; }
        public TstProducer(Pipeline pipeline)
        {
            Out = pipeline.CreateEmitter<Shared<Vertex[]>>(this, nameof(Out));
            In = pipeline.CreateReceiver<int>(this, (_, env) =>
            {
                using var verts = Shared.Create(new Vertex[10000]);
                Out.Post(verts, env.OriginatingTime);
            }, nameof(In));
        }
    }

    class TstReceiver : IConsumer<Shared<Vertex[]>>
    {
        public Receiver<Shared<Vertex[]>> In { get; }
        public TstReceiver(Pipeline pipeline)
        {
            In = pipeline.CreateReceiver<Shared<Vertex[]>>(this, (msg, env) =>
            {
                System.Diagnostics.Debug.WriteLine($"msg length: {msg.Resource.Length}");
            }, nameof(In));
        }
    }

    private static Stopwatch sw;
    public static Pipeline TestHang()
    {
        var pipeline = Pipeline.Create(
            nameof(TestHang),
            enableDiagnostics: true,
            diagnosticsConfiguration: new Microsoft.Psi.Diagnostics.DiagnosticsConfiguration
            {
                TrackMessageSize = true,
                AveragingTimeSpan = TimeSpan.FromSeconds(2),
                SamplingInterval = TimeSpan.FromSeconds(10),
                IncludeStoppedPipelines = true,
                IncludeStoppedPipelineElements = true,
            });

        PsiExporter store = PsiStore.Create(pipeline, nameof(TestHang), ApplicationData.Current.LocalFolder.Path);
        pipeline.Diagnostics.Write("Diagnostics", store);

        Generators.Range(pipeline, 0, 5, TimeSpan.FromMilliseconds(1))
                  .PipeTo(new TstProducer(pipeline))
                  .PipeTo(new TstReceiver(pipeline));

        pipeline.PipelineRun += (_, _) => { Debug.WriteLine("Started!"); sw = Stopwatch.StartNew(); };
        pipeline.PipelineCompleted += (_, _) => { Debug.WriteLine($"Completed in {sw.ElapsedMilliseconds}"); }; 

        return pipeline;
    }

    private static CoordinateSystem LookAtPoint(Point3D sourcePoint, Point3D targetPoint)
    {
        var forward = (targetPoint - sourcePoint).Normalize();
        var left = UnitVector3D.ZAxis.CrossProduct(forward);
        var up = forward.CrossProduct(left);
        return new CoordinateSystem(sourcePoint, forward, left, up);
    }
}
sandrist commented 10 months ago

Hi, sorry for missing this until now. One quick question: is the behavior reproducible outside of UWP and without StereoKit? Can you construct a more minimal test example that exhibits the behavior?

austinbhale commented 9 months ago

No worries! Yes, I've modified the script to a DotNet console app where you can comment out the specific portion in the Vertex struct to use either 1) common float array or 2) MathNet types. The tests at the bottom also exhibit the runtime differences when TrackMessageSize is enabled or not.

using Microsoft.Psi;
using Microsoft.Psi.Data;
using System.Diagnostics;
using MathNet.Spatial.Euclidean;

int[] sizes = { 1000, 10000, 20000, 30000 };

foreach (int size in sizes)
{
    using var pipeline = Pipeline.Create(
        "TestHang",
        enableDiagnostics: true,
        diagnosticsConfiguration: new Microsoft.Psi.Diagnostics.DiagnosticsConfiguration
        {
            TrackMessageSize = true,
            AveragingTimeSpan = TimeSpan.FromSeconds(2),
            SamplingInterval = TimeSpan.FromSeconds(10),
            IncludeStoppedPipelines = true,
            IncludeStoppedPipelineElements = true,
        });

    PsiExporter store = PsiStore.Create(pipeline, pipeline.Name, Directory.GetCurrentDirectory());
    pipeline.Diagnostics.Write("Diagnostics", store);

    Generators.Range(pipeline, 0, 5, TimeSpan.FromMilliseconds(1))
              .PipeTo(new TstProducer(pipeline, size))
              .PipeTo(new TstReceiver(pipeline));

    Stopwatch? sw = null;
    pipeline.PipelineRun += (_, _) => { Console.WriteLine("Started!"); sw = Stopwatch.StartNew(); };
    pipeline.PipelineCompleted += (_, _) => { Console.WriteLine($"Completed in {sw?.ElapsedMilliseconds}"); };

    pipeline.Run();
}

public struct Vertex
{
    public Vertex() {}

    // TEST 1: common types
    public double[] pos = {0, 0, 0};
    public double[] norm = {0, 0, 0};
    public double[] uv = {0, 0, 0};

    // TEST 2: MathNet types
    // public Vector3D pos;
    // public Vector3D norm;
    // public Vector3D uv;
}

class TstProducer : IConsumer<int>, IProducer<Shared<Vertex[]>>
{
    public Receiver<int> In { get; }
    public Emitter<Shared<Vertex[]>> Out { get; }
    public TstProducer(Pipeline pipeline, int size)
    {
        Out = pipeline.CreateEmitter<Shared<Vertex[]>>(this, nameof(Out));
        In = pipeline.CreateReceiver<int>(this, (_, env) =>
        {
            using var verts = Shared.Create(new Vertex[size]);
            Out.Post(verts, env.OriginatingTime);
        }, nameof(In));
    }
}

class TstReceiver : IConsumer<Shared<Vertex[]>>
{
    public Receiver<Shared<Vertex[]>> In { get; }
    public TstReceiver(Pipeline pipeline)
    {
        In = pipeline.CreateReceiver<Shared<Vertex[]>>(this, (msg, env) =>
        {
            Console.WriteLine($"msg length: {msg.Resource.Length}");
        }, nameof(In));
    }
}

TEST 1: Common Types TrackMessageSize=TRUE

>dotnet run
Started!
msg length: 1000
msg length: 1000
msg length: 1000
msg length: 1000
msg length: 1000
Completed in 42
Started!
msg length: 10000
msg length: 10000
msg length: 10000
msg length: 10000
msg length: 10000
Completed in 148
Started!
msg length: 20000
msg length: 20000
msg length: 20000
msg length: 20000
msg length: 20000
Completed in 1619
Started!
msg length: 30000
msg length: 30000
msg length: 30000
msg length: 30000
msg length: 30000
Completed in 4974

TEST 2: MathNet Types TrackMessageSize=TRUE

>dotnet run
Started!
msg length: 1000
msg length: 1000
msg length: 1000
msg length: 1000
msg length: 1000
Completed in 45
Started!
msg length: 10000
msg length: 10000
msg length: 10000
msg length: 10000
msg length: 10000
Completed in 2989
Started!
msg length: 20000
msg length: 20000
msg length: 20000
msg length: 20000
msg length: 20000
Completed in 6232
Started!
msg length: 30000
msg length: 30000
msg length: 30000
msg length: 30000
msg length: 30000
Completed in 12080

TEST 1: Common Types TrackMessageSize=FALSE

>dotnet run
Started!
msg length: 1000
msg length: 1000
msg length: 1000
msg length: 1000
msg length: 1000
Completed in 40
Started!
msg length: 10000
msg length: 10000
msg length: 10000
msg length: 10000
msg length: 10000
Completed in 4
Started!
msg length: 20000
msg length: 20000
msg length: 20000
msg length: 20000
msg length: 20000
Completed in 5
Started!
msg length: 30000
msg length: 30000
msg length: 30000
msg length: 30000
msg length: 30000
Completed in 5

TEST 2: MathNet Types TrackMessageSize=FALSE

>dotnet run
Started!
msg length: 1000
msg length: 1000
msg length: 1000
msg length: 1000
msg length: 1000
Completed in 40
Started!
msg length: 10000
msg length: 10000
msg length: 10000
msg length: 10000
msg length: 10000
Completed in 4
Started!
msg length: 20000
msg length: 20000
msg length: 20000
msg length: 20000
msg length: 20000
Completed in 4
Started!
msg length: 30000
msg length: 30000
msg length: 30000
msg length: 30000
msg length: 30000
Completed in 4
chitsaw commented 9 months ago

Thanks for the test code. We've identified a buffer allocation issue in BufferWriter. The fix in #293 should resolve this issue.

austinbhale commented 9 months ago

That makes a huge difference, thank you all so much!