danielgerlag / workflow-core

Lightweight workflow engine for .NET Standard
MIT License
5.33k stars, 1.19k forks

Workflow becomes slow as the number of execution pointers grows #1028

Open. qazq opened this issue 2 years ago

qazq commented 2 years ago

Hi,

I use workflow-core to control a manufacturing process. A single product can take 1-2 months, so the number of execution pointers grows to almost 3000. At that point, executing one step takes a long time (1-2 seconds). I reproduced the issue with sample 10, raising the loop count to 3000:

    public class WhileWorkflow : IWorkflow<MyData>
    {
        public string Id => "While";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyData> builder)
        {
            builder
                .StartWith<SayHello>()
                .While(data => data.Counter < 3000)  // loop count raised to 3000
                    .Do(x => x
                        .StartWith<DoSomething>()
                        .Then<IncrementStep>()
                            .Input(step => step.Value1, data => data.Counter)
                            .Output(data => data.Counter, step => step.Value2))
                .Then<SayGoodbye>();
        }
    }

I added debug timing output to two methods:

EntityFrameworkPersistenceProvider

public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default)
{
    long t1 = 0, t2 = 0, t3 = 0, t4 = 0;
    var watch = Stopwatch.StartNew();

    using (var db = ConstructDbContext())
    {
        t1 = watch.ElapsedMilliseconds;
        watch.Restart();

        var uid = new Guid(workflow.Id);
        var existingEntity = await db.Set<PersistedWorkflow>()
            .Where(x => x.InstanceId == uid)
            .Include(wf => wf.ExecutionPointers)
            .ThenInclude(ep => ep.ExtensionAttributes)
            .Include(wf => wf.ExecutionPointers)
            .AsTracking()
            .FirstAsync(cancellationToken);
        t2 = watch.ElapsedMilliseconds;
        watch.Restart();

        var persistable = workflow.ToPersistable(existingEntity);
        t3 = watch.ElapsedMilliseconds;
        watch.Restart();

        await db.SaveChangesAsync(cancellationToken);
        t4 = watch.ElapsedMilliseconds;
        watch.Restart();
    }

    Console.WriteLine($"\n\n  PersistWorkflow >>> p1={t1}, p2={t2}, p3={t3}, p4={t4}\n\n");
}

WorkflowConsumer

        protected override async Task ProcessItem(string itemId, CancellationToken cancellationToken)
        {
            if (!await _lockProvider.AcquireLock(itemId, cancellationToken))
            {
                Logger.LogInformation("Workflow locked {0}", itemId);
                return;
            }

            WorkflowInstance workflow = null;
            WorkflowExecutorResult result = null;

            long t1, t2, t3, t4, t5;
            t1 = t2 = t3 = t4 = t5 = 0;
            var watch = Stopwatch.StartNew();

            Logger.LogDebug("ProcessItem >>> step.1");
            try
            {
                cancellationToken.ThrowIfCancellationRequested();
                workflow = await _persistenceStore.GetWorkflowInstance(itemId, cancellationToken);
                t1 = watch.ElapsedMilliseconds;
                watch.Restart();
                if (workflow.Status == WorkflowStatus.Runnable)
                {
                    try
                    {
                        result = await _executor.Execute(workflow, cancellationToken);
                        t2 = watch.ElapsedMilliseconds;
                        watch.Restart();
                    }
                    finally
                    {
                        await _persistenceStore.PersistWorkflow(workflow, cancellationToken);
                        t3 = watch.ElapsedMilliseconds;
                        watch.Restart();
                        await QueueProvider.QueueWork(itemId, QueueType.Index);
                        t4 = watch.ElapsedMilliseconds;
                        watch.Restart();
                        _greylist.Remove($"wf:{itemId}");
                    }
                }
            }
            finally
            {
                await _lockProvider.ReleaseLock(itemId);
                if ((workflow != null) && (result != null))
                {
                    foreach (var sub in result.Subscriptions)
                    {
                        await SubscribeEvent(sub, _persistenceStore, cancellationToken);
                    }

                    await _persistenceStore.PersistErrors(result.Errors, cancellationToken);

                    if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue)
                    {
                        var readAheadTicks = _datetimeProvider.UtcNow.Add(Options.PollInterval).Ticks;
                        if (workflow.NextExecution.Value < readAheadTicks)
                        {
                            new Task(() => FutureQueue(workflow, cancellationToken)).Start();
                        }
                        else
                        {
                            if (_persistenceStore.SupportsScheduledCommands)
                            {
                                await _persistenceStore.ScheduleCommand(new ScheduledCommand()
                                {
                                    CommandName = ScheduledCommand.ProcessWorkflow,
                                    Data = workflow.Id,
                                    ExecuteTime = workflow.NextExecution.Value
                                });
                            }
                        }
                    }
                }
                t5 = watch.ElapsedMilliseconds;
                watch.Restart();
            }

            Logger.LogDebug($"ProcessItem >>> t1={t1}, t2={t2}, t3={t3}, t4={t4}, t5={t5}");
        }
[11:02:34.106] info: Microsoft.EntityFrameworkCore.Infrastructure[10403]
      Entity Framework Core 5.0.1 initialized 'SqlServerContext' using provider 'Microsoft.EntityFrameworkCore.SqlServer' with options: NoTracking
dbug: WorkflowCore.Services.BackgroundTasks.WorkflowConsumer[0]
      ProcessItem >>> step.1
info: Microsoft.EntityFrameworkCore.Infrastructure[10403]
      Entity Framework Core 5.0.1 initialized 'SqlServerContext' using provider 'Microsoft.EntityFrameworkCore.SqlServer' with options: NoTracking
info: Microsoft.EntityFrameworkCore.Database.Command[20101]
      Executed DbCommand (14ms) [Parameters=[@__uid_0='?' (DbType = Guid)], CommandType='Text', CommandTimeout='30']
      SELECT [t].[PersistenceId], [t].[CompleteTime], [t].[CreateTime], [t].[Data], [t].[Description], [t].[InstanceId], [t].[NextExecution], [t].[Reference], [t].[Status], [t].[Version], [t].[WorkflowDefinitionId], [t0].[PersistenceId], [t0].[Active], [t0].[Children], [t0].[ContextItem], [t0].[EndTime], [t0].[EventData], [t0].[EventKey], [t0].[EventName], [t0].[EventPublished], [t0].[Id], [t0].[Outcome], [t0].[PersistenceData], [t0].[PredecessorId], [t0].[RetryCount], [t0].[Scope], [t0].[SleepUntil], [t0].[StartTime], [t0].[Status], [t0].[StepId], [t0].[StepName], [t0].[WorkflowId], [t0].[PersistenceId0], [t0].[AttributeKey], [t0].[AttributeValue], [t0].[ExecutionPointerId]
      FROM (
          SELECT TOP(1) [w].[PersistenceId], [w].[CompleteTime], [w].[CreateTime], [w].[Data], [w].[Description], [w].[InstanceId], [w].[NextExecution], [w].[Reference], [w].[Status], [w].[Version], [w].[WorkflowDefinitionId]
          FROM [wfc].[Workflow] AS [w]
          WHERE [w].[InstanceId] = @__uid_0
      ) AS [t]
      LEFT JOIN (
          SELECT [e].[PersistenceId], [e].[Active], [e].[Children], [e].[ContextItem], [e].[EndTime], [e].[EventData], [e].[EventKey], [e].[EventName], [e].[EventPublished], [e].[Id], [e].[Outcome], [e].[PersistenceData], [e].[PredecessorId], [e].[RetryCount], [e].[Scope], [e].[SleepUntil], [e].[StartTime], [e].[Status], [e].[StepId], [e].[StepName], [e].[WorkflowId], [e0].[PersistenceId] AS [PersistenceId0], [e0].[AttributeKey], [e0].[AttributeValue], [e0].[ExecutionPointerId]
          FROM [wfc].[ExecutionPointer] AS [e]
          LEFT JOIN [wfc].[ExtensionAttribute] AS [e0] ON [e].[PersistenceId] = [e0].[ExecutionPointerId]
      ) AS [t0] ON [t].[PersistenceId] = [t0].[WorkflowId]
      ORDER BY [t].[PersistenceId], [t0].[PersistenceId], [t0].[PersistenceId0]
info: Microsoft.EntityFrameworkCore.Database.Command[20101]
      Executed DbCommand (15ms) [Parameters=[@__uid_0='?' (DbType = Guid)], CommandType='Text', CommandTimeout='30']
      SELECT [t].[PersistenceId], [t].[CompleteTime], [t].[CreateTime], [t].[Data], [t].[Description], [t].[InstanceId], [t].[NextExecution], [t].[Reference], [t].[Status], [t].[Version], [t].[WorkflowDefinitionId], [t0].[PersistenceId], [t0].[Active], [t0].[Children], [t0].[ContextItem], [t0].[EndTime], [t0].[EventData], [t0].[EventKey], [t0].[EventName], [t0].[EventPublished], [t0].[Id], [t0].[Outcome], [t0].[PersistenceData], [t0].[PredecessorId], [t0].[RetryCount], [t0].[Scope], [t0].[SleepUntil], [t0].[StartTime], [t0].[Status], [t0].[StepId], [t0].[StepName], [t0].[WorkflowId], [t0].[PersistenceId0], [t0].[AttributeKey], [t0].[AttributeValue], [t0].[ExecutionPointerId]
      FROM (
          SELECT TOP(1) [w].[PersistenceId], [w].[CompleteTime], [w].[CreateTime], [w].[Data], [w].[Description], [w].[InstanceId], [w].[NextExecution], [w].[Reference], [w].[Status], [w].[Version], [w].[WorkflowDefinitionId]
          FROM [wfc].[Workflow] AS [w]
          WHERE [w].[InstanceId] = @__uid_0
      ) AS [t]
      LEFT JOIN (
          SELECT [e].[PersistenceId], [e].[Active], [e].[Children], [e].[ContextItem], [e].[EndTime], [e].[EventData], [e].[EventKey], [e].[EventName], [e].[EventPublished], [e].[Id], [e].[Outcome], [e].[PersistenceData], [e].[PredecessorId], [e].[RetryCount], [e].[Scope], [e].[SleepUntil], [e].[StartTime], [e].[Status], [e].[StepId], [e].[StepName], [e].[WorkflowId], [e0].[PersistenceId] AS [PersistenceId0], [e0].[AttributeKey], [e0].[AttributeValue], [e0].[ExecutionPointerId]
          FROM [wfc].[ExecutionPointer] AS [e]
          LEFT JOIN [wfc].[ExtensionAttribute] AS [e0] ON [e].[PersistenceId] = [e0].[ExecutionPointerId]
      ) AS [t0] ON [t].[PersistenceId] = [t0].[WorkflowId]
      ORDER BY [t].[PersistenceId], [t0].[PersistenceId], [t0].[PersistenceId0]
[11:02:34.310] dbug: WorkflowCore.Services.WorkflowExecutor[0]
      Starting step (null) on workflow c3ec1e32-c549-490b-81d2-94f2250781bf
info: Microsoft.EntityFrameworkCore.Infrastructure[10403]
      Entity Framework Core 5.0.1 initialized 'SqlServerContext' using provider 'Microsoft.EntityFrameworkCore.SqlServer' with options: NoTracking
info: Microsoft.EntityFrameworkCore.Database.Command[20101]
      Executed DbCommand (12ms) [Parameters=[@__uid_0='?' (DbType = Guid)], CommandType='Text', CommandTimeout='30']
      SELECT [t].[PersistenceId], [t].[CompleteTime], [t].[CreateTime], [t].[Data], [t].[Description], [t].[InstanceId], [t].[NextExecution], [t].[Reference], [t].[Status], [t].[Version], [t].[WorkflowDefinitionId], [t0].[PersistenceId], [t0].[Active], [t0].[Children], [t0].[ContextItem], [t0].[EndTime], [t0].[EventData], [t0].[EventKey], [t0].[EventName], [t0].[EventPublished], [t0].[Id], [t0].[Outcome], [t0].[PersistenceData], [t0].[PredecessorId], [t0].[RetryCount], [t0].[Scope], [t0].[SleepUntil], [t0].[StartTime], [t0].[Status], [t0].[StepId], [t0].[StepName], [t0].[WorkflowId], [t0].[PersistenceId0], [t0].[AttributeKey], [t0].[AttributeValue], [t0].[ExecutionPointerId]
      FROM (
          SELECT TOP(1) [w].[PersistenceId], [w].[CompleteTime], [w].[CreateTime], [w].[Data], [w].[Description], [w].[InstanceId], [w].[NextExecution], [w].[Reference], [w].[Status], [w].[Version], [w].[WorkflowDefinitionId]
          FROM [wfc].[Workflow] AS [w]
          WHERE [w].[InstanceId] = @__uid_0
      ) AS [t]
      LEFT JOIN (
          SELECT [e].[PersistenceId], [e].[Active], [e].[Children], [e].[ContextItem], [e].[EndTime], [e].[EventData], [e].[EventKey], [e].[EventName], [e].[EventPublished], [e].[Id], [e].[Outcome], [e].[PersistenceData], [e].[PredecessorId], [e].[RetryCount], [e].[Scope], [e].[SleepUntil], [e].[StartTime], [e].[Status], [e].[StepId], [e].[StepName], [e].[WorkflowId], [e0].[PersistenceId] AS [PersistenceId0], [e0].[AttributeKey], [e0].[AttributeValue], [e0].[ExecutionPointerId]
          FROM [wfc].[ExecutionPointer] AS [e]
          LEFT JOIN [wfc].[ExtensionAttribute] AS [e0] ON [e].[PersistenceId] = [e0].[ExecutionPointerId]
      ) AS [t0] ON [t].[PersistenceId] = [t0].[WorkflowId]
      ORDER BY [t].[PersistenceId], [t0].[PersistenceId], [t0].[PersistenceId0]
info: Microsoft.EntityFrameworkCore.Database.Command[20101]
      Executed DbCommand (10ms) [Parameters=[@p2='?' (DbType = Int64), @p0='?' (Size = -1), @p1='?' (Size = 4000)], CommandType='Text', CommandTimeout='30']
      SET NOCOUNT ON;
      UPDATE [wfc].[ExecutionPointer] SET [Children] = @p0, [PersistenceData] = @p1
      WHERE [PersistenceId] = @p2;
      SELECT @@ROWCOUNT;
info: Microsoft.EntityFrameworkCore.Database.Command[20101]
      Executed DbCommand (5ms) [Parameters=[@p0='?' (DbType = Boolean), @p1='?' (Size = 4000), @p2='?' (Size = 4000), @p3='?' (DbType = DateTime2), @p4='?' (Size = 4000), @p5='?' (Size = 100), @p6='?' (Size = 100), @p7='?' (DbType = Boolean), @p8='?' (Size = 50), @p9='?' (Size = 4000), @p10='?' (Size = 4000), @p11='?' (Size = 100), @p12='?' (DbType = Int32), @p13='?' (Size = 4000), @p14='?' (DbType = DateTime2), @p15='?' (DbType = DateTime2), @p16='?' (DbType = Int32), @p17='?' (DbType = Int32), @p18='?' (Size = 100), @p19='?' (DbType = Int64)], CommandType='Text', CommandTimeout='30']
      SET NOCOUNT ON;
      INSERT INTO [wfc].[ExecutionPointer] ([Active], [Children], [ContextItem], [EndTime], [EventData], [EventKey], [EventName], [EventPublished], [Id], [Outcome], [PersistenceData], [PredecessorId], [RetryCount], [Scope], [SleepUntil], [StartTime], [Status], [StepId], [StepName], [WorkflowId])
      VALUES (@p0, @p1, @p2, @p3, @p4, @p5, @p6, @p7, @p8, @p9, @p10, @p11, @p12, @p13, @p14, @p15, @p16, @p17, @p18, @p19);
      SELECT [PersistenceId]
      FROM [wfc].[ExecutionPointer]
      WHERE @@ROWCOUNT = 1 AND [PersistenceId] = scope_identity();

  PersistWorkflow >>> p1=0, p2=123, p3=21, p4=41

dbug: WorkflowCore.Services.BackgroundTasks.WorkflowConsumer[0]
      ProcessItem >>> t1=219, t2=1, t3=186, t4=0, t5=0

In ProcessItem, t1 is the time to load the workflow instance from the database (219 ms in the run above). It takes this long because there are so many execution pointers.

t3 is the time to persist the workflow (186 ms). Its breakdown is in the PersistWorkflow output p1-p4: p1 is creating the DbContext (0 ms), p2 is the query that reloads the workflow from the database (123 ms), p3 is ToPersistable() (21 ms), and p4 is SaveChangesAsync() (41 ms).
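The hand-managed t1..t5 and p1..p4 variables above can be collapsed into one helper that records each phase by name. This is a hypothetical sketch: `TimeAsync` and `phases` are not part of WorkflowCore, and the stand-in phases below only simulate the load and persist calls.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

// Named phase durations in milliseconds, replacing the t1..t5 / p1..p4 locals.
var phases = new Dictionary<string, long>();

// Hypothetical helper: runs one phase, records its elapsed time under `name`, returns its result.
async Task<T> TimeAsync<T>(string name, Func<Task<T>> phase)
{
    var watch = Stopwatch.StartNew();
    try
    {
        return await phase();
    }
    finally
    {
        // Recorded even if the phase throws, like the finally blocks in ProcessItem.
        phases[name] = watch.ElapsedMilliseconds;
    }
}

// Stand-ins for GetWorkflowInstance and PersistWorkflow; real code would pass the actual calls.
var pointerCount = await TimeAsync("load", async () => { await Task.Delay(20); return 1595; });
var persisted = await TimeAsync("persist", async () => { await Task.Delay(10); return true; });

foreach (var (name, ms) in phases)
    Console.WriteLine($"{name}={ms}ms");
```

Because the timing lives in a `finally`, a phase that throws still shows up in the output, which matters here since `PersistWorkflow` runs in a `finally` block itself.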

There are 1595 execution pointers in the database, and the Children value is very long (a string of 29489 characters).
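These two numbers are consistent with each other under two assumptions that the thread does not confirm: that Children stores each child pointer id as a 36-character GUID string followed by a one-character separator, and that every While iteration creates one DoSomething pointer (a child of the While pointer) and one IncrementStep pointer.

```math
\frac{29489}{36 + 1} = 797 \ \text{children}, \qquad 797 + 796 + 2 = 1595 \ \text{pointers}
```

That reading gives 797 DoSomething pointers, 796 IncrementStep pointers (the latest iteration's IncrementStep not yet created), plus the SayHello and While pointers. If it holds, both the pointer table and the While pointer's Children value grow linearly with the number of iterations, and the whole Children string is rewritten when an iteration starts, as in the UPDATE in the log above.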


Is there a way to optimize this?

Thanks!

dthemg commented 2 years ago

Hi! My team and I are experiencing a similar issue: execution becomes slower and slower as the number of execution pointers increases. In my application, each evaluation (the x-axis of the attached chart, not reproduced here) adds another 10 or so execution pointers.

As the chart shows, the application slows down steadily as the number of execution pointers increases.
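A rough cost model for this pattern. Assume (this is not measured in the thread) that loading and persisting a workflow at step $k$ costs time proportional to the number of stored pointers $n_k$, with $n_0$ initial pointers, $p$ new pointers per evaluation (about 10 here), and unknown constants $a$ and $b$:

```math
n_k \approx n_0 + p\,k, \qquad T_k \approx a + b\,n_k, \qquad \sum_{k=1}^{K} T_k \approx (a + b\,n_0)\,K + b\,p\,\frac{K(K+1)}{2}
```

Under this model the time per step grows linearly with $k$ and the total time for $K$ steps grows quadratically. Keeping $n_k$ bounded, for example by not carrying finished pointers forward, would make $T_k$ roughly constant.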

dthemg commented 2 years ago

Update: I changed my persistence provider (the MongoDB one) to remove pointers with status Cancelled or Complete before saving. After this change, continuing the test above showed better behavior (chart not reproduced here).

Changes:

public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default)
{
    // Start of change: drop Cancelled and Complete pointers before saving.
    // ToList() materializes the matches before the collection is modified.
    var cancelledPointers = workflow.ExecutionPointers.FindByStatus(PointerStatus.Cancelled).ToList();
    var completedPointers = workflow.ExecutionPointers.FindByStatus(PointerStatus.Complete).ToList();
    foreach (var pointer in cancelledPointers)
        workflow.ExecutionPointers.Remove(pointer);
    foreach (var pointer in completedPointers)
        workflow.ExecutionPointers.Remove(pointer);
    // End of change

    await WorkflowInstances.ReplaceOneAsync(x => x.Id == workflow.Id, workflow, cancellationToken: cancellationToken);
}
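A toy model of why dropping terminal pointers helps. It does not use WorkflowCore types: pointers are hypothetical `(Id, Status)` tuples, and each loop iteration is assumed to finish the running pointers and start two new ones, roughly like the two-step While body in sample 10. It only shows that the set of pointers loaded and persisted per step stays bounded once terminal pointers are dropped; it says nothing about whether dropping them is safe for parallel branches.

```csharp
using System;
using System.Collections.Generic;

// Returns how many pointers would be loaded and persisted on the next step
// after `iterations` loop iterations, with or without pruning terminal pointers.
static int PointersAfter(int iterations, bool prune)
{
    var pointers = new List<(int Id, string Status)>();
    var nextId = 0;

    for (var i = 0; i < iterations; i++)
    {
        // Finish the previous iteration's pointers.
        for (var j = 0; j < pointers.Count; j++)
            if (pointers[j].Status == "Running")
                pointers[j] = (pointers[j].Id, "Complete");

        // Start this iteration's two steps.
        pointers.Add((nextId++, "Running"));
        pointers.Add((nextId++, "Running"));

        // The change above: drop Complete and Cancelled pointers before persisting.
        if (prune)
            pointers.RemoveAll(p => p.Status is "Complete" or "Cancelled");
    }

    return pointers.Count;
}

Console.WriteLine($"no pruning:   {PointersAfter(1500, prune: false)}");
Console.WriteLine($"with pruning: {PointersAfter(1500, prune: true)}");
```

Without pruning the count grows by two per iteration (3000 after 1500 iterations); with pruning it stays at 2, the pointers of the iteration currently running.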
vladimir-kovalyuk commented 2 years ago

@dthemg This issue is well known; I commented on it in a couple of earlier discussions. Loading only the active execution pointers (EPs) works around the problem for iterative loops, but what about parallel loops?

LingDian2019 commented 4 months ago

I have also run into this problem: transitions between steps (nodes) get slower and slower, and after restarting the program it runs normally again. How can I solve this?

I stress-tested the same workflow as follows.

The workflow has three steps; the second one waits 3 seconds and then throws an exception.

public class ConveyorBeginJob : WorkflowStepBody
{
    public override async Task<ExecutionResult> RunAsync(IStepExecutionContext context)
    {
        PassingData.ExecutionStartTime = DateTime.Now;
        return await Task.FromResult(ExecutionResult.Next());
    }
}
public class ThrowExceptionJobV1 : WorkflowStepBody
{
    public override async Task<ExecutionResult> RunAsync(IStepExecutionContext context)
    {
        // Simulate 3 seconds of work, then fail.
        await Task.Delay(TimeSpan.FromSeconds(3));
        throw new Exception("My Exception");
    }
}
public class ConveyorEndJob : WorkflowStepBody
{
     public override async Task<ExecutionResult> RunAsync(IStepExecutionContext context)
     {
         PassingData.ExecutionEndTime = DateTime.Now;
         return await Task.FromResult(ExecutionResult.Next());
     }
}
public class WorkflowErrorCommonHandler : IWorkflowErrorHandler
{
    public WorkflowErrorHandling Type => WorkflowErrorHandling.Terminate;

    public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, Exception exception, Queue<ExecutionPointer> bubbleUpQueue)
    {
        try
        {
            var workflowStepPassingData = (WorkflowStepPassingData)workflow.Data;
            workflowStepPassingData.EndTime = DateTime.Now;

            foreach (var stepItem in workflow.ExecutionPointers)
            {
                if (!string.IsNullOrWhiteSpace(stepItem.StepName) && stepItem.StartTime != null)
                {
                    long stepTotalMilliseconds = -1;
                    if (stepItem.EndTime != null)
                    {
                        stepTotalMilliseconds = stepItem.EndTime.ToTotalMilliseconds(stepItem.StartTime);
                    }

                    var jobElapsedTimeName = $"({stepItem.StepName})-({stepItem.Status})-({stepItem.StartTime?.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss.fff")})-({stepItem.EndTime?.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss.fff")})";
                    workflowStepPassingData.JobElapsedTime.TryAdd(jobElapsedTimeName, stepTotalMilliseconds);
                }
            }

          //Logging workflow

        }
        catch (Exception ex)
        {
            _log4NetService.WriteLog(LogNameKey.SystemError, ex);
        }
    }
}

With 500 concurrent workflows, execution takes a long time. My conclusions:

1. Transitions between steps take a long time.
2. It takes a long time before IWorkflowErrorHandler is invoked.

{
  "id": "7199437576333133140",
  "createTime": "2024-05-23 15:54:37.279",
  "executionStartTime": "2024-05-23 15:54:44.796",
  "executionEndTime": null,
  "endTime": "2024-05-23 15:55:36.984",
  "jobElapsedTime": {
    "(ConveyorBeginJob)-(Complete)-(2024-05-23 15:54:44.796)-(2024-05-23 15:54:44.796)": "0",
    "(ThrowExceptionJobV1)-(Failed)-(2024-05-23 15:54:47.174)-()": "-1"
  },
  "condition": {},
  "data": {}
}
{
  "id": "7199459854026519216",
  "equipmentCode": "1112",
  "createTime": "2024-05-23 17:23:08.695",
  "executionStartTime": "2024-05-23 17:23:41.791",
  "executionEndTime": null,
  "endTime": "2024-05-23 17:26:25.098",
  "jobElapsedTime": {
    "(ConveyorBeginJob)-(Complete)-(2024-05-23 17:23:41.791)-(2024-05-23 17:23:41.791)": "0",
    "(ThrowExceptionJobV1)-(Failed)-(2024-05-23 17:24:37.686)-()": "-1"
  },
  "condition": {},
  "data": {}
}
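The delays in the two records can be broken down from their timestamps. This sketch only does the subtraction; `Gap`, `Breakdown` and `Print` are illustrative helpers, and the labels assume executionStartTime is when ConveyorBeginJob ran and endTime is when the error handler ran, as in the handler code above.

```csharp
using System;
using System.Globalization;

// Seconds between two timestamps in the "yyyy-MM-dd HH:mm:ss.fff" format used by the records.
static double Gap(string startText, string endText)
{
    const string format = "yyyy-MM-dd HH:mm:ss.fff";
    var start = DateTime.ParseExact(startText, format, CultureInfo.InvariantCulture);
    var end = DateTime.ParseExact(endText, format, CultureInfo.InvariantCulture);
    return (end - start).TotalSeconds;
}

// Splits one record into: createTime -> executionStartTime (wait before the first step),
// executionStartTime -> ThrowExceptionJobV1 start (ConveyorBeginJob took 0 ms, so this is the transition),
// ThrowExceptionJobV1 start -> endTime (stamped by the error handler).
static (double Wait, double Transition, double ToHandler) Breakdown(
    string create, string executionStart, string throwStart, string end) =>
    (Gap(create, executionStart), Gap(executionStart, throwStart), Gap(throwStart, end));

static void Print(string id, (double Wait, double Transition, double ToHandler) b) =>
    Console.WriteLine(FormattableString.Invariant(
        $"{id}: wait {b.Wait:F3}s, transition {b.Transition:F3}s, to handler {b.ToHandler:F3}s"));

var first = Breakdown("2024-05-23 15:54:37.279", "2024-05-23 15:54:44.796",
                      "2024-05-23 15:54:47.174", "2024-05-23 15:55:36.984");
var second = Breakdown("2024-05-23 17:23:08.695", "2024-05-23 17:23:41.791",
                       "2024-05-23 17:24:37.686", "2024-05-23 17:26:25.098");

Print("7199437576333133140", first);
Print("7199459854026519216", second);
```

For the second record this gives about 33.1 s before the first step ran, 55.9 s between ConveyorBeginJob finishing and ThrowExceptionJobV1 starting, and 107.4 s between ThrowExceptionJobV1 starting and the error handler running, of which the step's own Task.Delay accounts for only 3 s.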
q913777031 commented 4 months ago

I don't need persistence. When using the in-memory provider, can I clear out workflows in the Terminated state to prevent memory leaks and improve performance?