rebus-org / Rebus

:bus: Simple and lean service bus implementation for .NET
https://mookid.dk/category/rebus
Other
2.31k stars 359 forks source link

prevent concurrently processing events with the same type/identifier #1080

Closed Meyce closed 1 year ago

Meyce commented 1 year ago

I've run into an issue with concurrently processing duplicate messages that I'm looking for a solution to.

What is happening is I am receiving Platform Events from Salesforce via a GRCP client and putting them onto ASB with Rebus in one process, and then multiple service consume the events with Rebus/ASB. What is happening is that under the right circumstances Salesforce can publish multiple messages of the same type/identifier within milliseconds of each other. This is leading to concurrency issues downstream when Rebus is handling messages in parallel. I'm looking for a way to "serialize" the processing of events on a type/identifier basis. I didn't see anything referenced in the wiki or in other github issues that is built in the Rebus that would help me accomplish this.

I think I could potentially handle this with an incoming message step. The step would need to do a couple of things. First check if the message type/id is in a store. If it isn't add the message type/id to the store. And allow the message to continue to be processed. Once the processing is done the step would remove the message type/id from the store. All read/write access to the store should be synchronized.

If the message step finds the messasge type/id in the store, then another action would need to be taken. I'm thinking for our case, deferring the message, back the ASB for a short interval would be best. We get multiple messages in very short intervals but these messages are not necessarily duplicates so all of them should go through the handler.

The approach above is the best I could come up with using Rebus's extensibility points. Any thoughts on what I have?

Thanks

GMacAussie commented 1 year ago

G'day CM,

Given you have control of the GRPC client placing messages in ASB, and therefore have control over the ASB MessageId, and given that the identifier you mentioned is the same, is there any reason you cannot use ASB's native message deduplication?

OTOH, if it is resource contention by different message identifiers, then we do use a receive pipe for concurrency - but before going into that the ASB deduplication is the easiest (note however that the queue needs to be created/recreated with deduplication on).

Cheers, Graham

Meyce commented 1 year ago

@GMacAussie, The messages aren't necessarily duplicates. I stated that in my original post. Even with two events coming out of SF for the same aggregate I could get the same event type/aggregate id that are have different attribute values. Something to do with how SF publishes platform events that I don't have any control over.

In your second paragraph you mention of receive pipeline for contention. Can you point me at any docs or the code related to that?

GMacAussie commented 1 year ago

G'day Meyce,

Ah sorry I read multiple messages of the same type/identifier as being duplicates.

So if the message ids are not duped then yes it gets into resource contention, at least for us. Our contention is against database records, but YMMV. In essence we do locking where the message has enough information to identify how it should be gated:

public interface ILockingHandler {
    /// <summary>
    /// retrieve the name of the resource to apply the database lock.
    /// </summary>
    /// <returns>The database lock resource</returns>
    string GetResource();
}
return new PipelineStepInjector(pipeline).OnReceive(new LockingHandlerIncomingStep(), PipelineRelativePosition.Before, typeof(LoadSagaDataStep));
[StepDocumentation("Incoming step that wraps the pipeline in database locking scope.")]
public class LockingHandlerIncomingStep : IIncomingStep {
    /// <summary>
    /// NLog logger for class.
    /// </summary>
    private static Logger logger = LogManager.GetCurrentClassLogger();

    /// <summary>
    /// Process an incoming message by checking if the message requires a database application lock.
    /// </summary>
    /// <param name="context">The incoming step context from Rebus</param>
    /// <param name="next">The next incoming step for Rebus</param>
    /// <returns></returns>
    public async Task Process(IncomingStepContext context, Func<Task> next) {
        ILockingHandler iLocking = context.Load<Message>().Body as ILockingHandler;

        // check if we have the requirement for a distributed lock
        if (iLocking != null) {
            IDistributedLock distLock = IoC.container.Resolve<IDistributedLockProvider>().CreateLock(iLocking.GetResource());

            // wrap the pipeline processing in a distributed lock
            using (await distLock.AcquireAsync().ConfigureAwait(false)) {
                // next step in pipeline
                await next();
            }
        } else {
            // next step in pipeline
            await next();
        }
    }
}

Here is the distributed lock package:

https://github.com/madelson/DistributedLock

Let me know if you have questions.

Cheers, Graham

mookid8000 commented 1 year ago

@Meyce @GMacAussie 's suggestion is pretty much what I would have done myself! And the DistributesLock package is just great 🙂