Azure / amqpnetlite

AMQP 1.0 .NET Library
Apache License 2.0

Incoming messages only processed when sending messages #490

Closed petertiedemann closed 2 years ago

petertiedemann commented 2 years ago

I am new to AMQP and amqpnetlite, but have been struggling with some fairly basic functionality. I am trying to implement a basic pub/sub API using AMQP, and I keep running into a problem where messages are only received as part of sending a message, either immediately or after a short delay.

I have boiled it down to a very small example, with a dummy "chat" app, that I have made available here: https://github.com/petertiedemann/amqpnetlite-repro

The repo includes detailed repro steps and ready-to-run code, but the overall workflow is this:

  var factory = new ConnectionFactory();

  if ( options.Trace ) {
    Trace.TraceLevel = TraceLevel.Frame;
    Trace.TraceListener = ( _, format, objects ) => Console.WriteLine( string.Format( format, objects ) );
  }

  var connection = await factory.CreateAsync( new Address( options.Uri ) );

  var session = new Session( connection );

  var receiver = new ReceiverLink(
    session,
    "receiver",
    options.Key );

  var sender = new SenderLink( session, "sender", options.Key );

  void OnMessage( IReceiverLink link, Message message ) {
    Console.WriteLine( $"{DateTime.Now.ToLongTimeString()} > INCOMING:{message.GetBody<string>()}" );
    link.Accept( message );
  }

  receiver.Start( 100, OnMessage );

  while ( true ) {
    Console.WriteLine( "Enter a message, 'exit' to quit" );
    var message = Console.ReadLine();
    if ( string.IsNullOrWhiteSpace( message ) ) {
      Console.Error.WriteLine( "Try again!" );
      continue;
    }

    if ( message == "exit" ) {
      break;
    }

    await sender.SendAsync( new Message( message ), TimeSpan.FromSeconds( 5 ) );
    Console.WriteLine( $"{DateTime.Now.ToLongTimeString()} > SENT: {message}" );
  }

The behavior I see depends a bit on the broker, but the general pattern is that at some point the client stops receiving incoming messages, and then, as soon as the client sends a message, the pending receive callbacks are processed.

My guess is that there is something simple that I missed (and that perhaps isn't documented). Perhaps it is related to link credit?

xinchen10 commented 2 years ago

I don't think you can use a queue in your scenario. With a queue, messages are delivered FIFO and each message goes to ONE receiver only. In this case you have both clients reading from the same queue, so they are effectively competing for every message. There is no guarantee that a message sent by client1 goes to client2; in many cases the message is read back by the client that sent it.

For your scenario you would want to use a topic with n subscribers. Each subscriber then gets a copy of every message and can filter out its own, so overall it looks like each client is broadcasting to the others.
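
For example (just a sketch, reusing the sender/receiver from your snippet): each client could stamp its outgoing messages with its own id and drop its own copies on receive:

  var clientId = Guid.NewGuid().ToString();

  // outgoing: tag the message so this client can recognize its own copy
  var outgoing = new Message( "hello" ) {
    ApplicationProperties = new Amqp.Framing.ApplicationProperties()
  };
  outgoing.ApplicationProperties[ "sender-id" ] = clientId;
  await sender.SendAsync( outgoing, TimeSpan.FromSeconds( 5 ) );

  // incoming: accept everything, but only display messages from other clients
  void OnMessage( IReceiverLink link, Message message ) {
    link.Accept( message );
    if ( (string) message.ApplicationProperties?[ "sender-id" ] == clientId ) {
      return; // our own copy coming back from the topic
    }
    Console.WriteLine( message.GetBody<string>() );
  }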

petertiedemann commented 2 years ago

Thank you for responding. I am, however, using a topic (specifying /topic/foo for source/target), and all messages are delivered to both clients, but only as part of sending a message. In the readme of the linked repro you can see examples of how messages are received. Using RabbitMQ I am also seeing a queue created per connected client.

Is there something more i should do when using a topic?

EDIT: Just in case I didn't communicate it clearly: The repro I linked to was created solely for this issue and contains only the minimal code needed (e.g. I am not pointing you to some huge project full of other code :)

xinchen10 commented 2 years ago

I believe the issue you observed is described here: https://github.com/Azure/amqpnetlite/blob/master/docs/articles/building_application.md#threading

SendAsync continuation is run on the connection's I/O thread, and Console.ReadLine() is blocking it, so the connection is not reading the transport until ReadLine returns.

This is a danger of having blocking calls in an async code path. To confirm this, you can try adding the following line before Console.ReadLine():

await Task.Yield();
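
Applied to the loop in the repro, it would look roughly like this (a sketch):

  while ( true ) {
    Console.WriteLine( "Enter a message, 'exit' to quit" );
    // continue on a thread pool thread, not the connection's I/O thread,
    // before blocking on console input
    await Task.Yield();
    var message = Console.ReadLine();
    if ( message == "exit" ) {
      break;
    }
    await sender.SendAsync( new Message( message ), TimeSpan.FromSeconds( 5 ) );
  }
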
petertiedemann commented 2 years ago

SendAsync continuation is run on the connection's I/O thread, and Console.ReadLine() is blocking it, so the connection is not reading the transport until ReadLine returns.

Damn, you are right. I can also see that OnMessage gets called by the same thread that the main loop runs on. I thought the IO thread was a dedicated thread created by amqpnetlite (meaning I should just avoid blocking it in the callback).

Doing the ReadLine in a Task.Run fixes the problem in this test app. What exactly determines which thread is used to receive messages? Is it the thread calling receiver.Start?
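
For reference, the Task.Run change mentioned above is essentially this (a sketch):

  // only the blocking console read runs on a thread pool thread, so the thread
  // executing the connection pump is never stuck waiting in ReadLine
  var message = await Task.Run( () => Console.ReadLine() );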

I need to go back to my real code base and check if it is indeed the same problem there. I will report back (and probably close the issue) ASAP.

petertiedemann commented 2 years ago

Playing around with it some more, I can see that callbacks always occur on the main thread of the running program (and calling receiverLink.Start from another thread does not change that).

I can solve the problem in my sample app by running the "application logic" (the little loop) on a separate thread.

In my real codebase (which is a simple notification API supporting different backends), I have a sender thread that reads outgoing messages from an in-memory queue. It sounds like I should do something similar for receiving messages, e.g. run a receiver thread calling the blocking Receive method. Or will I still have a connection pump on my main thread?

Btw, you referenced https://github.com/Azure/amqpnetlite/blob/master/docs/articles/building_application.md#threading, but reading it, it specifically talks about not blocking in the callbacks (which I am not doing). Rather, I am blocking the thread that happens to be where the "connection pump" is running, it seems. It kind of feels like it's "hijacking" my main thread :)

If you can help me understand the "threading model" behind the scenes I would be happy to do a little PR to extend the documentation and prevent others from hitting the same behavior.

petertiedemann commented 2 years ago

@xinchen10 Is it correctly understood that this is (essentially) the same problem as https://github.com/Azure/amqpnetlite/issues/237 ?

I decided to resolve the problem in my real code base by simply not using the async APIs. There is no way I can ensure that I (or users of my lib) won't block a thread pool thread for extended periods (and that thread might be the one running the pump), and that seems to be a requirement for using the async API. Instead I am creating a separate thread and using the sync API.
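
Roughly what I have in mind for the receiving side (a sketch; running and incomingQueue are stand-ins for my app's own state, receiver is the ReceiverLink):

  // dedicated receiver thread: the blocking Receive call only ever blocks this
  // thread, and handled messages are pushed to the application's own queue
  var receiveThread = new Thread( () => {
    while ( running ) {
      var message = receiver.Receive( TimeSpan.FromSeconds( 1 ) ); // null on timeout
      if ( message == null ) {
        continue;
      }
      receiver.Accept( message );
      incomingQueue.Add( message.GetBody<string>() ); // e.g. a BlockingCollection<string>
    }
  } ) { IsBackground = true };
  receiveThread.Start();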

PS: Interestingly, I have backends implemented in the library using MQTTNet, Redis and a few others with async APIs, and I have not encountered anything similar to this problem.

Havret commented 2 years ago

Of course you can. You just need to handle the continuation properly and not perform any client-specific work on the pump thread.

Have a look at how I'm doing this in ArtemisNetClient which uses this lib under the hood and is fully async.

Consumer: https://github.com/Havret/dotnet-activemq-artemis-client/blob/master/src/ArtemisNetClient/Consumer.cs#L33 Producer: https://github.com/Havret/dotnet-activemq-artemis-client/blob/master/src/ArtemisNetClient/ProducerBase.cs

petertiedemann commented 2 years ago

Thanks for replying, it was this kind of response I was trying to "provoke" :)

You just need to handle the continuation properly and not perform any client-specific work on the pump thread.

See, this is what I don't get (probably me being dense :). We keep talking about the pump thread, but as far as I can tell from the code it's an async pump, so it is not tied to any specific thread, right? That is why I am so surprised that it cares about what happens in its continuations. Looking at https://github.com/Azure/amqpnetlite/issues/237, I get the impression that it might be related to a synchronous continuation inside a lock? Normally when using an async API I would not expect there to be any thread "affinity".

Have a look at how I'm doing this in ArtemisNetClient which uses this lib under the hood and is fully async.

Consumer: https://github.com/Havret/dotnet-activemq-artemis-client/blob/master/src/ArtemisNetClient/Consumer.cs#L33 Producer: https://github.com/Havret/dotnet-activemq-artemis-client/blob/master/src/ArtemisNetClient/ProducerBase.cs

Looking at your producer code, it seems that you are using the sync API for sending: https://github.com/Havret/dotnet-activemq-artemis-client/blob/master/src/ArtemisNetClient/ProducerBase.cs#L110 ?


Just to be clear, for my use case it is perfectly fine to use the sync API. I am looking at notifications with a very low rate and very few channels, so a bit of extra overhead is not a problem. However, I really would like to understand it :) Maybe if I did, I could help explain it to anyone else who is confused as well (and looking at https://github.com/Azure/amqpnetlite/issues/237 I do not seem to be alone).

Havret commented 2 years ago

The pump is just a while(true) loop that reads bytes from the socket. If it receives enough data to parse a complete frame, it deserializes it and invokes the client-specific code, like your OnMessage callback. If you block inside this callback, the pump cannot read any new bytes, so your app gets stuck.

The same applies when you are sending a message. When the other side receives it, it sends back an ack frame that completes the operation from your point of view (again, there is a callback that completes a TaskCompletionSource). If you run blocking code as a continuation, you end up with the same problem as above, because you are again blocking the pump.

In my producer example I manually control completion via my own TaskCompletionSource, so I can instruct it to run the continuation on the thread pool and make sure that I don't block the pump.
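
The core of it is the creation option on the TaskCompletionSource; roughly like this (a sketch assuming the callback-based SenderLink.Send overload, not the actual ArtemisNetClient code; check the exact OutcomeCallback signature for your library version):

  Task SendOffPumpAsync( SenderLink sender, Message message ) {
    // RunContinuationsAsynchronously guarantees that whoever awaits tcs.Task resumes
    // on a thread pool thread, never inline on the thread calling TrySetResult
    var tcs = new TaskCompletionSource<bool>( TaskCreationOptions.RunContinuationsAsynchronously );

    // the outcome callback is invoked on the connection pump; it only completes the TCS
    sender.Send( message, ( link, msg, outcome, state ) => {
      if ( outcome is Amqp.Framing.Rejected rejected ) {
        tcs.TrySetException( new AmqpException( rejected.Error ) );
      } else {
        tcs.TrySetResult( true );
      }
    }, null );

    return tcs.Task;
  }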

I hope that answers your question.

petertiedemann commented 2 years ago

The pump is just a while(true) loop that reads bytes from the socket. If it receives enough data to parse a complete frame, it deserializes it and invokes the client-specific code, like your OnMessage callback. If you block inside this callback, the pump cannot read any new bytes, so your app gets stuck.

It is somewhat expected that you should not block in a callback (I mean, that would also apply with a dedicated receiver thread), but it is far less clear for a continuation.

The same applies when you are sending a message. When the other side receives it, it sends back an ack frame that completes the operation from your point of view (again, there is a callback that completes a TaskCompletionSource). If you run blocking code as a continuation, you end up with the same problem as above, because you are again blocking the pump.

So it is in fact because TaskCompletionSource.SetResult runs continuations synchronously, basically turning my continuation into a callback? That makes sense, but it is a serious "footgun" IMO, especially because the continuation is not always run synchronously, so it sometimes works as expected.

I wonder if @xinchen10 would be open to a doc PR to clarify this a bit.

I hope that answers your question.

Thanks, I think I understand it now, and it also clarifies the discussion here: https://github.com/Azure/amqpnetlite/issues/237 .

xinchen10 commented 2 years ago

The library doesn't use dedicated threads for I/O operations, nor for internal library operations, so the term "pump thread" is not 100% correct. It is just whichever thread pool thread executes the connection's pump at a given moment. One advantage of this design choice is less context switching and likely better performance, but at the same time application developers can easily run into issues like this one. It is similar to the sync-over-async deadlock issues discussed in many online posts, though it is a little more subtle here because the connection pump execution is always single-threaded.
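
In practice that means application code should hand any non-trivial work off the pump as soon as it gets control, for example (a sketch; HandleMessage stands for the application's own processing):

  void OnMessage( IReceiverLink link, Message message ) {
    link.Accept( message );
    // hand the real work to the thread pool so the pump can keep reading frames;
    // HandleMessage is the application's own method
    _ = Task.Run( () => HandleMessage( message ) );
  }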

The framework later added task creation and continuation options to run continuations asynchronously, but the problem is knowing about the potential issue and mitigating it with that option. Alternatively, you could just make it the default and not care about the performance penalty. We could add an option to the connection factory that controls how the user's callbacks and task continuations are executed, but that goes back to the original question of what the default value should be and when it should be changed.

PRs are always welcome. If we explain this better in the doc, it will help others.

petertiedemann commented 2 years ago

@xinchen10 I will close this issue and am just writing up a small PR for the docs.