pardahlman / RawRabbit

A modern .NET framework for communication over RabbitMq
MIT License

Passing a cancellation token for subscription handlers, possible? #407

Open lmarrero opened 5 years ago

lmarrero commented 5 years ago

Hello. Hopefully this is just a simple question.

As a quick background, the canonical example for receiving messages is:

await client.SubscribeAsync<BasicMessage>(async msg =>
{
  Console.WriteLine($"Received: {msg.Prop}.");
});

Now consider a handler that runs a potentially long async operation. It would be nice to pass a cancellation token to that operation, something like this:

await client.SubscribeAsync<BasicMessage>(async (msg, cancellationToken) =>
{
  await LongOperationAsync(msg.Prop, cancellationToken);
});

Is this currently possible with some enricher? Or, do I need custom middleware? I checked the default HandlerInvocationMiddleware (relevant call pasted below) and it does not pass the cancellation token to the handler.

protected virtual async Task InvokeMessageHandler(IPipeContext context, CancellationToken token)
{
    var args = HandlerArgsFunc(context);
    var handler = MessageHandlerFunc(context);
    var acknowledgement = await handler(args);
    context.Properties.TryAdd(PipeKey.MessageAcknowledgement, acknowledgement);
    PostInvokeAction?.Invoke(context, acknowledgement);
}

rpawlaszek commented 4 years ago

You can pass it directly in the method:

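using (var cts = new CancellationTokenSource())
{
    var token = cts.Token;
    await client.SubscribeAsync<BasicMessage>(async msg =>
    {
        // The handler captures the token from the enclosing scope.
        await LongOperationAsync(msg.Prop, token);
    }, null, token);

    // Keep this scope open for as long as handlers may run:
    // once cts is disposed it can no longer be cancelled.
}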

If you expected the token to be provided via (msg, cancellationToken), what would be its source? The one you would want (I suppose) would come from your own code anyway.
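
If you still prefer the (msg, cancellationToken) handler shape, a small adapter that closes over your own token may be enough. The following is only a minimal sketch that runs without RabbitMQ: WithToken, LongOperationAsync and CancellationDemo are hypothetical names for illustration and are not part of RawRabbit, and the "bus" is simulated by calling the handler directly.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Hypothetical stand-in for a long-running operation (not a RawRabbit API).
    public static Task LongOperationAsync(string input, CancellationToken token)
        => Task.Delay(TimeSpan.FromSeconds(30), token);

    // Hypothetical adapter: turns a (msg, token) handler into a single-argument
    // handler by closing over a token that the caller owns.
    public static Func<T, Task> WithToken<T>(
        Func<T, CancellationToken, Task> handler, CancellationToken token)
        => msg => handler(msg, token);

    // Returns true if the handler observed the cancellation.
    public static async Task<bool> RunAsync()
    {
        using (var cts = new CancellationTokenSource())
        {
            Func<string, Task> handler = WithToken<string>(LongOperationAsync, cts.Token);

            // Simulate the bus invoking the handler, then cancel while it is running.
            Task running = handler("hello");
            cts.Cancel();

            try
            {
                await running;
                return false;
            }
            catch (OperationCanceledException)
            {
                return true;
            }
        }
    }

    public static async Task Main()
    {
        bool cancelled = await RunAsync();
        Console.WriteLine(cancelled ? "handler cancelled" : "handler completed");
    }
}
```

The adapted handler has the same single-argument shape as the lambdas above, so in principle it could be passed where those lambdas are passed. The token still comes from code you control, which is the point made above.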