MassTransit / Automatonymous

A state machine library for .Net - 100% code - No doodleware
Apache License 2.0

Activity Exception runs infinite loop in RabbitMQ #75

Closed chaduvula9 closed 3 years ago

chaduvula9 commented 3 years ago

Hi,

I created a sample Prescription workflow. Part of the state machine implementation is shown below.

    During(Created, Valid,
        When(ValidatePrescriptionEvent)
            .Activity(x => x.OfType())
            .TransitionTo(Valid)
            .RespondAsync(context => context.Init(new
            {
                PrescriptionNumber = context.Instance.CorrelationId,
                CustomerNumber = context.Instance.CustomerNumber,
                State = context.Instance.CurrentState == Created.Name ? Valid.Name : context.Instance.CurrentState,
                Drugs = context.Instance.PresciptionDrugs
            }))
            .Catch(ex => ex
                .TransitionTo(Invalid)
                .RespondAsync(context => context.Init(new
                {
                    PrescriptionNumber = context.Instance.CorrelationId,
                    Message = context.Exception.Message
                }))),
        When(SetValidPrescriptionEvent)
            .TransitionTo(Valid));

I also created an activity, ValidatePrescriptionActivity, which implements Execute as follows:

    public async Task Execute(BehaviorContext<PrescriptionStatus, ValidatePrescription> context, Behavior<PrescriptionStatus, ValidatePrescription> next)
    {
        if (DateTime.Now.Minute % 2 == 0)
        {
            await next.Execute(context);
        }
        else
        {
            Console.WriteLine("Invalid Prescription");

            throw new InvalidOperationException();
        }
    }

The intention is to throw an exception when the current minute is odd. The state changes to Invalid as intended, but messages then loop indefinitely on RabbitMQ, and the Redis saga repository eventually fails with concurrency errors.
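To make the intended flow concrete, here is a minimal sketch of that branching with no MassTransit or Automatonymous dependency. Everything in it is a stand-in introduced for illustration: the `ValidatePrescriptionSketch` class, the `minute` parameter (replacing `DateTime.Now.Minute` so the result is repeatable), and the `next` delegate (replacing `next.Execute(context)`). It only models which state the saga is expected to end in; it does not reproduce the message loop or the Redis locking.

```csharp
using System;
using System.Threading.Tasks;

public static class ValidatePrescriptionSketch
{
    // Hypothetical stand-in for the activity's Execute method.
    // `minute` replaces DateTime.Now.Minute; `next` replaces next.Execute(context).
    public static async Task Execute(int minute, Func<Task> next)
    {
        if (minute % 2 == 0)
        {
            // Even minute: continue with the rest of the behavior.
            await next();
        }
        else
        {
            // Odd minute: fail, as the original activity does.
            Console.WriteLine("Invalid Prescription");
            throw new InvalidOperationException();
        }
    }

    // Returns the state name the workflow is expected to reach:
    // "Valid" when the behavior completes, "Invalid" when the exception is caught
    // (an assumption modelled on the TransitionTo calls in the state machine above).
    public static async Task<string> Run(int minute)
    {
        try
        {
            await Execute(minute, () => Task.CompletedTask);
            return "Valid";
        }
        catch (InvalidOperationException)
        {
            return "Invalid";
        }
    }
}
```

Calling `ValidatePrescriptionSketch.Run(10)` takes the even branch and `ValidatePrescriptionSketch.Run(11)` takes the throwing branch, which matches the part of the behavior that already works as intended.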

Below are my error messages coming from the consumer console:

    warn: MassTransit.ReceiveTransport[0]
          Retrying 00:00:01.7600000: Unable to lock saga: Prescription.Components.StateMachine.PrescriptionStatus(f6be58b0-13d0-4a80-a6aa-5562d6fa29ba_lock)
          MassTransit.MassTransitException: Unable to lock saga: Prescription.Components.StateMachine.PrescriptionStatus(f6be58b0-13d0-4a80-a6aa-5562d6fa29ba_lock)
             at MassTransit.RedisIntegration.Contexts.RedisDatabaseContext1.SagaLock.<Lock>g__LockAsync|6_0()
             at MassTransit.Policies.PipeRetryExtensions.Retry[T](IRetryPolicy retryPolicy, Func1 retryMethod, CancellationToken cancellationToken)
    dbug: MassTransit.ReceiveTransport[0]
          SAGA:Prescription.Components.StateMachine.PrescriptionStatus:f6be58b0-13d0-4a80-a6aa-5562d6fa29ba Used Sample.Contracts.Interfaces.Drugs.InvalidPrescription
    dbug: MassTransit.ReceiveTransport[0]
          SEND rabbitmq://localhost/Sample.Contracts.Interfaces.Drugs:InvalidPrescription 00010000-4100-0250-a8f5-08d8fe86e5b4 Sample.Contracts.Interfaces.Drugs.InvalidPrescription
    dbug: MassTransit.ReceiveTransport[0]
          SAGA:Prescription.Components.StateMachine.PrescriptionStatus:f6be58b0-13d0-4a80-a6aa-5562d6fa29ba Used Sample.Contracts.Interfaces.Drugs.InvalidPrescription
    warn: MassTransit.ReceiveTransport[0]
          Retrying 00:00:01.1430000: Unable to lock saga: Prescription.Components.StateMachine.PrescriptionStatus(f6be58b0-13d0-4a80-a6aa-5562d6fa29ba_lock)
          MassTransit.MassTransitException: Unable to lock saga: Prescription.Components.StateMachine.PrescriptionStatus(f6be58b0-13d0-4a80-a6aa-5562d6fa29ba_lock)
             at MassTransit.RedisIntegration.Contexts.RedisDatabaseContext1.SagaLock.<Lock>g__LockAsync|6_0()
             at MassTransit.Policies.PipeRetryExtensions.Retry[T](IRetryPolicy retryPolicy, Func1 retryMethod, CancellationToken cancellationToken)
    dbug: MassTransit.ReceiveTransport[0]
          SEND rabbitmq://localhost/MassTransit:Fault--Sample.Contracts.Interfaces.Drugs:InvalidPrescription-- 00010000-4100-0250-56e6-08d8fe86e535 MassTransit.Fault
    dbug: MassTransit.ReceiveTransport[0]
          SEND rabbitmq://localhost/MassTransit:Fault--Sample.Contracts.Interfaces.Drugs:InvalidPrescription-- 00010000-4100-0250-811a-08d8fe86e567 MassTransit.Fault
    dbug: MassTransit.ReceiveTransport[0]
          SEND rabbitmq://localhost/MassTransit:Fault--Sample.Contracts.Interfaces.Drugs:InvalidPrescription-- 00010000-4100-0250-5746-08d8fe86e535 MassTransit.Fault
    dbug: MassTransit.ReceiveTransport[0]
          SEND rabbitmq://localhost/MassTransit:Fault--Sample.Contracts.Interfaces.Drugs:InvalidPrescription-- 00010000-4100-0250-5396-08d8fe86e535 MassTransit.Fault
    dbug: MassTransit.ReceiveTransport[0]
          SEND rabbitmq://localhost/MassTransit:Fault--Sample.Contracts.Interfaces.Drugs:InvalidPrescription-- 00010000-4100-0250-56f9-08d8fe86e535 MassTransit.Fault
    dbug: MassTransit.ReceiveTransport[0]
          SEND rabbitmq://localhost/MassTransit:Fault--Sample.Contracts.Interfaces.Drugs:InvalidPrescription-- 00010000-4100-0250-8ec1-08d8fe86e538 MassTransit.Fault
    dbug: MassTransit.ReceiveTransport[0]
          SEND rabbitmq://localhost/MassTransit:Fault--Sample.Contracts.Interfaces.Drugs:InvalidPrescription-- 00010000-4100-0250-f364-08d8fe86e5a9 MassTransit.Fault
    dbug: MassTransit.ReceiveTransport[0]
          SEND rabbitmq://localhost/MassTransit:Fault--Sample.Contracts.Interfaces.Drugs:InvalidPrescription-- 00010000-4100-0250-4b57-08d8fe86e5d5 MassTransit.Fault
    dbug: MassTransit.ReceiveTransport[0]
          SEND rabbitmq://localhost/Sample.Contracts.Interfaces.Drugs:InvalidPrescription 00010000-4100-0250-3262-08d8fe86e5d7 Sample.Contracts.Interfaces.Drugs.InvalidPrescription
    dbug: MassTransit.ReceiveTransport[0]
          SAGA:Prescription.Components.StateMachine.PrescriptionStatus:f6be58b0-13d0-4a80-a6aa-5562d6fa29ba Used Sample.Contracts.Interfaces.Drugs.InvalidPrescription
    fail: MassTransit.ReceiveTransport[0]
          R-FAULT rabbitmq://localhost/prescription-status 00010000-4100-0250-43d6-08d8fe86e4eb Sample.Contracts.Interfaces.Drugs.InvalidPrescription Prescription.Components.StateMachine.PrescriptionStatus(00:00:00.6500925)
          MassTransit.SagaException: Prescription.Components.StateMachine.PrescriptionStatus(f6be58b0-13d0-4a80-a6aa-5562d6fa29ba) Saga exception: Saga update failed
           ---> MassTransit.RedisIntegration.RedisSagaConcurrencyException: Prescription.Components.StateMachine.PrescriptionStatus(f6be58b0-13d0-4a80-a6aa-5562d6fa29ba) Saga exception: Saga version conflict
             at MassTransit.RedisIntegration.Contexts.RedisDatabaseContext1.Update(SagaConsumeContext1 context)
             --- End of inner exception stack trace ---
             at MassTransit.RedisIntegration.Contexts.RedisDatabaseContext1.Update(SagaConsumeContext1 context)
             at MassTransit.RedisIntegration.Contexts.RedisDatabaseContext1.Update(SagaConsumeContext1 context)
             at MassTransit.Saga.SendSagaPipe2.Send(SagaRepositoryContext2 context)
             at MassTransit.Saga.SendSagaPipe2.Send(SagaRepositoryContext2 context)
             at MassTransit.RedisIntegration.Contexts.RedisSagaRepositoryContextFactory1.Send[T](ConsumeContext1 context, IPipe1 next)
             at MassTransit.RedisIntegration.Contexts.RedisSagaRepositoryContextFactory1.Send[T](ConsumeContext1 context, IPipe1 next)
             at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionSagaRepositoryContextFactory1.<>c__DisplayClass6_01.<gCreateScope|0>d.MoveNext()
             --- End of stack trace from previous location where exception was thrown ---
             at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext1 context, IPipe1 next)
    fail: MassTransit.ReceiveTransport[0]
          R-FAULT rabbitmq://localhost/prescription-status 00010000-4100-0250-5e39-08d8fe86e557 Sample.Contracts.Interfaces.Drugs.InvalidPrescription Prescription.Components.StateMachine.PrescriptionStatus(00:00:00.7106839)
          MassTransit.SagaException: Prescription.Components.StateMachine.PrescriptionStatus(f6be58b0-13d0-4a80-a6aa-5562d6fa29ba) Saga exception: Saga update failed
           ---> MassTransit.RedisIntegration.RedisSagaConcurrencyException: Prescription.Components.StateMachine.PrescriptionStatus(f6be58b0-13d0-4a80-a6aa-5562d6fa29ba) Saga exception: Saga version conflict
             at MassTransit.RedisIntegration.Contexts.RedisDatabaseContext1.Update(SagaConsumeContext1 context)
             --- End of inner exception stack trace ---
             at MassTransit.RedisIntegration.Contexts.RedisDatabaseContext1.Update(SagaConsumeContext1 context)
             at MassTransit.RedisIntegration.Contexts.RedisDatabaseContext1.Update(SagaConsumeContext1 context)
             at MassTransit.Saga.SendSagaPipe2.Send(SagaRepositoryContext2 context)
             at MassTransit.Saga.SendSagaPipe2.Send(SagaRepositoryContext2 context)
             at MassTransit.RedisIntegration.Contexts.RedisSagaRepositoryContextFactory1.Send[T](ConsumeContext1 context, IPipe1 next)
             at MassTransit.RedisIntegration.Contexts.RedisSagaRepositoryContextFactory1.Send[T](ConsumeContext1 context, IPipe1 next)
             at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionSagaRepositoryContextFactory1.<>cDisplayClass6_01.<<Send>g__CreateScope|0>d.MoveNext()
             --- End of stack trace from previous location where exception was thrown ---
             at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter2.GreenPipes.IFilter<MassTransit.ConsumeContext>.Send(ConsumeContext1 context, IPipe1 next)
    fail: MassTransit.ReceiveTransport[0]
          R-FAULT rabbitmq://localhost/prescription-status 00010000-4100-0250-0c36-08d8fe86e3cf Sample.Contracts.Interfaces.Drugs.InvalidPrescription Prescription.Components.StateMachine.PrescriptionStatus(00:00:00.8804586)
          MassTransit.SagaException: Prescription.Components.StateMachine.PrescriptionStatus(f6be58b0-13d0-4a80-a6aa-5562d6fa29ba) Saga exception: Saga update failed
           ---> MassTransit.RedisIntegration.RedisSagaConcurrencyException: Prescription.Components.StateMachine.PrescriptionStatus(f6be58b0-13d0-4a80-a6aa-5562d6fa29ba) Saga exception: Saga version conflict
             at MassTransit.RedisIntegration.Contexts.RedisDatabaseContext1.Update(SagaConsumeContext1 context)
             --- End of inner exception stack trace ---
             at MassTransit.RedisIntegration.Contexts.RedisDatabaseContext1.Update(SagaConsumeContext1 context)
             at MassTransit.RedisIntegration.Contexts.RedisDatabaseContext1.Update(SagaConsumeContext1 context)
             at MassTransit.Saga.SendSagaPipe2.Send(SagaRepositoryContext2 context)
             at MassTransit.Saga.SendSagaPipe2.Send(SagaRepositoryContext2 context)
             at MassTransit.RedisIntegration.Contexts.RedisSagaRepositoryContextFactory1.Send[T](ConsumeContext1 context, IPipe1 next)
             at MassTransit.RedisIntegration.Contexts.RedisSagaRepositoryContextFactory1.Send[T](ConsumeContext1 context, IPipe1 next)
             at MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionSagaRepositoryContextFactory1.<>c__DisplayClass6_01.<g__CreateScope|0>d.MoveNext()
             --- End of stack trace from previous location where exception was thrown ---
             at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext1 context, IPipe`1 next)

phatboyg commented 3 years ago

I hate to be annoying, but you need to create a discussion here. This isn't an issue with the framework, it's a question on usage.