JasperFx / wolverine

Supercharged .NET server side development!
https://wolverinefx.net
MIT License

Unprocessed envelopes in Durable Local Queues #909

Closed: ivanoterrivel2 closed this issue 4 months ago

ivanoterrivel2 commented 5 months ago

When multiple events of the same type are registered and sent via the Outbox using SendAsync for a Tenant, and the DeliverWithin property is defined, only the first event is processed by the Handler. The remaining events stay in the database until the application is restarted. Upon restart, the application picks up these stored events and the Handler processes them accordingly.

To address this issue, you can use DurableInbox for the local queue of a specific event type. For example: opts.LocalQueueFor<StateUpdateEventDomain>().UseDurableInbox();

To Reproduce

Steps to reproduce the behavior:

  1. Add Domain Event StateUpdateEventDomain to more than 1 instance of an aggregate
  2. Store update data via Marten
  3. Send the domain event for the aggregate
        foreach (var domainEvent in aggregateModel.DomainEvents)
        {
            await _outbox.SendAsync(domainEvent, new DeliveryOptions
            {
                TenantId = _documentSession.GetBusinessUnit().ToString(),
                DeliverWithin = 30.Minutes()
            });
        }
  4. SaveChangesAsync via Marten
  5. See on the Handler that only the domain event for 1 instance of the aggregate is being processed, the others are stored on the DB and not processed

Expected behavior

It is expected that all of the events are processed within the 30 minutes.

Workaround

I have found that if I define the setting below, all of the events of that same type for different aggregates are handled correctly, but I don't really know the repercussions of using this in Dev or Prod environments.

opts.Durability.Mode = DurabilityMode.Solo;

Wolverine version: 2.9.0

jeremydmiller commented 5 months ago

@ivanoterrivel2 So opts.Durability.Mode = DurabilityMode.Solo; isn't suitable for production, as I hope the documentation makes very clear. It is a good choice for development or testing time.

Hey, could you add just a little more reproduction code? I don't entirely get what you're doing or where.

Not what you're asking about here, but I'll also recommend against the aggregate pattern it looks like you're using and recommend the "Decider" pattern usage in the aggregate workflow handler

ivanoterrivel2 commented 5 months ago

Yes, and to give additional context: I have an Aggregate whose state can be updated. Every time ChangeStateTo is called on an aggregate, I want to register an event, which will be processed by a Handler. What this Handler does is update an ElasticSearch document with new data, based on the fact that the Aggregate changed State. Since I don't want to couple Aggregate logic in the Domain Model with ElasticSearch data, I decided to use the Wolverine handlers.

For the Aggregate:

public void ChangeStateTo(ExternalPurchasePriceState newState)
{
    State = newState;
    AggregateUpdated();
}

private void AggregateUpdated()
{
    AddDomainEvent(new PurchasePriceUpsertExternalEventDomain(Id));
    AddDomainEvent(new PurchasePriceExternalStateUpdateEventDomain(Id));
}

For the Outbox implementation:

    public async Task StoreToDbAndSendDomainEventToOutbox<TAggregateRoot>(TAggregateRoot aggregateModel) where TAggregateRoot : AggregateRoot<Guid>
    {
        _documentSession.Store<TAggregateRoot>(aggregateModel);

        foreach (IDomainEvent<Guid> domainEvent in aggregateModel.DomainEvents)
        {
            await _outbox.SendAsync(domainEvent, new DeliveryOptions
            {
                TenantId = _documentSession.GetBusinessUnit().ToString(),
                DeliverWithin = 30.Minutes()
            });
        }

        aggregateModel.ClearEvents();
        await _documentSession.SaveChangesAsync();
    }

The configuration:

    private static void RegisterWolverineOptions(WolverineOptions opts)
    {
        //opts.Durability.Mode = DurabilityMode.Solo;
        //opts.Durability.DurabilityAgentEnabled = true;
        opts.LocalQueueFor<PurchasePriceExternalStateUpdateEventDomain>().UseDurableInbox();
        opts.Discovery.IncludeAssembly(typeof(SupplierProductCreateHandler).Assembly);
    }

The Handler:

[WolverineHandler]
public class PurchasePriceExternalDocumentUpdateOnStateChangeHandler
{
    public static async Task Handle(PurchasePriceExternalStateUpdateEventDomain externalEvent,
        IPurchasePriceExternalRepository purchasePriceExternalRepo,
        IExternalPurchasePriceIntegrationSearchService searchClient,
        ISupplierProductsRelatedDataAggregator supplierProductsRelatedDataAggregator,
        ILogger<PurchasePriceExternalDocumentUpdateOnStateChangeHandler> logger)
    {
        // Implementation details
    }
}

The scenario: call the ChangeStateTo function once on each of 4 different aggregates, and verify that without opts.Durability.Mode = DurabilityMode.Solo; in the configuration, the Handler is only called once. If that line is uncommented, all 4 events are processed correctly.

If you need any more details let me know. Thank you.

jeremydmiller commented 4 months ago

@ivanoterrivel2 Sorry, it isn't clear what you're doing here. What's the outbox variable up above? In what context are you using this? Are you doing this through your own code, or in a Wolverine handler? How is the session scoped? Are you using the document session across different aggregate models? That could easily be the cause; I'm not sure that's supported.

Honestly, this looks like very non-idiomatic Wolverine usage so far. There's just not anything that's actionable yet.

Yeah, confirmed this. If you're trying to use the same "outbox-d" Marten session and commit twice, it won't kick out the messages after the first call.

My advice would be to delegate to Wolverine as a mediator for each aggregate instance.
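
A minimal sketch of what "one mediator call per aggregate instance" could look like on the calling side. The names `ChangePurchasePriceState` and `PerAggregateDispatcher`, and the string-typed state, are hypothetical and not part of this thread. The `invoke` delegate is a stand-in for the mediator call, so the snippet stays self-contained and does not reference Wolverine directly.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical command: one message per aggregate instance.
public record ChangePurchasePriceState(Guid AggregateId, string NewState);

public static class PerAggregateDispatcher
{
    // `invoke` stands in for the mediator call (assumption: with Wolverine,
    // something like `message => bus.InvokeAsync(message)` on an IMessageBus).
    public static async Task<int> DispatchAsync(
        IEnumerable<Guid> aggregateIds,
        string newState,
        Func<object, Task> invoke)
    {
        var dispatched = 0;
        foreach (var id in aggregateIds)
        {
            // Awaited one at a time: each call completes before the next starts.
            await invoke(new ChangePurchasePriceState(id, newState));
            dispatched++;
        }
        return dispatched;
    }
}
```

With Wolverine, the delegate would presumably be `message => bus.InvokeAsync(message)` on an injected `IMessageBus`, with a handler for the hypothetical command doing the Marten work. The assumption is that each invocation then gets its own session and outbox, so no session is committed twice.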

ivanoterrivel2 commented 4 months ago

Hello again. Your last comment was extremely helpful: the outbox Marten session won't kick out the messages after the first call, so I changed the implementation of the method to receive the multiple aggregates I want to send the events for, and to send them on the same instance of the outbox, something like this:

            var domainEvents = aggregateModelsAndHeaders.SelectMany(t => t.aggregate.DomainEvents);
            foreach (var domainEvent in domainEvents)
            {
                await _outbox.SendAsync(domainEvent, new DeliveryOptions
                {
                    TenantId = _documentSession.GetBusinessUnit().ToString(),
                    DeliverWithin = 30.Minutes()
                });
            }

            Parallel.ForEach(aggregateModelsAndHeaders, aggregateModelAndHeader =>
            {
                aggregateModelAndHeader.aggregate.ClearEvents();
            });

Thanks a lot for your work