rebus-org / Rebus.Async

:bus: Experimental async extensions for Rebus
https://mookid.dk/category/rebus

Request-response does not work inside handler #3

Closed zodrog closed 7 years ago

zodrog commented 7 years ago

Hi,

When doing a request-response and then, inside the handler, trying to do a brand-new request-response (to another service), the request is sent, but it fails with a timeout saying the message cannot be dispatched to any handlers on the bus.Reply operation (in the last handler).

Thanks

mookid8000 commented 7 years ago

(...) it fails with a timeout saying the message cannot be dispatched to any handlers on the bus.Reply operation (in the last handler)

I am not sure that I understand how it fails.... I would expect a timeout to say something along the lines of "a response message was not received within n seconds timeout" or something like that, but the text "cannot be dispatched to any handlers" sort of implies that a message was received in a place that was not capable of handling it.

Could you maybe include some logs including full stack traces? – that would help a lot.

zodrog commented 7 years ago

Hi,

Thanks for the quick reply. Congratulations BTW for Rebus. Amazing framework...

It does indeed throw a timeout exception; the timeout occurs because of the Task.Delay inside the implementation.

Instead of posting logs, I forked Rebus.Samples and modified the RequestReply example (the one using Rebus.Async) by adding a new consumer, to make the problem easier to understand:

https://github.com/zodraz/RebusSamples

Hope that helps solve the issue.

Many thanks.

mookid8000 commented 7 years ago

If you make these small changes, you can make it work:

My guess is that you did not change the endpoint mapping to consumer2.input when you initially introduced Consumer2.

You can see the full diff of what I did to make it work here:

diff --git a/RequestReply/Consumer2/Program.cs b/RequestReply/Consumer2/Program.cs
index 5065b22..6a43387 100644
--- a/RequestReply/Consumer2/Program.cs
+++ b/RequestReply/Consumer2/Program.cs
@@ -18,11 +18,11 @@ namespace Consumer
         {
             using (var adapter = new BuiltinHandlerActivator())
             {
-                adapter.Handle<Job2>(async (bus, job) =>
+                adapter.Handle<Job>(async (bus, job) =>
                 {
                     var keyChar = job.KeyChar;
                     var processId = Process.GetCurrentProcess().Id;
-                    var reply = new Reply2(keyChar, processId);
+                    var reply = new Reply(keyChar, processId);

                     await bus.Reply(reply);
                 });
diff --git a/RequestReply/Producer/Program.cs b/RequestReply/Producer/Program.cs
index 85a63eb..08dedb2 100644
--- a/RequestReply/Producer/Program.cs
+++ b/RequestReply/Producer/Program.cs
@@ -24,7 +24,7 @@ namespace Producer
                     .Logging(l => l.ColoredConsole(minLevel: LogLevel.Warn))
                     .Options(o => o.EnableSynchronousRequestReply(replyMaxAgeSeconds: 7))
                     .Transport(t => t.UseSqlServer("Data Source=VS2017-W2016;Initial Catalog=ActorMessages;Integrated Security=True;MultipleActiveResultSets=True", "Messages", "producer.input"))
-                    .Routing(r => r.TypeBased().Map<Job>("consumer.input"))
+                    .Routing(r => r.TypeBased().Map<Job>("consumer2.input"))
                     .Start();

                 Console.WriteLine("Press Q to quit , r for a request/response or any other key to produce a job");
@@ -39,6 +39,7 @@ namespace Producer

                         case 'r':
                             var reply = adapter.Bus.SendRequest<Reply>(new Job(keyChar)).Result;
+                            Console.WriteLine($"Got reply: {reply.KeyChar} from PID {reply.OsProcessId}");
                             break;

                         default:

Sorry about the slow reply btw – I wanted to give you a good reply, so I hope you think it was worth waiting for 😄 please let me know if you still cannot make it work.

zodrog commented 7 years ago

Hi,

Thanks for the response. We are all busy ;-)

I think I did not explain the scenario very well. What I want to achieve is a chained request-response scenario:

Producer <---> Consumer1 <-----> Consumer2

So Consumer1 handles the Job message and Consumer2 handles Job2, and the replies are chained back to the Producer.

Thanks

mookid8000 commented 7 years ago

ah, now I get it – I think you may have been bitten by the fact that await bus.Send(...) enlists its work in the ongoing handler transaction, thus not actually sending the outgoing message until the message being handled has been acknowledged.

IIRC this was changed in one of the more recent versions of Rebus.Async, so if you

Update-Package Rebus.Async

then I think it will resolve itself 😄

zodrog commented 7 years ago

Hi,

I have the latest version, Rebus.Async 5.0.0, and it is still not working. As far as I understand, a Rebus handler has a queue transaction surrounding the entire processing of the message (the SendRequest one), and Rebus.Async then works with this ambient transaction. Anyway, I don't get why it is not acking. Should the transaction somehow be propagated or inherited? I am a little confused by these transactions.

public static async Task<TReply> SendRequest<TReply>(this IBus bus, object request, Dictionary<string, string> optionalHeaders = null, TimeSpan? timeout = null)
{
    var currentTransactionContext = AmbientTransactionContext.Current;
    try
    {
        AmbientTransactionContext.SetCurrent(null);
        return await InnerSendRequest<TReply>(bus, request, optionalHeaders, timeout);
    }
    finally
    {
        AmbientTransactionContext.SetCurrent(currentTransactionContext);
    }
}

Thanks

mookid8000 commented 6 years ago

As far as I understand, a Rebus handler has a queue transaction surrounding the entire processing of the message

That's correct. But the queue transaction is "ambient", stored in the current execution context via Rebus' AmbientTransactionContext.SetCurrent(...).

So the code you posted there simply shows how SendRequest avoids getting its call to InnerSendRequest executed with an ambient Rebus transaction context, ensuring that the context gets put back on the execution context when it has sent its request.

This test proves that it works.

Can you maybe check whether everything is wired together correctly, as in: are you sending the request to the correct queue, etc.?
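The suspend-and-restore pattern described above can be illustrated without Rebus at all. The sketch below is only a stand-in: FakeTransactionContext, FakeAmbientContext and Demo are hypothetical names, and using AsyncLocal as the backing store is an assumption made for the demo, not a statement about how Rebus implements AmbientTransactionContext. It shows that a send made while an ambient context is present is merely enlisted, whereas a send made with the context temporarily cleared goes out immediately, and that the context is back in place afterwards.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a handler's queue transaction:
// enlisted messages are only buffered, not sent.
sealed class FakeTransactionContext
{
    readonly List<string> _pending = new List<string>();
    public void Enlist(string message) => _pending.Add(message);
    public int PendingCount => _pending.Count;
}

// Hypothetical ambient holder (assumption: AsyncLocal-backed, for the demo only).
static class FakeAmbientContext
{
    static readonly AsyncLocal<FakeTransactionContext> Slot =
        new AsyncLocal<FakeTransactionContext>();

    public static FakeTransactionContext Current => Slot.Value;
    public static void SetCurrent(FakeTransactionContext context) => Slot.Value = context;
}

static class Demo
{
    // Messages that have actually left the process.
    public static readonly List<string> Wire = new List<string>();

    // Enlists in the ambient transaction if there is one, otherwise sends right away.
    public static Task Send(string message)
    {
        var tx = FakeAmbientContext.Current;
        if (tx != null) tx.Enlist(message);
        else Wire.Add(message);
        return Task.CompletedTask;
    }

    // Same shape as the SendRequest snippet quoted in this thread:
    // clear the ambient context, send, then put the context back.
    public static async Task SendOutsideTransaction(string message)
    {
        var saved = FakeAmbientContext.Current;
        try
        {
            FakeAmbientContext.SetCurrent(null);
            await Send(message);
        }
        finally
        {
            FakeAmbientContext.SetCurrent(saved);
        }
    }

    public static async Task Main()
    {
        var tx = new FakeTransactionContext();
        FakeAmbientContext.SetCurrent(tx);                 // "inside a handler"

        await Send("Job2 (enlisted)");                     // buffered until commit
        await SendOutsideTransaction("Job2 (immediate)");  // bypasses the transaction

        Console.WriteLine($"on the wire: {Wire.Count}, still pending: {tx.PendingCount}");
        Console.WriteLine($"context restored: {object.ReferenceEquals(FakeAmbientContext.Current, tx)}");
    }
}
```

If a request were only enlisted, a handler awaiting its reply would wait for a message that cannot leave until the handler itself finishes, which is the kind of timeout discussed earlier in this thread.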

zodrog commented 6 years ago

Hi, I think everything is wired properly, unless I am missing something. I modified my code so that the routing is the same in all processes (which should not be needed):

So in every Rebus configuration I have:

.Routing(r => r.TypeBased().Map<Job>("consumer.input").Map<Job2>("consumer2.input"))

And every process has its own queue:

Producer:
.Transport(t => t.UseSqlServer(ConnectionString, "Messages", "producer.input"))
Consumer1:
.Transport(t => t.UseSqlServer(ConnectionString, "Messages", "consumer.input"))
Consumer2:
.Transport(t => t.UseSqlServer(ConnectionString, "Messages", "consumer2.input"))

And this is the log from Rebus:

From Producer to Consumer1:
Rebus.Pipeline.Send.SendOutgoingMessageStep DEBUG (Thread #1): Sending Consumer.Messages.Job -> "consumer.input"

From Consumer1 to Consumer2:
Rebus.Pipeline.Send.SendOutgoingMessageStep DEBUG (Thread #5): Sending Consumer.Messages.Job2 -> "consumer2.input"

From Consumer2 back to Consumer1:
Rebus.Pipeline.Send.SendOutgoingMessageStep DEBUG (Thread #5): Sending Consumer.Messages.Reply2 -> "consumer.input"
Rebus.Pipeline.Receive.DispatchIncomingMessageStep DEBUG (Thread #5): Dispatching message "Job2/request-reply:d4f9435f-daac-40bd-b5d6-067fe5814a23" to 1 handlers took 83 ms

And then the first consumer fails, saying it cannot handle Reply2:
Message with ID 364d0e11-7bc3-4118-9f2a-b18ff761de76 and type Consumer.Messages.Reply2, Consumer.Messages could not be dispatched to any handlers

I've pushed these simple changes to my fork.

Thanks

zodrog commented 6 years ago

Hi Mogens,

Any update on this issue?

Thanks

mookid8000 commented 6 years ago

Ah, sorry – I completely forgot about it 😐

I finally had time to debug your code, and I found two issues – first, when you're in a method that is async, you should not call .Result on the Task (found here) because you can simply

var reply1 = await adapter.Bus.SendRequest<Reply2>(new Job2(keyChar));

You probably knew that already 😁

The other thing – and this is what turned out to be causing the error message – is the missing EnableSynchronousRequestReply in Consumer, meaning that the incoming message pipeline does not have the required step to intercept the incoming reply message and return it to the caller of SendRequest.

So I changed the configuration to

Configure.With(adapter)
    .Logging(l => l.ColoredConsole(minLevel: LogLevel.Debug))
    .Options(o =>
    {
        o.LogPipeline(verbose: true);
        o.EnableSynchronousRequestReply(replyMaxAgeSeconds: 7);
    })
    .Transport(t => t.UseMsmq("consumer.input"))
    .Routing(r => r.TypeBased().Map<Job>("consumer.input").Map<Job2>("consumer2.input"))
    .Start();

and then it worked 🙂

Sorry for taking so long to help you fix it.
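Putting both fixes together, the chained handler in Consumer1 might look roughly like the sketch below. It is not taken from the fork: the message classes are hypothetical stand-ins shaped after the constructors used in this thread, the property names on Reply2 are assumed to mirror those of Reply, Consumer1Handlers is an illustrative name, and the using directives are assumptions about the namespaces in which Rebus and Rebus.Async expose these members. It needs the Rebus and Rebus.Async packages, and the bus must be configured with EnableSynchronousRequestReply as in the configuration above.

```csharp
using System.Diagnostics;
using Rebus;               // assumption: namespace of the SendRequest extension from Rebus.Async
using Rebus.Activation;    // BuiltinHandlerActivator

// Hypothetical message types, modelled on the constructors seen in this thread.
public class Job
{
    public char KeyChar { get; }
    public Job(char keyChar) { KeyChar = keyChar; }
}

public class Job2
{
    public char KeyChar { get; }
    public Job2(char keyChar) { KeyChar = keyChar; }
}

public class Reply
{
    public char KeyChar { get; }
    public int OsProcessId { get; }
    public Reply(char keyChar, int osProcessId) { KeyChar = keyChar; OsProcessId = osProcessId; }
}

public class Reply2
{
    public char KeyChar { get; }
    public int OsProcessId { get; }   // assumption: same shape as Reply
    public Reply2(char keyChar, int osProcessId) { KeyChar = keyChar; OsProcessId = osProcessId; }
}

public static class Consumer1Handlers
{
    // Registers the Job handler that forwards to Consumer2 and chains the reply back.
    public static void Register(BuiltinHandlerActivator adapter)
    {
        adapter.Handle<Job>(async (bus, job) =>
        {
            // Await the nested request instead of blocking on .Result.
            var reply2 = await bus.SendRequest<Reply2>(new Job2(job.KeyChar));

            // Answer the original requester once Consumer2 has replied.
            var processId = Process.GetCurrentProcess().Id;
            await bus.Reply(new Reply(reply2.KeyChar, processId));
        });
    }
}
```

Without EnableSynchronousRequestReply on this bus, Reply2 would arrive at consumer.input as an ordinary message with no handler registered for it, which matches the "could not be dispatched to any handlers" error reported earlier.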

zodrog commented 6 years ago

Thanks Mogens