dotnetcore / CAP

Distributed transaction solution for microservices based on eventual consistency; also an event bus with the Outbox pattern
http://cap.dotnetcore.xyz
MIT License

Failed event may be executed more than FailedRetryCount times when EnableSubscriberParallelExecute is true #1559

Closed: sampsonye closed this issue 2 months ago

sampsonye commented 2 months ago

For performance, we set EnableSubscriberParallelExecute = true and UseStorageLock = true, limited FailedRetryCount to 5, and run the app with 10 replicas. We found that some failed events are often executed more than 5 times (i.e. more than FailedRetryCount).

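We checked the source code, and the problem may be here: https://github.com/dotnetcore/CAP/blob/7b6b033873295f6ea04f5d5f0b0b4788d6146e5a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs#L175

When parallel execution is enabled, EnqueueToExecute only writes the message to a channel and returns. The storage lock is therefore released before the subscriber has actually run, so another replica can pick up the same failed message again before its retry count is updated.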

 public async ValueTask EnqueueToExecute(MediumMessage message, ConsumerExecutorDescriptor? descriptor = null)
 {
     try
     {
         if (_tasksCts!.IsCancellationRequested) return;

         if (_enableParallelExecute)
         {
             if (!_receivedChannel.Writer.TryWrite((message, descriptor))) // Only queues the message; the subscriber has not run yet, but the storage lock is released as soon as this returns
             {
                 while (await _receivedChannel.Writer.WaitToWriteAsync(_tasksCts!.Token).ConfigureAwait(false))
                     if (_receivedChannel.Writer.TryWrite((message, descriptor)))
                         return;
             }
         }
         else
         {
             await _executor.ExecuteAsync(message, descriptor, _tasksCts!.Token).ConfigureAwait(false);
         }
     }
     catch (OperationCanceledException)
     {
         //Ignore
     }
     catch (Exception e)
     {
         _logger.LogError(e, "An exception occurred when invoke subscriber. MessageId:{MessageId}", message.DbId);
     }
 }
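A minimal, single-process simulation of the interleaving described above (not CAP code): ten hypothetical "replicas" take the lock in turn, each sees the stored retry counter still below the limit, writes the message to a channel and releases the lock, and only afterwards does the channel consumer run the subscriber. The names `StoredMessage` and `RaceDemo`, and the `SemaphoreSlim` standing in for the storage lock, are assumptions for illustration; the sequential loop forces the worst-case ordering so the result is deterministic.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for a failed message row in storage (not CAP's MediumMessage).
class StoredMessage
{
    public int Retries; // persisted retry counter
}

static class RaceDemo
{
    const int FailedRetryCount = 5;

    // Simulates `replicas` instances each polling once for the same failed message,
    // using the "write to channel and return" behaviour of the parallel branch.
    // Returns the retry counter after all queued work has run.
    public static async Task<int> RunParallelPathAsync(int replicas, int startRetries)
    {
        var msg = new StoredMessage { Retries = startRetries };
        var storageLock = new SemaphoreSlim(1, 1); // stands in for the storage lock
        var channel = Channel.CreateUnbounded<StoredMessage>();

        for (int i = 0; i < replicas; i++)
        {
            await storageLock.WaitAsync();
            try
            {
                // The poller only picks messages that are still under the limit.
                if (msg.Retries < FailedRetryCount)
                    channel.Writer.TryWrite(msg); // returns at once; nothing executed yet
            }
            finally
            {
                storageLock.Release(); // released before the subscriber runs
            }
        }
        channel.Writer.Complete();

        // The channel consumer runs afterwards, outside the lock:
        // every queued copy is executed, and each failing run bumps the counter.
        await foreach (var m in channel.Reader.ReadAllAsync())
            m.Retries++;

        return msg.Retries;
    }

    static async Task Main()
    {
        int final = await RunParallelPathAsync(replicas: 10, startRetries: 4);
        Console.WriteLine($"Retries after one polling round: {final}");
    }
}
```

Starting from `Retries = 4` with `FailedRetryCount = 5`, one polling round ends with the counter at 14 instead of 5, because every replica queued the same attempt before any of them executed it.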


yang-xiaodong commented 2 months ago

Hi @sampsonye

Thanks for the report; this is a bug.

What do you think about adding a condition that checks message.Retries == 0?

Something like:

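 if (_enableParallelExecute && message.Retries == 0)
 {
     // First delivery: hand off to the channel for parallel execution.
     if (!_receivedChannel.Writer.TryWrite((message, descriptor)))
     {
         while (await _receivedChannel.Writer.WaitToWriteAsync(_tasksCts!.Token).ConfigureAwait(false))
             if (_receivedChannel.Writer.TryWrite((message, descriptor)))
                 return;
     }
 }
 else
 {
     // Retried message (or parallel execution disabled): execute inline, so the
     // storage lock is still held while the subscriber runs.
     await _executor.ExecuteAsync(message, descriptor, _tasksCts!.Token).ConfigureAwait(false);
 }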
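A hypothetical simulation of the proposed condition (not CAP code): messages with `Retries == 0` still go through the channel, while retried messages are executed inline before the lock is released, so the next replica sees the updated counter. `StoredMessage`, `FixDemo`, the always-failing `Execute` and the `SemaphoreSlim` lock are illustrative stand-ins, not CAP types.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for a failed message row in storage (not CAP's MediumMessage).
class StoredMessage
{
    public int Retries; // persisted retry counter
}

static class FixDemo
{
    const int FailedRetryCount = 5;

    // Hypothetical subscriber that always fails, so each run bumps the counter.
    static void Execute(StoredMessage m) => m.Retries++;

    // Simulates `replicas` instances each polling once for the same failed message,
    // with the dispatch rule proposed above (channel only when Retries == 0).
    public static async Task<int> RunWithFixAsync(int replicas, int startRetries)
    {
        var msg = new StoredMessage { Retries = startRetries };
        var storageLock = new SemaphoreSlim(1, 1); // stands in for the storage lock
        var channel = Channel.CreateUnbounded<StoredMessage>();

        for (int i = 0; i < replicas; i++)
        {
            await storageLock.WaitAsync();
            try
            {
                if (msg.Retries >= FailedRetryCount)
                    continue; // retry limit reached; nothing to pick up

                if (msg.Retries == 0)
                    channel.Writer.TryWrite(msg); // first delivery: parallel path
                else
                    Execute(msg);                 // retry: runs while the lock is held
            }
            finally
            {
                storageLock.Release();
            }
        }
        channel.Writer.Complete();

        // Drain anything that took the parallel path.
        await foreach (var m in channel.Reader.ReadAllAsync())
            Execute(m);

        return msg.Retries;
    }

    static async Task Main()
    {
        int final = await RunWithFixAsync(replicas: 10, startRetries: 4);
        Console.WriteLine($"Retries after one polling round: {final}");
    }
}
```

From the same starting point (`Retries = 4`, ten replicas), the counter stops at 5, matching `FailedRetryCount`. Only retried messages give up the parallel path; first deliveries still use the channel.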
sampsonye commented 2 months ago

This looks like it would solve the problem.

But could it cause performance degradation?

yang-xiaodong commented 2 months ago

No. Would you like to make a PR?

sampsonye commented 2 months ago

done