rebus-org / Rebus.ServiceProvider

:bus: Microsoft Extensions Dependency Injection container adapter for Rebus
https://mookid.dk/category/rebus

How can I map request header value to correlation id in rebus #74

Closed KennethJakobsen closed 1 year ago

KennethJakobsen commented 1 year ago

In a controller I do the following: send a message on ASB (OneWayClient) to another service, then get an object with an HttpClient.

This is tracked in Application Insights.

I have created a decorator that adds the ID to the rbs2-corr-id header.

The problem is that Rebus manages its own scope in the service provider, which makes it impossible for me to get the ID from the scope of the controller.

Is there any automagic I can use to make Rebus and the controller share the same scope?

mookid8000 commented 1 year ago

It's not clear to me what you're trying to achieve, sorry 😅 Could you maybe give me an example with some pseudo code or something like that?

KennethJakobsen commented 1 year ago

I can see why. Maybe I was in too much of a hurry when I wrote the above 😄

What I want is to correlate events that happen before (and after) Rebus sends a message, because I have situations where Rebus is not my entry point.

Example, LegacyCustomerController.cs:

private readonly CustomerApi _customerApiClient;
private readonly IBus _bus;

public LegacyCustomerController(CustomerApi customerApiClient, IBus bus)
{
    _customerApiClient = customerApiClient;
    _bus = bus;
}

public async Task<IActionResponse> CreateCustomer(CustomerRequest customerReq)
{
    // I want this message to carry the correlation ID that is passed down from the request, but I can't figure out how.
    _bus.Send(new CreateCustomerCommand().FromRequest(customerReq));

    // This call already has the correlation ID from the controller.
    return await LegacyAwaiter<CustomerModel>.WaitForSuccessResponse(
        async () => await _customerApiClient.GetCustomerByIdAsync(id.ToString()));
}

I'm trying to inject this ID into the Rebus pipeline so my developers don't have to care about correlation. I'm using the service provider scope to manage this.

I've basically built an operationId that has a scoped lifetime.

This is passed automatically to the apiClient, but since Rebus creates its own scope, a new ID is created.

Does it make more sense now?

mookid8000 commented 1 year ago

First off, please remember to await when you call Send on IBus:

await _bus.Send(new CreateCustomerCommand().FromRequest(customerReq));

Otherwise, weird stuff might happen 😅 (and yes, sorry for not calling it SendAsync; this might be fixed in Rebus 8)

There are many ways you can enrich Rebus messages with stuff from other places. It's still not entirely clear to me how you'd prefer to pass the correlation ID, but here's a way that you might be able to use for inspiration: Snatch the correlation ID from the current HTTP context!

First, create an outgoing pipeline step that does the actual magic:

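using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Rebus.Messages;
using Rebus.Pipeline;

class SetCorrelationIdOutgoingStep : IOutgoingStep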
{
    readonly IServiceProvider _serviceProvider;

    public SetCorrelationIdOutgoingStep(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }

    public async Task Process(OutgoingStepContext context, Func<Task> next)
    {
        var httpContextAccessor = _serviceProvider.GetRequiredService<IHttpContextAccessor>();
        var httpContext = httpContextAccessor.HttpContext;
        var correlationId = httpContext?.Request.Headers["CorrelationId"].FirstOrDefault();

        if (correlationId != null)
        {
            var message = context.Load<Message>();

            message.Headers[Headers.CorrelationId] = correlationId;
        }

        await next();
    }
}

We'll install this step so that Rebus calls it every time it sends a message. To do this in a smooth way, we can create a configuration extension like this:

static class CorrelationIdConfigurationExtensions
{
    public static void AutomaticallyAddCorrelationId(this OptionsConfigurer configurer, IServiceProvider serviceProvider)
    {
        configurer.Decorate<IPipeline>(c =>
        {
            var pipeline = c.Get<IPipeline>();
            var step = new SetCorrelationIdOutgoingStep(serviceProvider);

            return new PipelineStepInjector(pipeline)
                .OnSend(step, PipelineRelativePosition.Before, typeof(SerializeOutgoingMessageStep));
        });
    }
}

which will then allow us to configure Rebus like this:

services.AddRebus(
    (configure, provider) => configure
        .Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "doesn't matter"))
        .Options(o => o.AutomaticallyAddCorrelationId(provider))
);
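
One thing worth flagging about the configuration above: the step resolves IHttpContextAccessor with GetRequiredService, which throws if that service has not been registered. In an ASP.NET Core app the registration is usually a single call. This is only a minimal sketch, assuming `services` is the same IServiceCollection that AddRebus is called on:

```csharp
using Microsoft.Extensions.DependencyInjection;

// Assumption: this runs in the same service registration code as AddRebus above
// (for example ConfigureServices or Program.cs).
// Registers IHttpContextAccessor so the outgoing step can resolve it.
services.AddHttpContextAccessor();
```

Outside of an HTTP request, HttpContext is null, so the step leaves the message headers untouched.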

Does that look like something you could use?

mookid8000 commented 1 year ago

Btw I committed the example code here: https://github.com/rebus-org/Rebus.ServiceProvider/blob/master/Rebus.ServiceProvider.Tests/Examples/SnatchCorrelationIdFromHttpRequest.cs

KennethJakobsen commented 1 year ago

Thank you, I will try this approach. I already did the outgoing steps, but I was missing the HttpContext. BTW, the code I posted was pseudo code; I just forgot the await.

mookid8000 commented 1 year ago

(..) I just forgot the await. (..)

😁