microsoft / psi

Platform for Situated Intelligence
https://github.com/microsoft/psi/wiki

How to write asynchronous components to a store #252

Closed · hetingjane closed this issue 2 years ago

hetingjane commented 2 years ago

Discussed in https://github.com/microsoft/psi/discussions/251

Originally posted by **hetingjane** August 12, 2022

Hello there,

I'm using the Microsoft Cognitive Services Face API to recognize emotions from a video file. To do so, I wrote an EmotionDetector class and initialized a component EmotionDetectorComponent that takes images and outputs emotion strings and values. The component uses an asynchronous receiver similar to the one in the Cognitive Services FaceRecognizer:

```
protected override async Task ReceiveAsync(Shared<Image> data, Envelope e)
{
    using Stream imageFileStream = new MemoryStream();
    try
    {
        data.Resource.ToBitmap(false).Save(imageFileStream, ImageFormat.Jpeg);
        imageFileStream.Seek(0, SeekOrigin.Begin);
        var detected = (await this.client.Face.DetectWithStreamAsync(imageFileStream, recognitionModel: this.configuration.RecognitionModelName)).Select(d => d.FaceId.Value).ToList();
        if (detected.Count > 0)
        {
            var identified = await this.client.Face.IdentifyAsync(detected, this.configuration.PersonGroupId);
            var results = identified.Select(p => (IList<(string, double)>)p.Candidates.Select(c => (this.people[c.PersonId].Name, c.Confidence)).ToList()).ToList();
            this.Out.Post(results, e.OriginatingTime);
        }
        else
        {
            this.Out.Post(Empty, e.OriginatingTime);
        }
    }
    catch (APIErrorException exception)
    {
        if (exception.Body.Error.Code == "RateLimitExceeded")
        {
            this.RateLimitExceeded.Post(true, e.OriginatingTime);
        }
        else
        {
            throw;
        }
    }
}
```

Now I want to write its outputs into the same \psi store, and my pipeline looks like this:

```
player.Image.PipeTo(EmotionDetectorComponent.ImageIn, DeliveryPolicy.LatestMessage);
EmotionDetectorComponent.StringOut.Write("EmotionLabels", store, true, DeliveryPolicy.Unlimited);
EmotionDetectorComponent.ValueOut.Write("EmotionValues", store, true, DeliveryPolicy.Unlimited);
```

When I run it, the terminal outputs:

![Screenshot 2022-08-12 120704](https://user-images.githubusercontent.com/37964198/184397021-3d66db27-cdec-4850-bf70-75f7f76c319c.png)

The bottom two lines indicate that some messages with an earlier originating time (2:43:59 PM) are returned after messages with a later originating time (2:43:59 PM), and I get an exception:

`System.InvalidOperationException: Attempted to post a message without strictly increasing originating time or that is out of order in wall-clock time: EmotionLabels`

I think the message originating times are getting messed up when writing to the store. Does anyone know how to fix this? Do I need to write a Fuse component that handles the input and output streams? Any help would be appreciated!
chitsaw commented 2 years ago

Hi there.

The exception you're seeing is indeed due to an attempt to post messages on a stream that are not strictly increasing in originating time (a requirement in \psi). The question is, on which stream are you seeing this? From the code snippet you shared above, you are piping the player.Image stream to the EmotionDetectorComponent.ImageIn receiver. The EmotionDetectorComponent posts its output on two streams - StringOut and ValueOut, which are then written to your \psi store.
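
As a minimal, hypothetical sketch of that rule (the receiver and the `Out` emitter below are illustrative, not taken from your code), posting twice on the same emitter with the same originating time is exactly what produces this exception:

```
// Hypothetical component method, for illustration only: a \psi emitter requires
// strictly increasing originating times, so the second Post below throws the same
// System.InvalidOperationException reported above.
private void Receive(string input, Envelope e)
{
    this.Out.Post(input, e.OriginatingTime);        // ok
    this.Out.Post(input + "!", e.OriginatingTime);  // throws: originating time did not increase
}
```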

If any of these 3 streams contains out of order messages, that exception will be thrown. Assuming the input messages from player.Image are all in correct originating time order, either StringOut or ValueOut must be posting an out of order message. However, if your code follows a similar pattern to the CognitiveServices FaceRecognizer code that you referenced above, I'm not sure how this could happen since the input originating times are used when posting the output. Even though the receiver method is async, internally the runtime actually waits for the async task to complete before delivering the next message, so there shouldn't be a possibility of multiple images being processed simultaneously and potentially posting out of order.

Would you be able to share the code for your EmotionDetectorComponent that shows how you are posting messages on these two output streams, and specifically where the message originating times shown in your terminal output are printed?

hetingjane commented 2 years ago

Thanks for the reply!

Sure. Here's my async function ReceiveImage() in the EmotionDetector class:

```
// The receive method for the ImageIn receiver. This executes every time a message arrives on ImageIn.
        private async void ReceiveImage(Shared<Image> sharedImage, Envelope e)
        {

            using Stream imageFileStream = new MemoryStream();
            try
            {
                // convert image to a Stream and send to Cog Services
                sharedImage.Resource.ToBitmap(false).Save(imageFileStream, ImageFormat.Jpeg);
                imageFileStream.Seek(0, SeekOrigin.Begin);

                IList<DetectedFace> detectedFaces;
                var allAttributes = new List<FaceAttributeType>
                {
                    FaceAttributeType.Emotion
                };

                detectedFaces = (await this.client.Face.DetectWithStreamAsync(imageFileStream,
                    returnFaceAttributes: allAttributes,
                    // We specify detection model 1 because we are retrieving attributes.
                    detectionModel: DetectionModel.Detection01,
                    recognitionModel: this.configuration.RecognitionModelName));

                Console.WriteLine($"{detectedFaces.Count} face(s) detected from image.");
                // Parse and print all attributes of each detected face.
                if (detectedFaces.Count > 0)
                {
                    foreach (var face in detectedFaces)
                    {

                        // Get emotion on the face
                        string emotionType = string.Empty;
                        double emotionValue = 0.0;
                        Emotion emotion = face.FaceAttributes.Emotion;
                        if (emotion.Anger > emotionValue) { emotionValue = emotion.Anger; emotionType = "Anger"; }
                        if (emotion.Contempt > emotionValue) { emotionValue = emotion.Contempt; emotionType = "Contempt"; }
                        if (emotion.Disgust > emotionValue) { emotionValue = emotion.Disgust; emotionType = "Disgust"; }
                        if (emotion.Fear > emotionValue) { emotionValue = emotion.Fear; emotionType = "Fear"; }
                        if (emotion.Happiness > emotionValue) { emotionValue = emotion.Happiness; emotionType = "Happiness"; }
                        if (emotion.Neutral > emotionValue) { emotionValue = emotion.Neutral; emotionType = "Neutral"; }
                        if (emotion.Sadness > emotionValue) { emotionValue = emotion.Sadness; emotionType = "Sadness"; }
                        if (emotion.Surprise > emotionValue) { emotionType = "Surprise"; }

                        // The print statement `Console.WriteLine(e.OriginatingTime);` used to be here (or right after posting the two messages); I removed it in my latest commits and I'm not sure exactly where it was before.
                        this.StringOut.Post(emotionType, e.OriginatingTime);
                        this.ValueOut.Post(emotionValue, e.OriginatingTime);

                    }

                }
                else
                {
                    this.StringOut.Post(EmotionEmpty, e.OriginatingTime);  // EmotionEmpty and ValueEmpty are private properties of the EmotionDetector class, with EmotionEmpty = "not detected" and ValueEmpty = 0.0
                    this.ValueOut.Post(ValueEmpty, e.OriginatingTime);

                }
            }
            catch (APIErrorException exception)
            {
                // swallow exceptions unless it's a rate limit exceeded
                if (exception.Body.Error.Code == "RateLimitExceeded")
                {
                    this.RateLimitExceeded.Post(true, e.OriginatingTime);
                }
                else
                {
                    throw;
                }
            }
        }
```
chitsaw commented 2 years ago

Thanks for sharing your code. I see one potential problem here. If multiple faces are detected, the foreach loop will attempt to post multiple messages with the same originating time on both the StringOut and ValueOut streams. This would violate the "strictly increasing originating time" requirement for messages on a stream. In cases like this where you want to post multiple messages on a stream representing multiple results with the same originating time, you should instead post a single message consisting of a collection or dictionary containing the multiple results.
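
For instance, here is a hedged sketch of that pattern, assuming a single combined emitter (called `EmotionsOut` here, of type `Emitter<List<(string, double)>>`; this emitter is not in your current code) and reusing your per-face emotion logic:

```
// Sketch only: assumes the constructor creates
//   this.EmotionsOut = pipeline.CreateEmitter<List<(string Emotion, double Value)>>(this, nameof(this.EmotionsOut));
// and that `using System.Linq;` is in scope. One message is posted per input image,
// so originating times stay strictly increasing even when several faces are detected.
var results = new List<(string Emotion, double Value)>();
foreach (var face in detectedFaces)
{
    var emotion = face.FaceAttributes.Emotion;

    // pick the dominant emotion for this face (same intent as the per-face code above)
    var scores = new (string Name, double Value)[]
    {
        ("Anger", emotion.Anger), ("Contempt", emotion.Contempt), ("Disgust", emotion.Disgust),
        ("Fear", emotion.Fear), ("Happiness", emotion.Happiness), ("Neutral", emotion.Neutral),
        ("Sadness", emotion.Sadness), ("Surprise", emotion.Surprise),
    };
    var dominant = scores.OrderByDescending(s => s.Value).First();
    results.Add((dominant.Name, dominant.Value));
}

// a single Post per image, even if results is empty or contains multiple faces
this.EmotionsOut.Post(results, e.OriginatingTime);
```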

That being said, the console output suggests that you only ever saw one face, so the above issue should not have been a factor. It also does not explain why you saw a console output of 8/11/2022 2:44:00 PM followed by 8/11/2022 2:39:59 PM. On the surface, it does appear as though your ReceiveImage method is being invoked on the next message before the current task has completed, though I can't see why that would happen.

Could you share the entirety of your EmotionDetector class and the main program code where you instantiate and run your pipeline? Or better still, a reproducible test case that results in this exception being thrown? To debug this further, I would also suggest adding more logging, or running under the debugger with exception settings set to break on all CLR exceptions.

hetingjane commented 2 years ago

Sure, and thanks for the help! Here's the entire code; it reproduces the "not strictly increasing originating time" exception:

(two screenshots attached)

Program.cs

```
using System;
using Microsoft.Psi;
using Microsoft.Psi.Media;
using System.IO;

namespace storeRAVDESS
{
    class Program
    {
        // From your Face subscription in the Azure portal, get your subscription key and endpoint.
        const string SUBSCRIPTION_KEY = "a";
        const string ENDPOINT = "b";

        static void Main(string[] args)
        {

            // Authenticate.

            var configuration = new EmotionDetectorConfiguration(SUBSCRIPTION_KEY, ENDPOINT);

            // Specify dataset video files directory 
            DirectoryInfo di = new DirectoryInfo("Y:/Eliciting Affect/pilot studies/001");
            FileInfo[] files = di.GetFiles("*.webm");  
            foreach (FileInfo file in files)
            {
                Console.WriteLine(file.Name);
                using var p = Pipeline.Create();
                var EmotionDetectorComponent = new EmotionDetector(p, configuration);

                // Create a store to write data to
                var store = PsiStore.Create(p, file.Name.Split(".")[0], "c:\\recordings");

                // Create an video source component
                var player = new MediaSource(p, file.FullName);

                // pipe to the cognitive service API
                player.Image.PipeTo(EmotionDetectorComponent.ImageIn, DeliveryPolicy.SynchronousOrThrottle);

                EmotionDetectorComponent.StringOut.Write("EmotionLabels", store, true, DeliveryPolicy.Unlimited);
                EmotionDetectorComponent.ValueOut.Write("EmotionValues", store, true, DeliveryPolicy.Unlimited);

                player.Image.Write("videoFrame", store);

                // Run the pipeline
                p.Run(); 
                p.Dispose();

            }
        }
    }
}
```

EmotionDetector.cs

```
using System;
using System.Collections.Generic;
using System.Drawing.Imaging;
using System.IO;
using Microsoft.Azure.CognitiveServices.Vision.Face;
using Microsoft.Azure.CognitiveServices.Vision.Face.Models;
using Microsoft.Psi;
using Microsoft.Psi.Imaging;

namespace storeRAVDESS
{
    public sealed class EmotionDetector
    {

        /// Empty results.
        private static readonly string EmotionEmpty = "not detected";
        private static readonly double ValueEmpty = 0.0;

        /// The configuration to use for this component.
        private readonly EmotionDetectorConfiguration configuration;

        /// The client that communicates with the cloud image analyzer service.
        private FaceClient client = null;

        public EmotionDetector(Pipeline pipeline, EmotionDetectorConfiguration configuration)
        {
            this.configuration = configuration;
            this.RateLimitExceeded = pipeline.CreateEmitter<bool>(this, nameof(this.RateLimitExceeded));
            this.client = new FaceClient(new ApiKeyServiceClientCredentials(this.configuration.SubscriptionKey))
            {
                Endpoint = this.configuration.Endpoint,
            };

            // create the receivers
            this.ImageIn = pipeline.CreateReceiver<Shared<Image>>(this, ReceiveImage, nameof(this.ImageIn));

            // create the emitter
            this.StringOut = pipeline.CreateEmitter<string>(this, nameof(this.StringOut));
            this.ValueOut = pipeline.CreateEmitter<double>(this, nameof(this.ValueOut));

        }

        public Emitter<bool> RateLimitExceeded { get; }

        /// Disposes the EmotionDetector component.

        public void Dispose()
        {
            this.client.Dispose();
            this.client = null;
        }

        // Receiver that encapsulates the string input stream
        public Receiver<Shared<Image>> ImageIn { get; private set; }
        // Emitter that encapsulates the output stream
        public Emitter<string> StringOut { get; private set; }

        // Emitter that encapsulates the output stream
        public Emitter<double> ValueOut { get; private set; }
        // The receive method for the ImageIn receiver. This executes every time a message arrives on ImageIn.
        private async void ReceiveImage(Shared<Image> sharedImage, Envelope e)
        {

            using Stream imageFileStream = new MemoryStream();
            try
            {
                // convert image to a Stream and send to Cog Services
                sharedImage.Resource.ToBitmap(false).Save(imageFileStream, ImageFormat.Jpeg);
                imageFileStream.Seek(0, SeekOrigin.Begin);

                IList<DetectedFace> detectedFaces;
                var allAttributes = new List<FaceAttributeType>
                {
                    FaceAttributeType.Emotion
                };

                detectedFaces = (await this.client.Face.DetectWithStreamAsync(imageFileStream,
                    returnFaceAttributes: allAttributes,
                    // We specify detection model 1 because we are retrieving attributes.
                    detectionModel: DetectionModel.Detection01,
                    recognitionModel: this.configuration.RecognitionModelName));

                Console.WriteLine($"{detectedFaces.Count} face(s) detected from image.");
                // Parse and print all attributes of each detected face.
                if (detectedFaces.Count > 0)
                {
                    foreach (var face in detectedFaces)
                    {

                        // Get emotion on the face
                        string emotionType = string.Empty;
                        double emotionValue = 0.0;
                        Emotion emotion = face.FaceAttributes.Emotion;
                        if (emotion.Anger > emotionValue) { emotionValue = emotion.Anger; emotionType = "Anger"; }
                        if (emotion.Contempt > emotionValue) { emotionValue = emotion.Contempt; emotionType = "Contempt"; }
                        if (emotion.Disgust > emotionValue) { emotionValue = emotion.Disgust; emotionType = "Disgust"; }
                        if (emotion.Fear > emotionValue) { emotionValue = emotion.Fear; emotionType = "Fear"; }
                        if (emotion.Happiness > emotionValue) { emotionValue = emotion.Happiness; emotionType = "Happiness"; }
                        if (emotion.Neutral > emotionValue) { emotionValue = emotion.Neutral; emotionType = "Neutral"; }
                        if (emotion.Sadness > emotionValue) { emotionValue = emotion.Sadness; emotionType = "Sadness"; }
                        if (emotion.Surprise > emotionValue) { emotionType = "Surprise"; }

                        Console.WriteLine(emotionType+" "+emotionValue.ToString()+" "+ e.OriginatingTime);
                        this.StringOut.Post(emotionType, e.OriginatingTime);
                        this.ValueOut.Post(emotionValue, e.OriginatingTime);

                    }

                }
                else
                {
                    this.StringOut.Post(EmotionEmpty, e.OriginatingTime);
                    this.ValueOut.Post(ValueEmpty, e.OriginatingTime);

                }
            }
            catch (APIErrorException exception)
            {
                // swallow exceptions unless it's a rate limit exceeded
                if (exception.Body.Error.Code == "RateLimitExceeded")
                {
                    this.RateLimitExceeded.Post(true, e.OriginatingTime);
                }
                else
                {
                    throw;
                }
            }
        }

    }
}
```

EmotionDetectorConfiguration.cs

```
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

namespace storeRAVDESS
{
    using Microsoft.Azure.CognitiveServices.Vision.Face.Models;

    /// Represents the configuration for the FaceRecognizer component.
    public sealed class EmotionDetectorConfiguration
    {
        /// Initializes a new instance of the FaceRecognizerConfiguration class.
        public EmotionDetectorConfiguration(string subscriptionKey, string subscriptionPoint)
        {
            this.SubscriptionKey = subscriptionKey;
            this.Endpoint = subscriptionPoint;

        }

        /// Gets or sets the subscription key for Cognitive Services Face API.
        public string SubscriptionKey { get; set; }

        /// Gets or sets the endpoint for Cognitive Services Face API.
        public string Endpoint { get; set; }

        /// Gets or sets the recognition model.
        public string RecognitionModelName { get; set; } = RecognitionModel.Recognition04;
    }
}
```
sandrist commented 2 years ago

For creating async receivers, you should use the `pipeline.CreateAsyncReceiver` method. The method signature for your `ReceiveImage` method will also need to be changed to `private async Task`.
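
For reference, here is a minimal sketch of that pattern (the component and stream names are illustrative, not from the code above). The reason the change matters: with `CreateReceiver` and an `async void` method, the method returns to the runtime at its first `await`, so the next image can be delivered while the previous cloud call is still in flight, and the calls can then complete and post out of order. With `CreateAsyncReceiver` and an `async Task` method, the runtime awaits the returned task before delivering the next message.

```
// Minimal, hypothetical sketch of an async \psi component receiver
// (not the full EmotionDetector).
using System.Threading.Tasks;
using Microsoft.Psi;
using Microsoft.Psi.Imaging;

public sealed class AsyncImageConsumer
{
    public AsyncImageConsumer(Pipeline pipeline)
    {
        // CreateAsyncReceiver: the runtime awaits the Task returned by
        // ReceiveImageAsync before delivering the next message to this receiver.
        this.ImageIn = pipeline.CreateAsyncReceiver<Shared<Image>>(this, this.ReceiveImageAsync, nameof(this.ImageIn));
        this.Out = pipeline.CreateEmitter<string>(this, nameof(this.Out));
    }

    public Receiver<Shared<Image>> ImageIn { get; }

    public Emitter<string> Out { get; }

    // Note: async Task, not async void.
    private async Task ReceiveImageAsync(Shared<Image> image, Envelope e)
    {
        // stand-in for the cloud call (e.g., DetectWithStreamAsync)
        await Task.Delay(1);

        // post exactly one message per input, using the input's originating time
        this.Out.Post("done", e.OriginatingTime);
    }
}
```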

That should hopefully fix the posting out of order exceptions you're seeing, but as Nick pointed out, you will still see a similar exception if your face detector ever detects more than 1 face (as it posts on the same emitter separately for each face result with the same originating time).

hetingjane commented 2 years ago

Thank you both so much! I did as you suggested and fed in another video with only one face detected. Everything ran smoothly for a few minutes until I saw the same exception occur again. I'm trying to fix it now.

Update: I recorded another video and made sure only one face was there. It works now!

(screenshot attached)
sandrist commented 2 years ago

It sounds like the original issue was solved, so I will close this for now. Feel free to open it back up if you have any follow-up questions.