arcus-azure / arcus.messaging

Messaging with Microsoft Azure in a breeze.
https://messaging.arcus-azure.net
MIT License
20 stars 11 forks source link

Provide 'aggregation' extension points for MessagePump #434

Open fgheysels opened 8 months ago

fgheysels commented 8 months ago

Is your feature request related to a problem? Please describe.

It is not extraordinary that a service which processes messages writes some (custom) metrics to a log-sink (for instance Application Insights). We can currently do that by calling the LogCustomMetric method on the ILogger. However, due to the design of Arcus.Messaging, this means that we need to call LogCustomMetric for every message that has been processed separately. In fact, this means that the value of the metric will most probably always be 1. (For instance, we processed 1 message of type X with such and such dimensions).

However, logging metrics this way, is not recommended. See here

Describe the solution you'd like

I think it would be nice if the message-pump could offer some kind of extension point where we can hook in and use that extension point for doing additional tasks (like loggin metrics).

On every invocation, the message-pump receives an x number of messages from the underlying messaging system. Every message will be processed separately. It would be nice if some kind of method can be executed after all the messages of this iteration have been processed. This method would need to pass in some information: for instance the amount of messages that were retrieved and processed (succesfully / not succesfully) during this iteration.

Next to that, it would be even nicer if we could pass in some custom 'state' in that method. State that is collected in a MessageHandler. For example, suppose we have a messagehandler that processes certain types of messages. Inside that message-handler, I'd like to keep track of some custom things:

 public async Task ProcessMessageAsync(
            OrderMessage message,
            AzureServiceBusMessageContext messageContext,
            MessageCorrelationInfo correlationInfo,
            CancellationToken cancellationToken)
{
      // Do some processing
     var customState = messageContext.State as Dictionary<string,object>;

     if ( processingSucceeded )
     {
          var customMetricData = customState["SucceededCounter"] as MyCustomStateObject;
          customMetricData.Counter += 1;
          customMetricData.Dimensions = new { ["ProcessingResult"] = "Succeeded", ["MessageType"] = "MyCustomMessage" } ;
     }
     else
     {
          var customMetricData = customState["FailedCounter"] as MyCustomStateObject;
          customMetricData.Counter += 1;
          customMetricData.Dimensions = new { ["ProcessingResult"] = "Succeeded", ["MessageType"] = "MyCustomMessage" } ;
     }
 }

Then, in that 'aggregation extension' point or hook, we could do this:

public async Task  AfterMessagePumpIteration( object customState )
{
      var customMetricData = customState as  Dictionary<string,object>;

      foreach( var kvp in customMetricData )
      {
           _logger.LogCustomMetric("OrdersProcessed", kvp.Counter, kvp.Dimensions);
      }
}

Theoretically, the above code would make it possible to log aggregated custom metrics with custom dimensions.

stijnmoreels commented 8 months ago

Couldn't this be fixed already with singleton registration in the dependency container, where the same instance is injected in the message handler(s)? The message pump works with a reactive receival pattern, so there is no batching; hence, we don't know when to start or stop counting for messages. Maybe with a singleton registration, you can have some state for a certain message type/filter, but that would be outside the scope of the messaging library, I believe. 🤔

fgheysels commented 8 months ago

I think it would be good if we could just discuss this in a separate 30 min meeting.

stijnmoreels commented 7 months ago

I think it would be good if we could just discuss this in a separate 30 min meeting.

Yes, we could do that. 👍