rebus-org / Rebus.Async

:bus: Experimental async extensions for Rebus
https://mookid.dk/category/rebus

Best way to soak up late messages in Async Request/Response #14

Closed ben-jacobs closed 9 months ago

ben-jacobs commented 4 years ago

Hi Mogens,

I am using async request/reply in a poor man's distributed rate-limiting scenario to control outbound requests to services that have rate limits. Essentially, I have Polly wrapping the request to a "master" rate limiter (which replies to requests from multiple queues in different services), handling any Timeout exception with a retry/backoff strategy. This part works great.
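As an illustration of the retry/backoff idea only, here is a minimal sketch in plain .NET. It is a stand-in for Polly, not Polly's API, and the names RetrySketch and WithBackoff are hypothetical. It retries an operation whenever it throws TimeoutException and doubles the wait between attempts.

```csharp
using System;
using System.Threading.Tasks;

public static class RetrySketch
{
    // Runs `attempt`, retrying on TimeoutException up to `maxAttempts` times in total.
    // The delay between attempts doubles each time (simple exponential backoff).
    public static async Task<T> WithBackoff<T>(Func<Task<T>> attempt, int maxAttempts, TimeSpan initialDelay)
    {
        var delay = initialDelay;
        for (var i = 1; ; i++)
        {
            try
            {
                return await attempt();
            }
            catch (TimeoutException) when (i < maxAttempts)
            {
                // Not the last attempt yet: wait, then try again with a longer delay.
                await Task.Delay(delay);
                delay = TimeSpan.FromTicks(delay.Ticks * 2);
            }
            // On the last attempt the filter is false, so the TimeoutException propagates.
        }
    }
}
```

With Rebus.Async, the wrapped operation would be something like () => bus.SendRequest<SomeReply>(request), where SomeReply is whatever reply type the rate limiter sends back.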

However, I get a warning in the log for any late messages (i.e. where the response didn't arrive within the timeout window). If I do not have a handler for these late messages on the underlying bus, I get the usual exceptions about not being able to find a handler. Currently I'm soaking up the messages with a no-op handler on the bus to avoid the exception, but of course I still see the warning in the logs.
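A minimal sketch of such a no-op handler, assuming a hypothetical reply type RateLimitReply (the real message type is not shown in this thread). IHandleMessages<T> is Rebus' ordinary handler interface. The handler must also be registered with the bus like any other handler.

```csharp
using System.Threading.Tasks;
using Rebus.Handlers;

// Hypothetical reply type, for illustration only.
public record RateLimitReply(bool Allowed);

// Consumes replies that arrive after the requester has stopped waiting.
// It does nothing, so the late reply is simply swallowed instead of
// failing with a "could not be dispatched to any handlers" error.
public class LateRateLimitReplySink : IHandleMessages<RateLimitReply>
{
    public Task Handle(RateLimitReply message) => Task.CompletedTask;
}
```

This only avoids the dispatch exception. The warning that Rebus.Async logs for the late reply is unaffected.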

I'm assuming the best way to deal with this would be to inject a step in the pipeline, but I'm not quite sure where/which step etc. is the best place to deal with this scenario. Alternatively is there either an option (can't see it in the source) to not send replies to the queue to be handled as per normal messages, or some other approach to deal with this scenario?

Thanks as always for Rebus (and Topper!).

Cheers,

Ben

mookid8000 commented 3 years ago

I'm not sure I fully understand... you want to use Rebus.Async to do inline request/reply, but you do not care about the replies?

If you don't care about the reply, how about simply using Rebus' built-in fire-and-forget style send:

await bus.Send(command);

?

ben-jacobs commented 3 years ago

I care about the replies.... Up to a time out period :)


mookid8000 commented 3 years ago

Hmm, I'm actually thinking that the best way would probably be to extend Rebus.Async with an option to specify whether to care about late replies.

Something like

var bus = Configure.With(_activator)
    .(...)
    .Options(o => o.EnableSynchronousRequestReply(
        replyMaxAgeSeconds: 7, 
        lateReplyHandling: LateReplyHandlingOption.Ignore
    ))
    .(...);

where LateReplyHandlingOption could be

public enum LateReplyHandlingOption
{
    LogAsDebug,
    LogAsInformation,
    LogAsWarning,
    LogAsError,
    Ignore
}

Do you think that would work for you?

ben-jacobs commented 3 years ago

That’d absolutely work – but given that it’s still “experimental”, I can understand any reluctance to mess with things too much 😊

In short, it’s not causing any “errors” – I can just sink the messages into a no-op handler but it still results in logs getting polluted with warnings - I do like your suggestion to enable setting the logging level as a quick simple solution (I think).

Cheers,

Ben


mookid8000 commented 3 years ago

(...) given that it’s still “experimental”, I can understand any reluctance to mess with things too much (..)

Well, OTOH being "experimental" means that we can still play around and try things out. 🙂

I'll just move this issue to the Rebus.Async repo, and then we'll see if someone comes by and feels like submitting a little PR 😉

AhmedElbatt commented 9 months ago

@mookid8000 What if I want to use the request/reply approach together with a saga implementation, as in the following process sketch? I tried to achieve it, but I receive a timeout exception:

System.AggregateException: One or more errors occurred. (Did not receive reply for request with in-reply-to ID 'request-reply_9bbaa02c-b77a-4a45-8dcb-92df944b8c4d' within 00:00:05 timeout)

Please, I need your help :)

Screenshot 1000

mookid8000 commented 9 months ago

Did you try increasing the timeout?

Did the receiver get the request?
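For reference, a sketch of passing a longer timeout. It assumes that Rebus.Async's SendRequest extension accepts an optional timeout argument and that the extension lives in the Rebus namespace. The 00:00:05 in the error message suggests a 5-second default. PingRequest and PingReply are hypothetical message types.

```csharp
using System;
using System.Threading.Tasks;
using Rebus;      // assumed namespace of the SendRequest extension (Rebus.Async)
using Rebus.Bus;

// Hypothetical message types, for illustration only.
public record PingRequest(int Id);
public record PingReply(int Id);

public static class TimeoutSketch
{
    public static async Task<PingReply> Ping(IBus bus)
    {
        try
        {
            // Assumption: `timeout` is an optional parameter of SendRequest.
            return await bus.SendRequest<PingReply>(new PingRequest(1), timeout: TimeSpan.FromSeconds(30));
        }
        catch (TimeoutException exception)
        {
            // With `await`, the TimeoutException surfaces directly.
            Console.WriteLine($"No reply in time: {exception.Message}");
            throw;
        }
    }
}
```

Seeing System.AggregateException instead usually means the task was blocked on with .Result or .Wait() rather than awaited, because those wrap the original exception.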

AhmedElbatt commented 9 months ago

@mookid8000

Did you try increasing the timeout?

Did the receiver get the request?

The receiver got the reply; increasing the timeout doesn't help. The execution waits until the given period has elapsed and then throws the same error.

    internal class UpdateCustomerCommandHandler : IRequestHandler<UpdateCustomerCommand, Result<UpdatedCustomerDto>>
    {
        private readonly IBus _bus;
        private readonly ICustomerRepository _customerRepository;
        private readonly IMapper _mapper;
        //private readonly ISagaResult _sagaResult;
        public UpdateCustomerCommandHandler(IBus bus, ICustomerRepository customerRepository, IMapper mapper)
        {
            _bus = bus;
            _customerRepository = customerRepository;
            _mapper = mapper;
        }
        public async Task<Result<UpdatedCustomerDto>> Handle(UpdateCustomerCommand request, CancellationToken cancellationToken)
        {
            var customerToUpdate = await _customerRepository.GetByVexcashUserId(request.CustomerToUpdate.VexcashUserId);
            if (customerToUpdate == null)
                return new Result<UpdatedCustomerDto>($"customer {request.CustomerToUpdate.VexcashUserId} doesn't exist", HttpStatusCode.NotFound);

            var result = _bus.SendRequest<Result<bool>>(new TestRequest(_bus, cancellationToken));
CustomerToUpdateReceived(_mapper.Map<GoCardlessCustomer>(customerToUpdate)));

            var updatedCustomer = await _customerRepository.GetByVexcashUserId(request.CustomerToUpdate.VexcashUserId);
            var updatedCustomerDto = _mapper.Map<UpdatedCustomerDto>(updatedCustomer);
            return new Result<UpdatedCustomerDto>(updatedCustomerDto, $"Customer {updatedCustomerDto.VexcashUserId} has been Updated", HttpStatusCode.OK);
        }

        class TestRequest
        {
            private readonly IBus _bus;
            public TestRequest(IBus bus, CancellationToken cancellationToken)
            {
                _bus = bus;
                _bus.Reply(new TestReply());

            }
        }
        class TestReply { }
    }

In the Program.cs file I added the configuration like this:


            builder.Services.AddRebus(
                        rebus => rebus
                            .Routing(r =>
                                r.TypeBased().MapAssemblyOf<ApplicationAssembly>("MainQueue"))
                            .Transport(t =>
                                t.UseSqlServer(sqlServerTransportOptions, "MainQueue"))
                            .Options(x => { 
                                //x.RetryStrategy("ErrorQueue");
                                x.EnableSynchronousRequestReply();
                            })
                            .Sagas(s =>
                                s.StoreInSqlServer(
                                    sqlConnection,
                                    dataTableName: "Sagas",
                                    indexTableName: "SagaIndexes")));

mookid8000 commented 9 months ago

I can see a few things that look a little bit odd.

First off, your request gets IBus injected – that will NOT work, because the serializer will try to serialize the bus! (...and I have NO idea how that will work out... but I'm pretty sure it will not result in anything that makes sense!)

Next, your request seems to be trying to reply in its own constructor. That will not work, because the code will run as part of constructing the TestRequest instance and will end up sending a reply back to the sender of UpdateCustomerCommand (which I assume is a Rebus message).

Moreover, you're not awaiting the call to SendRequest – since you're in an async method, it should most likely read

var request = new TestRequest(_bus, cancellationToken);
var result = await _bus.SendRequest<Result<bool>>(request);
(...)

So it seems like a few things have been mixed a little bit together here 🙂

Your request should be an ordinary (serializable!) message type, e.g. something like

public record GetCustomerDetailsRequest(int CustomerId);

and the corresponding reply will then carry whatever payload it makes sense to receive as a reply – let's say it's just a name and a birth year:

public record GetCustomerDetailsReply(int CustomerId, string FullName, int BirthYear);

Whoever sends the request should then do it like this:

var request = new GetCustomerDetailsRequest(CustomerId: 123);
var reply = await bus.SendRequest<GetCustomerDetailsReply>(request);

// now we have the data
var name = reply.FullName;

The handler that receives the request should then look somewhat like this (using C# 12 primary ctor 😉 ):

public class GetCustomerDetailsRequestHandler(IBus bus, IQueryable<Customer> customers) 
    : IHandleMessages<GetCustomerDetailsRequest>
{
    public async Task Handle(GetCustomerDetailsRequest request)
    {
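        // IQueryable<T> has no FindAsync; FirstOrDefaultAsync is EF Core's extension
        // (assumes using Microsoft.EntityFrameworkCore)
        var customer = await customers.FirstOrDefaultAsync(c => c.Id == request.CustomerId)
            ?? throw new ArgumentException($"Could not find customer with ID {request.CustomerId}");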

        await bus.Reply(new GetCustomerDetailsReply(
            CustomerId: customer.Id, 
            FullName: $"{customer.FirstName} {customer.LastName}",
            BirthYear: customer.BirthDate.Year
        ));
    }
}

Does that make sense?

And one very important thing to note about this is that if the Rebus instance that handles your request is the same as the one that's sending it, THERE'S A PRETTY HIGH RISK OF A DEADLOCK (which you'll see as a timeout!), because if all of the parallelism happens to be used by logical threads that have just sent a request, then there's no parallelism left to handle the requests and deliver a reply... so you should strongly consider handling the requests in another Rebus instance.
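The starvation described above can be reproduced without Rebus. The following sketch is a plain .NET simulation, not Rebus code. A SemaphoreSlim stands in for the bus instance's parallelism, and the names DeadlockSketch and SendRequestFromHandler are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class DeadlockSketch
{
    // Simulates a single bus instance with `parallelism` worker slots.
    // Returns true if the simulated reply arrived before the timeout.
    public static async Task<bool> SendRequestFromHandler(int parallelism, TimeSpan timeout)
    {
        var workers = new SemaphoreSlim(parallelism);
        var reply = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);

        // The sending handler occupies one slot for as long as it waits for the reply.
        await workers.WaitAsync();
        try
        {
            // The request handler runs on the SAME instance, so it needs a free slot too.
            _ = Task.Run(async () =>
            {
                await workers.WaitAsync();
                try { reply.TrySetResult("reply"); }
                finally { workers.Release(); }
            });

            var winner = await Task.WhenAny(reply.Task, Task.Delay(timeout));
            return winner == reply.Task;
        }
        finally
        {
            // Only now can a starved request handler run, producing a late reply.
            workers.Release();
        }
    }
}
```

Calling SendRequestFromHandler(1, ...) returns false, because the only slot is held by the sender until the timeout fires. With a parallelism of 2 the request handler gets a slot and the call normally returns true. Handling the requests in a separate instance removes the shared pool altogether.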

I hope this helps 🙂

AhmedElbatt commented 9 months ago

@mookid8000

Thank you very much for trying to help me. I followed your guidelines this morning trying to imitate your solution. But unfortunately, I received the same timeout error.

using MediatR;
using Rebus;
using Rebus.Bus;
using Rebus.Handlers;
using System;

namespace Customers.Commands.UpdateCustomer
{
    public record GetCustomerDetailsRequest(int CustomerId);
    public record GetCustomerDetailsReply(int CustomerId, string FullName, int BirthYear);

    public class UpdateCustomerCommandHandler : IRequestHandler<UpdateCustomerCommand>
    {
        private readonly IBus _bus;
        private readonly ICustomerRepository _customerRepository;

        public UpdateCustomerCommandHandler(IBus bus, ICustomerRepository customerRepository)
        {
            _bus = bus;
            _customerRepository = customerRepository;
        }

        public async Task Handle(UpdateCustomerCommand request, CancellationToken cancellationToken)
        {
            var request1 = new GetCustomerDetailsRequest(CustomerId: 123);
            var reply = await _bus.SendRequest<GetCustomerDetailsReply>(request1);
            var name = reply.FullName;
        }
    }

    public class GetCustomerDetailsRequestHandler : IHandleMessages<GetCustomerDetailsRequest>
    {
        private readonly IBus _bus;
        public GetCustomerDetailsRequestHandler(IBus bus)
        {
            _bus = bus;
        }
        public async Task Handle(GetCustomerDetailsRequest message)
        {
            await _bus.Reply(new GetCustomerDetailsReply(100, "Test User", 1986));
        }
    }
}

mookid8000 commented 9 months ago

Did the request get handled by GetCustomerDetailsRequestHandler?

And, if so: WHEN did it get handled? I.e. if it got handled after the timeout had occurred, then it's a sign that it was deadlocked.

AhmedElbatt commented 9 months ago

@mookid8000

Did the request get handled by GetCustomerDetailsRequestHandler?

Yes, the request gets handled by the GetCustomerDetailsRequestHandler.

And, if so: WHEN did it get handled? I.e. if it got handled after the timeout had occurred, then it's a sign that it was deadlocked.

After that, I see a log message saying that the bus failed to dispatch the message because it couldn't be dispatched by any handler, as in the following screenshot: error 1

And in the response I receive this message: error2

I tried to increase the duration of the timeout, but unfortunately I couldn't get any positive response.

One more important thing: I am trying to use this approach with a saga implementation that handles messages, and if a message fails (e.g. in process 2), I want to return a given reply back to the MediatR request handler. Screenshot 1000

mookid8000 commented 9 months ago

The error message

Message with ID and type could not be dispatched to any handlers (...)

means that the Rebus instance that received the message does not have a message handler for it.

EITHER the message was sent to the wrong queue, OR the Rebus instance has a configuration error (probably just a missing handler registration).

Could you check that the request gets sent to the right queue and that the Rebus instance that runs there has a handler for it?
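If a missing registration turns out to be the cause, the fix in Program.cs could look roughly like this. This is a sketch assuming the Rebus.ServiceProvider package, which provides the AddRebus call used earlier in this thread. The handler name is taken from the code posted above.

```csharp
// Program.cs fragment, not a complete program.
using Rebus.Config;

// Register the handler explicitly with the container...
builder.Services.AddRebusHandler<GetCustomerDetailsRequestHandler>();

// ...or register every IHandleMessages<T> implementation in the handler's assembly.
// builder.Services.AutoRegisterHandlersFromAssemblyOf<GetCustomerDetailsRequestHandler>();

// For the queue side, an explicit mapping inside .Routing(...) makes the destination visible:
// r.TypeBased().Map<GetCustomerDetailsRequest>("MainQueue")
```

The queue name in the mapping has to be the input queue of the instance that has the handler registered.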

AhmedElbatt commented 9 months ago

Could you check that the request gets sent to the right queue and that the Rebus instance that runs there has a handler for it?

That's a very good question; the answer is no. The idea I want to apply (and please tell me if you have a better one) is to reply from the final state I reach in the saga. I use a unified reply with a flag that lets the MediatR handler identify whether the process ran to completion or not. Based on this flag, I will return different responses from the endpoint.

Kindly have a look at this drawing, which illustrates the architecture I want to achieve. If there is a better idea, I would be so grateful if you could share it with me :) . Screenshot 2023-12-21 161001

mookid8000 commented 9 months ago

I'm sorry, but I simply don't understand what you're trying to do. 🤔

Could you maybe try and remove all the domain-specific terms and explain it in the most simple way you can? I'll be happy to help you, but I just get more confused from looking at your diagram.