serverlessworkflow / synapse

Serverless Workflow Management System (WFMS)
https://serverlessworkflow.io
Apache License 2.0

CloudEventBus circuit breaker stuck in a loop: The circuit is now open and is not allowing calls. #75

Closed: JBBianchi closed this issue 5 months ago.

JBBianchi commented 2 years ago

What happened: The following error keeps appearing in the server logs:

System.Net.Http.HttpClient.CloudEventBus.LogicalHandler: Information: Start processing HTTP request POST https://en37uhd2he6t4.x.pipedream.net/
System.Net.Http.HttpClient.CloudEventBus.ClientHandler: Information: Sending HTTP request POST https://en37uhd2he6t4.x.pipedream.net/
System.Net.Http.HttpClient.CloudEventBus.ClientHandler: Information: Received HTTP response headers after 722.1693ms - 413
Neuroglia.Eventing.Services.CloudEventBus: Error: An error occured while posting a cloud events to the broker: Polly.CircuitBreaker.BrokenCircuitException`1[System.Net.Http.HttpResponseMessage]: The circuit is now open and is not allowing calls.
   at Polly.CircuitBreaker.CircuitStateController`1.OnActionPreExecute()
   at Polly.CircuitBreaker.AsyncCircuitBreakerEngine.ImplementationAsync[TResult](Func`3 action, Context context, CancellationToken cancellationToken, Boolean continueOnCapturedContext, ExceptionPredicates shouldHandleExceptionPredicates, ResultPredicates`1 shouldHandleResultPredicates, ICircuitController`1 breakerController)
   at Polly.AsyncPolicy`1.ExecuteAsync(Func`3 action, Context context, CancellationToken cancellationToken, Boolean continueOnCapturedContext)
   at Polly.Wrap.AsyncPolicyWrapEngine.<>c__DisplayClass0_0`1.<<ImplementationAsync>b__0>d.MoveNext()
--- End of stack trace from previous location ---
   at Polly.Retry.AsyncRetryEngine.ImplementationAsync[TResult](Func`3 action, Context context, CancellationToken cancellationToken, ExceptionPredicates shouldRetryExceptionPredicates, ResultPredicates`1 shouldRetryResultPredicates, Func`5 onRetryAsync, Int32 permittedRetryCount, IEnumerable`1 sleepDurationsEnumerable, Func`4 sleepDurationProvider, Boolean continueOnCapturedContext)
   at Polly.AsyncPolicy`1.ExecuteAsync(Func`3 action, Context context, CancellationToken cancellationToken, Boolean continueOnCapturedContext)
   at Polly.Wrap.AsyncPolicyWrapEngine.ImplementationAsync[TResult](Func`3 func, Context context, CancellationToken cancellationToken, Boolean continueOnCapturedContext, IAsyncPolicy`1 outerPolicy, IAsyncPolicy`1 innerPolicy)
   at Polly.AsyncPolicy`1.ExecuteAsync(Func`3 action, Context context, CancellationToken cancellationToken, Boolean continueOnCapturedContext)
   at Microsoft.Extensions.Http.PolicyHttpMessageHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
   at Microsoft.Extensions.Http.Logging.LoggingScopeHttpMessageHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
   at System.Net.Http.HttpClient.<SendAsync>g__Core|83_0(HttpRequestMessage request, HttpCompletionOption completionOption, CancellationTokenSource cts, Boolean disposeCts, CancellationTokenSource pendingRequestsCts, CancellationToken originalCancellationToken)
   at Neuroglia.Eventing.Services.CloudEventBus.<>c__DisplayClass31_0.<<DequeueAndPublishPendingEventsAsync>b__1>d.MoveNext()

(Possibly because the output payload of a workflow, state, or action was too big? The broker answered with HTTP 413, Payload Too Large.)

What you expected to happen: No error or, at worst, a limited number of retries?

How to reproduce it: Not sure, try:

Environment: Win 10 x64 - self hosted
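The loop in the logs above can be illustrated with a minimal circuit-breaker sketch in Python. It is not Polly's implementation or Synapse's actual configuration: the failure threshold, the break duration, the `ManualClock` helper and the rule that any status >= 400 counts as a failure are all assumptions. Because a 413 is not transient, every half-open trial call fails again and reopens the circuit, so the same pending event cycles between "413" and "circuit open" indefinitely.

```python
import time


class BrokenCircuitError(Exception):
    """Raised while the circuit is open: the call is rejected without being attempted."""


class CircuitBreaker:
    """Minimal consecutive-failure circuit breaker (closed -> open -> half-open)."""

    def __init__(self, failure_threshold, break_duration, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.break_duration = break_duration  # seconds the circuit stays open
        self.clock = clock
        self.consecutive_failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, send):
        """Invoke send() -> HTTP status code through the breaker."""
        if self.opened_at is not None and self.clock() - self.opened_at < self.break_duration:
            # Open: fail fast, mirroring "The circuit is now open and is not allowing calls."
            raise BrokenCircuitError("The circuit is now open and is not allowing calls.")
        # Closed, or half-open once the break duration has elapsed: attempt the call.
        status = send()
        if status >= 400:  # assumption: every 4xx/5xx response counts as a failure
            self.consecutive_failures += 1
            half_open = self.opened_at is not None
            if half_open or self.consecutive_failures >= self.failure_threshold:
                self.opened_at = self.clock()  # (re)open the circuit
        else:
            self.consecutive_failures = 0
            self.opened_at = None  # a success closes the circuit
        return status


class ManualClock:
    """Deterministic clock for the illustration; advance .now by hand."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


def replay(breaker, clock, send, attempts, interval):
    """Retry the same pending event every `interval` seconds; record each outcome."""
    outcomes = []
    for _ in range(attempts):
        try:
            outcomes.append(breaker.call(send))
        except BrokenCircuitError:
            outcomes.append("open")
        clock.now += interval
    return outcomes
```

With a threshold of 2 and a 30-second break, retrying a permanently rejected event every 10 seconds yields 413, 413, open, open, 413, open, open, 413, and so on; only a successful response ever closes the circuit.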

cdavernas commented 2 years ago

Unfortunately, the error is due to a payload size restriction on Pipedream. What do you propose we do to avoid this? A dead-letter queue? Proceeding on specific failures? Or just letting the pipeline break? I could not come up with a solution that satisfied me more than waiting to reestablish the connection with the intended broker, in a way that is also resilient to restarts.

JBBianchi commented 2 years ago

Not sure. I guess the dead-letter queue might be an option. Anything that doesn't get stuck in a loop forever is better than the current behavior, I think.

cdavernas commented 2 years ago

I tend to disagree. My main focus was to ensure reliable and ordered delivery of outbound cloud events. I personally don't see the harm in being stuck in the loop (it's what it's intended for), even though the queue might quickly become absolutely huge in case of delivery failures.
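The ordered-delivery design described here can be sketched as a queue whose head is removed only once the broker accepts it. This is a hypothetical, in-memory Python illustration: `OrderedOutbox` and the `send` callable are assumptions, not Synapse's `CloudEventBus` API, and a real outbox would also have to persist pending events to survive restarts. It makes the trade-off visible: order is never violated, but one permanently rejected event blocks everything queued behind it.

```python
from collections import deque


class OrderedOutbox:
    """Strictly ordered, at-least-once delivery: an event leaves the queue only
    after the broker accepts it, so later events can never overtake it."""

    def __init__(self, send):
        self.send = send  # hypothetical callable: event -> HTTP status code
        self.pending = deque()

    def enqueue(self, event):
        self.pending.append(event)

    def flush(self):
        """Deliver pending events in order, stopping at the first failure."""
        delivered = []
        while self.pending:
            if self.send(self.pending[0]) >= 400:
                break  # head-of-line blocking: keep the event and retry on the next flush
            delivered.append(self.pending.popleft())
        return delivered
```

With a broker that rejects any event over some size limit with a 413, enqueuing a small event, an oversized one and another small one delivers only the first; every later flush delivers nothing while the queue keeps growing.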

@tsurdilo @antmendoza Do you guys have any idea regarding that issue?

antmendoza commented 2 years ago

No idea, @cdavernas, sorry. But the error is weird: is the payload that big?

I have not been able to reproduce the error in my environment; I think it fails earlier:

[05/12/2022 10:10:59] info: Microsoft.AspNetCore.Routing.EndpointMiddleware[1]
      Executed endpoint 'gRPC - /Synapse.Apis.Runtime.Grpc.SynapseGrpcRuntimeApi/Fault'
[05/12/2022 10:10:59] info: Microsoft.AspNetCore.Hosting.Diagnostics[2]
      Request finished HTTP/2 POST http://synapse:41387/Synapse.Apis.Runtime.Grpc.SynapseGrpcRuntimeApi/Fault application/grpc - - 200 - application/grpc 40.9727ms
[05/12/2022 10:10:59] fail: Grpc.AspNetCore.Server.ServerCallHandler[6]
      Error when executing service method 'Connect'.
      System.OperationCanceledException: The operation was canceled.
         at System.Threading.Channels.AsyncOperation`1.GetResult(Int16 token)
         at System.Threading.Channels.ChannelReader`1.ReadAllAsync(CancellationToken cancellationToken)+MoveNext()
         at System.Threading.Channels.ChannelReader`1.ReadAllAsync(CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
         at Synapse.Apis.Runtime.Grpc.SynapseGrpcRuntimeApi.Connect(String runtimeId, CallContext context)+MoveNext() in /src/src/apis/runtime/Synapse.Apis.Runtime.Grpc/Services/SynapseGrpcRuntimeApi.cs:line 75
         at Synapse.Apis.Runtime.Grpc.SynapseGrpcRuntimeApi.Connect(String runtimeId, CallContext context)+MoveNext() in /src/src/apis/runtime/Synapse.Apis.Runtime.Grpc/Services/SynapseGrpcRuntimeApi.cs:line 75
         at Synapse.Apis.Runtime.Grpc.SynapseGrpcRuntimeApi.Connect(String runtimeId, CallContext context)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
         at ProtoBuf.Grpc.Internal.Reshape.WriteTo[T](IAsyncEnumerable`1 reader, IServerStreamWriter`1 writer, CancellationToken cancellationToken) in /_/src/protobuf-net.Grpc/Internal/Reshape.cs:line 134
         at ProtoBuf.Grpc.Internal.Reshape.WriteTo[T](IAsyncEnumerable`1 reader, IServerStreamWriter`1 writer, CancellationToken cancellationToken) in /_/src/protobuf-net.Grpc/Internal/Reshape.cs:line 134
         at Grpc.Shared.Server.ServerStreamingServerMethodInvoker`3.Invoke(HttpContext httpContext, ServerCallContext serverCallContext, TRequest request, IServerStreamWriter`1 streamWriter)
         at Grpc.Shared.Server.ServerStreamingServerMethodInvoker`3.Invoke(HttpContext httpContext, ServerCallContext serverCallContext, TRequest request, IServerStreamWriter`1 streamWriter)
         at Grpc.AspNetCore.Server.Internal.CallHandlers.ServerStreamingServerCallHandler`3.HandleCallAsyncCore(HttpContext httpContext, HttpContextServerCallContext serverCallContext)
         at Grpc.AspNetCore.Server.Internal.CallHandlers.ServerCallHandlerBase`3.<HandleCallAsync>g__AwaitHandleCall|8_0(HttpContextServerCallContext serverCallContext, Method`2 method, Task handleCall)
[05/12/2022 10:10:59] info: Microsoft.AspNetCore.Routing.EndpointMiddleware[1]
      Executed endpoint 'gRPC - /Synapse.Apis.Runtime.Grpc.SynapseGrpcRuntimeApi/Connect'
[05/12/2022 10:10:59] info: Microsoft.AspNetCore.Hosting.Diagnostics[2]

Do I have to provide any input to the workflow?

antmendoza commented 2 years ago

Sorry, I was running the previous version of Synapse. Now the error is different; I'm pasting it here in case you find it useful.


[05/12/2022 10:25:27] info: Microsoft.AspNetCore.Routing.EndpointMiddleware[1]
      Executed endpoint 'Synapse.Apis.Management.Http.Controllers.V1WorkflowInstancesController.Start (Synapse.Apis.Management.Http)'
[05/12/2022 10:25:27] fail: Microsoft.AspNetCore.Diagnostics.ExceptionHandlerMiddleware[1]
      An unhandled exception has occurred while executing the request.
      System.Threading.Tasks.TaskCanceledException: A task was canceled.
         at Docker.DotNet.Models.StreamUtil.MonitorStreamForMessagesAsync[T](Task`1 streamTask, DockerClient client, CancellationToken cancellationToken, IProgress`1 progress)
         at Docker.DotNet.Models.StreamUtil.MonitorResponseForMessagesAsync[T](Task`1 responseTask, DockerClient client, CancellationToken cancel, IProgress`1 progress)
         at Synapse.Runtime.Services.DockerRuntimeHost.PullRuntimeExecutorImageAsync(CancellationToken cancellationToken) in /src/src/runtime/Synapse.Runtime.Docker/Services/DockerRuntimeHost.cs:line 160
         at Synapse.Runtime.Services.DockerRuntimeHost.StartAsync(V1WorkflowInstance workflowInstance, CancellationToken cancellationToken) in /src/src/runtime/Synapse.Runtime.Docker/Services/DockerRuntimeHost.cs:line 104
         at Synapse.Application.Commands.WorkflowInstances.V1StartWorkflowInstanceCommandHandler.HandleAsync(V1StartWorkflowInstanceCommand command, CancellationToken cancellationToken) in /src/src/core/Synapse.Application/Commands/WorkflowInstances/v1/V1StartWorkflowInstanceCommand.cs:line 92
         at Neuroglia.Mediation.FluentValidationMiddleware`2.HandleAsync(TRequest request, RequestHandlerDelegate`1 next, CancellationToken cancellationToken)
         at Neuroglia.Mediation.DomainExceptionHandlingMiddleware`2.HandleAsync(TRequest request, RequestHandlerDelegate`1 next, CancellationToken cancellationToken)
         at Neuroglia.Mediation.RequestPipeline`2.HandleAsync(IRequest`1 request, IServiceProvider serviceProvider, CancellationToken cancellationToken)
         at Neuroglia.Mediation.Mediator.ExecuteAsync[TResult](IRequest`1 request, CancellationToken cancellationToken)
         at Synapse.Apis.Management.Http.Controllers.V1WorkflowInstancesController.Start(String id, CancellationToken cancellationToken) in /src/src/apis/management/Synapse.Apis.Management.Http/Controllers/V1WorkflowInstancesController.cs:line 95
         at Microsoft.AspNetCore.Mvc.Infrastructure.ActionMethodExecutor.TaskOfIActionResultExecutor.Execute(IActionResultTypeMapper mapper, ObjectMethodExecutor executor, Object controller, Object[] arguments)
         at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeActionMethodAsync>g__Awaited|12_0(ControllerActionInvoker invoker, ValueTask`1 actionResultValueTask)
         at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeNextActionFilterAsync>g__Awaited|10_0(ControllerActionInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
         at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Rethrow(ActionExecutedContextSealed context)
         at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)
         at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeInnerFilterAsync>g__Awaited|13_0(ControllerActionInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
         at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeFilterPipelineAsync>g__Awaited|20_0(ResourceInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
         at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeAsync>g__Logged|17_1(ResourceInvoker invoker)
         at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeAsync>g__Logged|17_1(ResourceInvoker invoker)
         at Microsoft.AspNetCore.Routing.EndpointMiddleware.<Invoke>g__AwaitRequestTask|6_0(Endpoint endpoint, Task requestTask, ILogger logger)
         at Swashbuckle.AspNetCore.SwaggerUI.SwaggerUIMiddleware.Invoke(HttpContext httpContext)
         at Swashbuckle.AspNetCore.Swagger.SwaggerMiddleware.Invoke(HttpContext httpContext, ISwaggerProvider swaggerProvider)
         at Microsoft.AspNetCore.Authorization.AuthorizationMiddleware.Invoke(HttpContext context)
         at Microsoft.AspNetCore.Authentication.AuthenticationMiddleware.Invoke(HttpContext context)
         at Microsoft.AspNetCore.OData.Routing.ODataRouteDebugMiddleware.Invoke(HttpContext context)
         at Neuroglia.Eventing.CloudEventMiddleware.InvokeAsync(HttpContext context, CloudEventFormatter formatter, ISubject`1 stream)
         at Microsoft.AspNetCore.Diagnostics.ExceptionHandlerMiddleware.<Invoke>g__Awaited|6_0(ExceptionHandlerMiddleware middleware, HttpContext context, Task task)
[05/12/2022 10:25:27] info: Microsoft.AspNetCore.Routing.EndpointMiddleware[0]
      Executing endpoint 'Fallback {*path:nonfile}'
[05/12/2022 10:25:27] info: Microsoft.AspNetCore.Routing.EndpointMiddleware[1]
      Executed endpoint 'Fallback {*path:nonfile}'
[05/12/2022 10:25:27] fail: Microsoft.AspNetCore.Server.Kestrel[13]
      Connection id "0HMHK03SGS870", Request id "0HMHK03SGS870:00000008": An unhandled exception was thrown by the application.
      System.InvalidOperationException: The exception handler configured on ExceptionHandlerOptions produced a 404 status response. This InvalidOperationException containing the original exception was thrown since this is often due to a misconfigured ExceptionHandlingPath. If the exception handler is expected to return 404 status responses then set AllowStatusCode404Response to true.
       ---> System.Threading.Tasks.TaskCanceledException: A task was canceled.
         at Docker.DotNet.Models.StreamUtil.MonitorStreamForMessagesAsync[T](Task`1 streamTask, DockerClient client, CancellationToken cancellationToken, IProgress`1 progress)
         at Docker.DotNet.Models.StreamUtil.MonitorResponseForMessagesAsync[T](Task`1 responseTask, DockerClient client, CancellationToken cancel, IProgress`1 progress)
         at Synapse.Runtime.Services.DockerRuntimeHost.PullRuntimeExecutorImageAsync(CancellationToken cancellationToken) in /src/src/runtime/Synapse.Runtime.Docker/Services/DockerRuntimeHost.cs:line 160
         at Synapse.Runtime.Services.DockerRuntimeHost.StartAsync(V1WorkflowInstance workflowInstance, CancellationToken cancellationToken) in /src/src/runtime/Synapse.Runtime.Docker/Services/DockerRuntimeHost.cs:line 104
         at Synapse.Application.Commands.WorkflowInstances.V1StartWorkflowInstanceCommandHandler.HandleAsync(V1StartWorkflowInstanceCommand command, CancellationToken cancellationToken) in /src/src/core/Synapse.Application/Commands/WorkflowInstances/v1/V1StartWorkflowInstanceCommand.cs:line 92
         at Neuroglia.Mediation.FluentValidationMiddleware`2.HandleAsync(TRequest request, RequestHandlerDelegate`1 next, CancellationToken cancellationToken)
         at Neuroglia.Mediation.DomainExceptionHandlingMiddleware`2.HandleAsync(TRequest request, RequestHandlerDelegate`1 next, CancellationToken cancellationToken)
         at Neuroglia.Mediation.RequestPipeline`2.HandleAsync(IRequest`1 request, IServiceProvider serviceProvider, CancellationToken cancellationToken)
         at Neuroglia.Mediation.Mediator.ExecuteAsync[TResult](IRequest`1 request, CancellationToken cancellationToken)
         at Synapse.Apis.Management.Http.Controllers.V1WorkflowInstancesController.Start(String id, CancellationToken cancellationToken) in /src/src/apis/management/Synapse.Apis.Management.Http/Controllers/V1WorkflowInstancesController.cs:line 95
         at Microsoft.AspNetCore.Mvc.Infrastructure.ActionMethodExecutor.TaskOfIActionResultExecutor.Execute(IActionResultTypeMapper mapper, ObjectMethodExecutor executor, Object controller, Object[] arguments)
         at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeActionMethodAsync>g__Awaited|12_0(ControllerActionInvoker invoker, ValueTask`1 actionResultValueTask)
         at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeNextActionFilterAsync>g__Awaited|10_0(ControllerActionInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
         at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Rethrow(ActionExecutedContextSealed context)
         at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)
         at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeInnerFilterAsync>g__Awaited|13_0(ControllerActionInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
         at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeFilterPipelineAsync>g__Awaited|20_0(ResourceInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
         at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeAsync>g__Logged|17_1(ResourceInvoker invoker)
         at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeAsync>g__Logged|17_1(ResourceInvoker invoker)
         at Microsoft.AspNetCore.Routing.EndpointMiddleware.<Invoke>g__AwaitRequestTask|6_0(Endpoint endpoint, Task requestTask, ILogger logger)
         at Swashbuckle.AspNetCore.SwaggerUI.SwaggerUIMiddleware.Invoke(HttpContext httpContext)
         at Swashbuckle.AspNetCore.Swagger.SwaggerMiddleware.Invoke(HttpContext httpContext, ISwaggerProvider swaggerProvider)
         at Microsoft.AspNetCore.Authorization.AuthorizationMiddleware.Invoke(HttpContext context)
         at Microsoft.AspNetCore.Authentication.AuthenticationMiddleware.Invoke(HttpContext context)
         at Microsoft.AspNetCore.OData.Routing.ODataRouteDebugMiddleware.Invoke(HttpContext context)
         at Neuroglia.Eventing.CloudEventMiddleware.InvokeAsync(HttpContext context, CloudEventFormatter formatter, ISubject`1 stream)
         at Microsoft.AspNetCore.Diagnostics.ExceptionHandlerMiddleware.<Invoke>g__Awaited|6_0(ExceptionHandlerMiddleware middleware, HttpContext context, Task task)
         --- End of inner exception stack trace ---
         at Microsoft.AspNetCore.Diagnostics.ExceptionHandlerMiddleware.HandleException(HttpContext context, ExceptionDispatchInfo edi)
         at Microsoft.AspNetCore.Diagnostics.ExceptionHandlerMiddleware.<Invoke>g__Awaited|6_0(ExceptionHandlerMiddleware middleware, HttpContext context, Task task)
         at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.HttpProtocol.ProcessRequests[TContext](IHttpApplication`1 application)
[05/12/2022 10:25:27] info: Microsoft.AspNetCore.Hosting.Diagnostics[2]
      Request finished HTTP/1.1 PUT http://localhost:42286/api/v1/workflow-instances/byid/order-dog-apxh5qe1x02ury3lpafmiw/start - 0 - 0 - - 300816.5198ms

JBBianchi commented 2 years ago

@antmendoza You might need to create the user first on the Swagger Petstore demo API (https://petstore.swagger.io/#/user/createUser), then use a payload similar to this one to start an instance:

{
    "username": "test",
    "password": "test",
    "quantityToOrder": 1
}

cdavernas commented 2 years ago

@antmendoza In addition, I see that whenever a worker exits, even successfully, it does not seem to gracefully shut down the gRPC connection, which results in the first set of errors. The second set, though, is due to a bad routing configuration for the error endpoint.

cdavernas commented 2 years ago

Quoting @antmendoza: "no idea @cdavernas sorry. but the error is weird, that big is the payload?"

Yes. In some cases, like when taking the unfiltered result of the Swagger Petstore findPetByStatus action, the payload can be considerable (though it really shouldn't be). And, as a free service, Pipedream of course limits both the request rate and the payload size on its API.
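Since the broker rejects requests based on their size, one option is to measure the serialized event before posting it. A minimal Python sketch, assuming JSON-encoded events; the helper names are hypothetical, `max_bytes` would have to come from the broker's documented limit (not stated in this thread), and the exact request body also depends on how the event is encoded. The limit applies to bytes, not characters, so non-ASCII data weighs more than its length suggests.

```python
import json


def serialized_size(event):
    """Size in bytes of the compact, UTF-8 JSON encoding of an event."""
    body = json.dumps(event, separators=(",", ":"), ensure_ascii=False)
    return len(body.encode("utf-8"))


def exceeds_limit(event, max_bytes):
    """True if posting this event would exceed a broker's payload limit (hypothetical check)."""
    return serialized_size(event) > max_bytes
```

For example, `{"data": "ééé"}` encodes to 14 characters but 17 bytes, so it exceeds a 15-byte limit even though its character count does not.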

The question here is: do we want to enforce reliable, ordered delivery of cloud events to Synapse's consumers, or do we just park failed messages in some kind of external dead-letter queue?

I personally prefer the first option, as the dead-letter queue will grow continuously anyway if, when, and while delivery failures occur.
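For comparison, a middle ground between the two options is to keep ordered retries for transient failures while parking only the failures that retrying cannot fix, such as this 413. The Python sketch below is hypothetical: the `NON_RETRYABLE` set, the class and the `send` callable are illustrations, not Synapse's API, and do not necessarily reflect how the issue was eventually resolved. It also makes the cost explicit: a dead-lettered event is skipped, so consumers no longer receive every event in order.

```python
from collections import deque

# Hypothetical classification: statuses that retrying will not fix
# (e.g. 413 Payload Too Large). Anything else >= 400 is treated as transient.
NON_RETRYABLE = frozenset({400, 413, 422})


class OutboxWithDeadLetters:
    """Ordered delivery with retries, except for events the broker will never accept."""

    def __init__(self, send):
        self.send = send  # hypothetical callable: event -> HTTP status code
        self.pending = deque()
        self.dead_letters = []  # (event, status) pairs parked for manual inspection

    def enqueue(self, event):
        self.pending.append(event)

    def flush(self):
        delivered = []
        while self.pending:
            status = self.send(self.pending[0])
            if status < 400:
                delivered.append(self.pending.popleft())
            elif status in NON_RETRYABLE:
                # Retrying cannot succeed: park the event so it stops blocking the queue.
                self.dead_letters.append((self.pending.popleft(), status))
            else:
                break  # transient failure (5xx, 429, ...): keep order and retry later
        return delivered
```

With the same oversized event as before, the small events on either side of it are now delivered and the 413 ends up in `dead_letters`, while a 503 still halts the queue and preserves order until the broker recovers.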

cdavernas commented 5 months ago

Closed as fixed in #366