elsa-workflows / elsa-core

A .NET workflows library
https://v3.elsaworkflows.io/
MIT License

[BUG] Race condition in Bookmark Management #5597

Status: Open. frank-lusatia opened this issue 3 weeks ago.

frank-lusatia commented 3 weeks ago

Description

An activity creates a bookmark so that it can be suspended and resumed later, and then publishes an event. A service waits for incoming events that may be responses to previously published events. If such an event arrives too early, the service does not find the bookmark, even though the activity has already created it. As a result, the activity is never resumed.

Steps to Reproduce

  1. Detailed Steps:

    1. The activity creates a bookmark (`context.CreateBookmark(payload, ResumeAsync)` in `MyActivity.ExecuteAsync`).
    2. The activity asynchronously publishes an event (`publish.Action(guid)`).
    3. The activity is suspended, to be resumed later.
    4. The workflow service receives the published event from the queue (`Receive`, backed by `queue.Take`).
    5. The workflow service retrieves all bookmarks (`bookmarkStore.FindManyAsync(new BookmarkFilter(), ...)`).
    6. The workflow service filters the list of bookmarks for the bookmark created by the activity (the `Where` clause on `b.Payload`).
    7. The workflow service creates a DispatchWorkflowInstanceRequest for each filtered bookmark (`MakeDispatchWorkflowInstanceRequest`).
    8. The workflow service dispatches each DispatchWorkflowInstanceRequest (`workflowDispatcher.DispatchAsync`).

    If the workflow service retrieves all bookmarks immediately after receiving the incoming event, no bookmarks are found. If the workflow service instead calls Thread.Sleep(2000); (the commented-out line directly after the event is received) before retrieving the bookmarks, the previously created bookmark is found.

  2. Code Snippets:

namespace WebApplication8;

using Elsa.EntityFrameworkCore.Modules.Management;
using Elsa.EntityFrameworkCore.Modules.Runtime;
using Elsa.Extensions;
using Elsa.Workflows;
using Elsa.Workflows.Runtime.Contracts;
using Elsa.Workflows.Runtime.Entities;
using Elsa.Workflows.Runtime.Filters;
using Elsa.Workflows.Runtime.Requests;
using Microsoft.Extensions.DependencyInjection;
using System.Collections.Concurrent;
using System.Threading;

public class Program
{
    public static void Main(string[] args)
    {
        var builder = WebApplication.CreateBuilder(args);
        builder.Services.AddElsa(elsa =>
        {
            // Configure Management layer to use EF Core.
            elsa.UseWorkflowManagement(management => management.UseEntityFrameworkCore());

            // Configure Runtime layer to use EF Core.
            elsa.UseWorkflowRuntime(runtime => runtime.UseEntityFrameworkCore());

            // Default Identity features for authentication/authorization.
            elsa.UseIdentity(identity =>
            {
                identity.TokenOptions = options => options.SigningKey = "sufficiently-large-secret-signing-key"; // This key needs to be at least 256 bits long.
                identity.UseAdminUserProvider();
            });

            // Configure ASP.NET authentication/authorization.
            elsa.UseDefaultAuthentication(auth => auth.UseAdminApiKey());

            // Expose Elsa API endpoints.
            elsa.UseWorkflowsApi();

            // Setup a SignalR hub for real-time updates from the server.
            elsa.UseRealTimeWorkflows();

            // Enable HTTP activities.
            elsa.UseHttp();

            // Register custom activities from the application, if any.
            elsa.AddActivitiesFrom<Program>();
        });

        // Configure CORS to allow designer app hosted on a different origin to invoke the APIs.
        builder.Services.AddCors(cors => cors
            .AddDefaultPolicy(policy => policy
                .AllowAnyOrigin() // For demo purposes only. Use a specific origin instead.
                .AllowAnyHeader()
                .AllowAnyMethod()
                .WithExposedHeaders("x-elsa-workflow-instance-id"))); // Required for Elsa Studio in order to support running workflows from the designer. Alternatively, you can use the `*` wildcard to expose all headers.

        // Add Health Checks.
        builder.Services.AddHealthChecks();

        var queue = new BlockingCollection<Guid>();

        builder.Services.AddSingleton(new Publish(queue.Add));
        builder.Services.AddSingleton(new Receive(queue.Take));
        builder.Services.AddHostedService<WorkflowService>();

        // Build the web application.
        var app = builder.Build();

        // Configure web application's middleware pipeline.
        app.UseCors();
        app.UseRouting(); // Required for SignalR.
        app.UseAuthentication();
        app.UseAuthorization();
        app.UseWorkflowsApi(); // Use Elsa API endpoints.
        app.UseWorkflows(); // Use Elsa middleware to handle HTTP requests mapped to HTTP Endpoint activities.
        app.UseWorkflowsSignalRHubs(); // Optional SignalR integration. Elsa Studio uses SignalR to receive real-time updates from the server. 

        app.Run();
    }

    public record Publish(Action<Guid> Action);

    public record Receive(Func<Guid> Func);

    public record BookmarkPayload(Guid guid);

    public sealed class MyActivity : Activity
    {
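        protected override ValueTask ExecuteAsync(
            ActivityExecutionContext context)
        {
            var publish = context.GetRequiredService<Publish>();
            var guid = Guid.NewGuid();
            var payload = new BookmarkPayload(guid);

            // 1. Create the bookmark that ResumeAsync will be invoked for.
            context.CreateBookmark(payload, ResumeAsync);

            // 2. Publish the event; WorkflowService may receive it immediately.
            publish.Action(guid);

            // Nothing to await: the activity stays suspended until the bookmark is resumed.
            return ValueTask.CompletedTask;
        }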

        private async ValueTask ResumeAsync(
            ActivityExecutionContext context) => 
            await context.CompleteActivityAsync();
    }

    public sealed class WorkflowService(
        IServiceScopeFactory factory,
        Receive receive) : BackgroundService
    {
        protected override Task ExecuteAsync(
            CancellationToken stoppingToken)
        {
            // queue.Take blocks the calling thread, so run the receive loop on a
            // thread-pool thread and return its Task to the host (no async void).
            return Task.Run(() => ExecuteLoopAsync(stoppingToken), stoppingToken);
        }

        private async Task ExecuteLoopAsync(
            CancellationToken stoppingToken)
        {
            // Dispose the scope, and the services resolved from it, when the loop ends.
            using var scope = factory.CreateScope();
            var bookmarkStore = scope.ServiceProvider.GetRequiredService<IBookmarkStore>();
            var workflowDispatcher = scope.ServiceProvider.GetRequiredService<IWorkflowDispatcher>();

            while (!stoppingToken.IsCancellationRequested)
            {
                // Blocks until MyActivity publishes a GUID.
                var guid = receive.Func();
                // Thread.Sleep(2000); // Workaround: with this delay the bookmark is found.
                var allBookmarks = await bookmarkStore.FindManyAsync(
                    new BookmarkFilter(),
                    stoppingToken);

                // Keep only the bookmark that was created for this event.
                var requests = allBookmarks
                    .Where(b => b.Payload is BookmarkPayload payload && payload.guid == guid)
                    .Select(MakeDispatchWorkflowInstanceRequest);

                foreach (var request in requests)
                {
                    await workflowDispatcher.DispatchAsync(request, stoppingToken);
                }
            }
        }

        private static DispatchWorkflowInstanceRequest MakeDispatchWorkflowInstanceRequest(
            StoredBookmark bookmark) =>
            new DispatchWorkflowInstanceRequest(bookmark.WorkflowInstanceId)
            {
                BookmarkId = bookmark.BookmarkId,
            };
    }
}

  3. Attachments:

  4. Reproduction Rate: 100%

  5. Video/Screenshots: see below

  6. Additional Configuration: none

Expected Behavior

A bookmark can be found as soon as it has been created.

Actual Behavior

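Whether the bookmark is found depends on timing (a race condition): when the service queries the bookmark store immediately after receiving the event, the bookmark is missing and the activity is never resumed.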

Screenshots

[Screenshot: 2024-06-13 10_14_17-Suspended-Activity]

Environment

Log Output

none

Troubleshooting Attempts

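Calling Thread.Sleep(2000); in the workflow service after receiving the event and before querying the bookmark store. With this delay, the bookmark is found.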

Additional Context

none

Related Issues

none

glime-ai[bot] commented 3 weeks ago

Based on the provided context, it appears that there is a race condition between the creation of a bookmark and the retrieval of that bookmark by the workflow service. The Thread.Sleep(2000); call serves as a workaround that gives the bookmark time to be created before the workflow service attempts to retrieve it.

To address this issue, you should ensure that the event publishing and bookmark creation are completed before the workflow service starts looking for bookmarks to resume. This could involve using synchronization mechanisms such as semaphores, mutexes, or other signaling constructs to coordinate between the activity that creates the bookmark and the service that processes the event and resumes the workflow.

Additionally, you may want to review the workflow's design to ensure that the event is not published before the bookmark is created, or to implement a retry mechanism in the workflow service that will keep checking for the bookmark for a certain period before giving up.
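A bounded retry of the kind suggested here might look like the following minimal C# sketch. `BookmarkRetry` and `FindWithRetryAsync` are hypothetical names, not Elsa APIs; the helper simply re-runs a caller-supplied lookup until it returns at least one item or the attempt budget runs out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of Elsa): polls a lookup until it returns at least
// one item, or until the attempt budget is exhausted.
public static class BookmarkRetry
{
    public static async Task<IReadOnlyList<T>> FindWithRetryAsync<T>(
        Func<CancellationToken, Task<IEnumerable<T>>> find,
        int maxAttempts,
        TimeSpan delay,
        CancellationToken cancellationToken = default)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (var attempt = 1; ; attempt++)
        {
            var found = (await find(cancellationToken)).ToList();

            // Stop as soon as something is found, or when no attempts remain
            // (in which case the empty list is returned).
            if (found.Count > 0 || attempt == maxAttempts)
                return found;

            // Wait before the next attempt; throws if cancellation is requested.
            await Task.Delay(delay, cancellationToken);
        }
    }
}
```

In the reporter's `WorkflowService`, the `find` delegate would wrap the existing `bookmarkStore.FindManyAsync(new BookmarkFilter(), ct)` call together with the `BookmarkPayload` filter. This only narrows the window: if persistence takes longer than `maxAttempts × delay`, the bookmark is still missed, so it is a mitigation rather than a fix for the underlying ordering problem.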

If the issue persists, you may need to delve deeper into Elsa Workflows' internal mechanisms for creating and managing bookmarks, or reach out to the Elsa Workflows community for support.

Since the provided context does not include specific methods or properties to address this race condition directly, the above suggestions are based on general best practices for dealing with concurrency and timing issues in asynchronous systems.

This comment was generated by Glime.

frank-lusatia commented 3 weeks ago

The activity creates the bookmark before it publishes the event, and the service receives the published event asynchronously. The BlockingCollection used in the example is already a sufficient synchronization mechanism.

sfmskywalker commented 2 weeks ago

Your analysis of the issue is spot on. There are several ways to go about this:

Given that, in typical scenarios, only one activity (or perhaps a handful) creates a bookmark, batching their persistence may be a premature optimization: it is unnecessary and can, in fact, invite race conditions under certain circumstances, as demonstrated here.
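To illustrate why batched persistence opens this window, here is a toy C# model. `ModelBookmarkStore` is hypothetical and deliberately simplified; it is not how Elsa stores bookmarks, and where Elsa's actual commit point lies is not stated in this thread. With batching enabled, a bookmark created before the event is published stays invisible to a concurrent reader until `Commit()` runs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical in-memory model (NOT Elsa's implementation) of a bookmark store
// whose writes are either persisted immediately or batched until a commit point.
public sealed class ModelBookmarkStore
{
    // What concurrent readers (like WorkflowService) can see.
    private readonly ConcurrentDictionary<Guid, string> _persisted = new();

    // Writes buffered by the executing activity; touched only by that thread.
    private readonly List<(Guid Id, string WorkflowInstanceId)> _pending = new();

    private readonly bool _batchWrites;

    public ModelBookmarkStore(bool batchWrites) => _batchWrites = batchWrites;

    // Called when an activity creates a bookmark.
    public void Create(Guid id, string workflowInstanceId)
    {
        if (_batchWrites)
            _pending.Add((id, workflowInstanceId)); // invisible to readers until Commit()
        else
            _persisted[id] = workflowInstanceId;    // visible to readers right away
    }

    // Called once the batch boundary is reached (assumed: end of the execution burst).
    public void Commit()
    {
        foreach (var (id, instanceId) in _pending)
            _persisted[id] = instanceId;
        _pending.Clear();
    }

    // Lookup performed by the event consumer.
    public bool TryFind(Guid id) => _persisted.ContainsKey(id);
}
```

In the reported scenario, `publish.Action(guid)` runs between `Create` and `Commit`, so a consumer that queries immediately sees nothing, while `Thread.Sleep(2000)` merely waits until after the commit. Persisting each bookmark immediately (`batchWrites: false`) removes the window at the cost of one write per bookmark, which matters little when only a handful of bookmarks are created.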