FoundatioFx / Foundatio

Pluggable foundation blocks for building distributed apps.
Apache License 2.0

PublishAsync returns immediately when awaited #269

Closed: macco3k closed this issue 2 years ago

macco3k commented 2 years ago

I have a setup where I am using the in-memory message bus to exchange messages between objects. At some point, one of them sends a message that triggers an expensive recomputation involving database operations. This recomputation is async and returns a task, so it can be awaited. However, in the following code the first call to PublishAsync returns immediately, instead of (as far as I understand) asynchronously waiting for the expensive operation to finish.

// trigger recalculation of cache for everything
var rootNodeFlaggedForAggregateData = await _treeStore.FlagDescendantMeterNodesDirty(rootNodeId);
if (rootNodeFlaggedForAggregateData)
{
    await _messageBus.PublishAsync(new TreeNodeFlaggedDirty { NodeId = rootNodeId });
    // meter data recalculation finished -> flag for costs recalculation
    var rootNodeFlaggedForCosts = await _treeStore.FlagDescendantMeterNodesDirty(rootNodeId, DirtyMeterFlag.CalculateCosts);
    if (rootNodeFlaggedForCosts)
    {
        await _messageBus.PublishAsync(new CostsCalculationHandler.TreeNodeTrigger { NodeId = rootNodeId });
    }
}

The expensive operation runs for many objects. Because these operations can run in parallel, they are not awaited one by one; instead they are started together and then awaited with Task.WhenAll:

foreach (var meterBatch in batches)
{
    var recalculationTasks = new List<Task>();
    foreach (var meter in meterBatch)
    {
        var meterFlaggedDirtyEvent = new MeterFlaggedDirty
        {
            MeterId = meter.Id,
            Now = eventData.Now
        };

        recalculationTasks.Add(_meterHandler.Received(meterFlaggedDirtyEvent));
    }

    await Task.WhenAll(recalculationTasks);
} 

Inside the Received method no new task is created (e.g. via Task.Run), so I would expect the whole chain to run asynchronously but, since everything is awaited, the second PublishAsync call in the first snippet not to execute until the expensive operation has finished. Since this does not seem to be the case, am I missing something? I tried a simple console application in which the handler delays for a few seconds and prints to the console, and the behaviour was as expected, with the prints appearing in sequence.
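A minimal sketch of the kind of console check described above, assuming a hypothetical `ReceivedAsync` stand-in that uses `Task.Delay` in place of the database work (the names here are illustrative, not from the original code). When the batching loop is awaited directly, with no message bus in between, the step after it runs only once every batch has finished:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class DirectAwaitDemo
{
    // Hypothetical stand-in for _meterHandler.Received: no Task.Run,
    // just an awaited delay simulating the expensive database work.
    public static async Task ReceivedAsync(List<string> log, int meterId)
    {
        await Task.Delay(100);
        lock (log) log.Add($"meter {meterId} done");
    }

    // Mirrors the batching loop: start every task in a batch, then await them together.
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var batches = new[] { new[] { 1, 2 }, new[] { 3, 4 } };
        foreach (var batch in batches)
        {
            var tasks = batch.Select(id => ReceivedAsync(log, id)).ToList();
            await Task.WhenAll(tasks);
        }

        // Reached only after all batches complete, because every step is awaited directly.
        lock (log) log.Add("second step");
        return log;
    }
}
```

Calling `await DirectAwaitDemo.RunAsync()` should always yield `"second step"` as the last log entry; the order of meters within a batch may vary.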

Important note: this is running inside a GraphQL mutation triggered via an ASP.NET Core API. Perhaps this makes all the difference.

ejsmith commented 2 years ago

I think you are looking for more of a request / response messaging pattern. We don't currently support that. When you publish the message it's sent out to all subscribers. It's a pub/sub model.
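Since request/response is not built in, one common workaround is to layer it on top of pub/sub with a correlation ID: the caller registers a pending reply, publishes a request, and awaits a reply message that a subscriber completes. The sketch below assumes hypothetical `RecalculationRequested`/`RecalculationCompleted` message types and takes the publish call as a plain delegate; it does not use or describe any Foundatio API.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical message types, not part of Foundatio.
public record RecalculationRequested(Guid CorrelationId, int NodeId);
public record RecalculationCompleted(Guid CorrelationId, int NodeId);

// Tracks in-flight requests so a caller can await the matching reply.
public class PendingReplies
{
    private readonly ConcurrentDictionary<Guid, TaskCompletionSource<RecalculationCompleted>> _pending = new();

    public Task<RecalculationCompleted> Register(Guid correlationId)
    {
        // RunContinuationsAsynchronously keeps the awaiting caller off the replier's thread.
        var tcs = new TaskCompletionSource<RecalculationCompleted>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[correlationId] = tcs;
        return tcs.Task;
    }

    // Call this from the subscriber that handles RecalculationCompleted messages.
    public void OnReply(RecalculationCompleted reply)
    {
        if (_pending.TryRemove(reply.CorrelationId, out var tcs))
            tcs.TrySetResult(reply);
    }

    public void Forget(Guid correlationId) => _pending.TryRemove(correlationId, out _);
}

public static class RequestReplyDemo
{
    // "publish" stands in for the bus publish call; the reply arrives later via OnReply.
    public static async Task<RecalculationCompleted> RequestAsync(
        PendingReplies pending, Func<RecalculationRequested, Task> publish, int nodeId, TimeSpan timeout)
    {
        var id = Guid.NewGuid();
        var replyTask = pending.Register(id); // register first so a fast reply is not missed
        try
        {
            await publish(new RecalculationRequested(id, nodeId));
        }
        catch
        {
            pending.Forget(id);
            throw;
        }

        var finished = await Task.WhenAny(replyTask, Task.Delay(timeout));
        if (finished != replyTask)
        {
            pending.Forget(id);
            throw new TimeoutException($"No reply for node {nodeId}");
        }
        return await replyTask;
    }
}
```

The timeout matters because, in a real pub/sub system, nothing guarantees that any subscriber will ever answer.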

macco3k commented 2 years ago

Indeed, that's what the code expects, although, being familiar with the pub/sub model, the actual behaviour is exactly what I would have expected (the code was not written by me). However, looking at the code for the in-memory implementation, it looks like everything is awaited all the way down, so I'm still wondering why execution resumes before all subscribers have run. As mentioned, I wrote a simple console app where the subscriber had a delay of a few seconds, and the delay was correctly recorded by the stopwatch. Would you care to elaborate a bit?

ejsmith commented 2 years ago

PublishAsync only publishes the message, which would normally mean sending it to some sort of messaging server. The message handlers are then run separately. So even with the in-memory implementation, the handlers are called separately from the publish.
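To illustrate the decoupling, here is a deliberately tiny, hypothetical pub/sub bus. It is not Foundatio's implementation; it only shows the general idea that a publish which hands the message off and starts handlers without awaiting them can complete before any handler finishes, regardless of how deeply the handlers themselves use `await`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical toy bus for illustration only (not thread-safe for concurrent Subscribe calls).
public class ToyMessageBus
{
    private readonly List<Func<object, Task>> _subscribers = new();

    public void Subscribe<T>(Func<T, Task> handler) =>
        _subscribers.Add(msg => msg is T typed ? handler(typed) : Task.CompletedTask);

    // Publishing only hands the message off: each handler is started on the
    // thread pool and NOT awaited, so the returned task completes right away.
    // Exceptions thrown by handlers are therefore never observed by the publisher.
    public Task PublishAsync<T>(T message)
    {
        foreach (var subscriber in _subscribers)
            _ = Task.Run(() => subscriber(message));
        return Task.CompletedTask;
    }
}
```

With a subscriber that awaits a 200 ms delay, `await bus.PublishAsync(...)` returns while the handler is still running. A caller that must wait for the work to finish needs its own completion signal, such as a reply message carrying a correlation ID.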

macco3k commented 2 years ago

Makes total sense, just wanted to check whether I was missing something. Thanks for clarifying!