Azure / amqpnetlite

AMQP 1.0 .NET Library
Apache License 2.0

Feature request: connection automatic reconnect #33

Open ChugR opened 9 years ago

ChugR commented 9 years ago

Issue #32 discussed this. It is a common pattern that many amqpnetlite users will see and it would be a great feature to consider. A model for this feature is Qpid C++ Messaging which describes reconnect behavior and option settings.

ChugR commented 9 years ago

A second feature request built on top of automatic reconnect is failover. A user could send a list of Address objects to a Connection constructor instead of a single Address.

The library would try to connect to each address in turn.
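A minimal sketch of the "try each address in turn" idea, written as a hypothetical helper rather than anything the library provides. `Failover` and `ConnectToFirstAvailableAsync` are illustrative names, and the connect step is passed in as a delegate so the sketch does not assume any particular constructor overload.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class Failover
{
    // Hypothetical helper (not part of AMQP.Net Lite): tries each address in
    // order and returns the first connection that opens successfully.
    public static async Task<TConnection> ConnectToFirstAvailableAsync<TAddress, TConnection>(
        IEnumerable<TAddress> addresses,
        Func<TAddress, Task<TConnection>> connect)
    {
        var failures = new List<Exception>();
        foreach (TAddress address in addresses)
        {
            try
            {
                return await connect(address);
            }
            catch (Exception ex)
            {
                // Remember the failure and move on to the next address.
                failures.Add(ex);
            }
        }

        // Every address failed: surface all collected errors to the caller.
        throw new AggregateException("No address in the list accepted a connection.", failures);
    }
}
```

With AMQP.Net Lite the delegate could be `address => Connection.Factory.CreateAsync(address)`, the same call used in the samples later in this thread.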

xinchen10 commented 9 years ago

When connection reconnect is enabled, does it just re-establish the transport? How about sessions/links?

I think it makes sense to build these features in a layer on top of the protocol stack. For example, the Service Bus .NET SDK has MessagingFactory/Sender/Receiver objects. They auto-create the underlying connection/session/link when needed. The Proton Messenger might do something similar.

mcmellawatt commented 8 years ago

We would very much like this feature as well. Currently we have built our own reconnection logic which recreates the session/link/connection objects when certain AmqpExceptions are caught in the client. It works, though I am not sure if it would make more sense to handle reconnection at a lower level.

If our approach is reasonable, from my understanding there would be no need to call Close() on the existing link/session/connection objects before creating new ones, because closing the ContainerHost on the server or terminating the server process actually triggers closing/aborting logic in the client. Is this a reasonable assumption?

xinchen10 commented 8 years ago

Thanks for the feedback. We will consider enabling this feature in a future release (though we cannot promise when it will be ready). The application would still need to recreate the session/link in case of communication errors.
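As a rough illustration of application-level recreation, here is a sketch of a sender that rebuilds whichever of its connection, session or link has closed before each attempt. `ReconnectingSender`, `EnsureLink`, the retry count and the backoff values are all assumptions for illustration, not library features; only `Connection.Factory.CreateAsync`, `Session`, `SenderLink`, `Send` and `IsClosed` come from the library.

```csharp
using System;
using System.Threading;
using Amqp;

// Hypothetical wrapper, not part of AMQP.Net Lite.
public sealed class ReconnectingSender
{
    private readonly Address address;
    private readonly string target;
    private Connection connection;
    private Session session;
    private SenderLink link;

    public ReconnectingSender(Address address, string target)
    {
        this.address = address;
        this.target = target;
    }

    // Recreates whichever of connection/session/link is missing or closed.
    private void EnsureLink()
    {
        if (connection == null || connection.IsClosed)
        {
            connection = Connection.Factory.CreateAsync(address).GetAwaiter().GetResult();
            session = null;
        }
        if (session == null || session.IsClosed)
        {
            session = new Session(connection);
            link = null;
        }
        if (link == null || link.IsClosed)
        {
            link = new SenderLink(session, "sender-" + Guid.NewGuid(), target);
        }
    }

    // Sends a message, retrying with a capped exponential backoff (illustrative values).
    public void Send(Message message, int maxAttempts = 5)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                EnsureLink();
                link.Send(message);
                return;
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // The next iteration rebuilds anything that closed in the meantime.
                Thread.Sleep(TimeSpan.FromSeconds(Math.Min(30, 1 << attempt)));
            }
        }
    }
}
```

The sketch is single-threaded by design. If `Send` can be called from several threads, the rebuild step needs a lock so that each object is recreated only once.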

networkfusion commented 8 years ago

@mcmellawatt would you be prepared to share your reconnection wrapper/logic? I am having trouble keeping connections open over long periods of time.

caserzer commented 8 years ago

@mcmellawatt I have the same question. Would you share the retry logic? @networkfusion can you share the exception information?

duizendnegen commented 8 years ago

Similarly, my listener client just stops working at some point. I don't even seem to receive a Closed event, so I can't decide whether to reconnect.

duizendnegen commented 8 years ago

I created my own reconnection logic, also for the receiver objects (which never catches exceptions, since it merely stops receiving messages) and added this to a demo project. See https://github.com/duizendnegen/net-docker-web-worker-role/blob/master/Jobs/QueueListener.cs and https://github.com/duizendnegen/net-docker-web-worker-role/blob/master/Jobs/QueueConnection.cs. I'll make a write-up of this in a blog post shortly.

duizendnegen commented 7 years ago

Completed the write-up, in case anyone stumbles on the same problem: https://medium.com/@duizendnegen/running-worker-roles-with-docker-in-net-core-543b8d1c4ae7

xinchen10 commented 7 years ago

I added an article that describes common reconnect approaches. There is also a test project that demonstrates the usage. I was able to run the test against a Service Bus queue for weeks and transfer tens of millions of messages between the senders and the receivers.

duizendnegen commented 7 years ago

Thanks for the write-up @xinchen10, it seems like a holistic approach to solving this problem. Reading through the code and the documentation, I couldn't quite figure out how the Consumer reconnects after the Service Bus queue's 10-minute auto-disconnect on inactivity kicks in. As far as I know, the Closed event doesn't fire in this case, nor is the IsClosed property checked (because no new Consumer is constructed, the only thing it notices is that no more messages are arriving). Could you elaborate on how this case is handled?

xinchen10 commented 7 years ago

@duizendnegen Consumer supports three modes for receiving messages: sync, async and callback. The base class Role.SyncTest/AsyncTest has a while loop that drives the test execution by calling EnsureConnection, EnsureLink and Execute repeatedly. The sync and async modes are straightforward.

The callback mode is a bit confusing because it does not need a while loop to drive execution, but I used the loop in the base class to run auto-reconnect. The interesting class is Consumer.MyCallbackTest. Its CreateLink method subscribes to the Closed event of all Connection, Session and ReceiverLink objects, and its Execute method blocks until the manual reset event is signaled, which happens when any of the AMQP objects is closed. In real code this logic could live directly inside an OnChannelClosed event handler.

The while loop is single-threaded, so I do not need to worry about concurrency issues. If the reconnect code could be invoked concurrently, the application must add synchronization to ensure each AMQP object is recreated only once.
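The callback pattern described above can be sketched roughly as follows. This is not the test project's code: `CallbackReceiverLoop`, the credit of 10 and the 5-second pause are assumptions chosen for illustration.

```csharp
using System;
using System.Threading;
using Amqp;

// Hypothetical sketch of a callback-mode receiver with auto-reconnect.
public static class CallbackReceiverLoop
{
    public static void Run(Address address, string queue, CancellationToken stop)
    {
        while (!stop.IsCancellationRequested)
        {
            Connection connection = null;
            try
            {
                var closed = new ManualResetEvent(false);
                connection = Connection.Factory.CreateAsync(address).GetAwaiter().GetResult();
                var session = new Session(connection);
                var receiver = new ReceiverLink(session, "receiver-" + Guid.NewGuid(), queue);

                // Any of the three objects closing wakes up the loop below.
                connection.Closed += (sender, error) => closed.Set();
                session.Closed += (sender, error) => closed.Set();
                receiver.Closed += (sender, error) => closed.Set();

                // Cover the case where something closed before the handlers were attached.
                if (connection.IsClosed || session.IsClosed || receiver.IsClosed)
                {
                    closed.Set();
                }

                receiver.Start(10, (link, message) =>
                {
                    Console.WriteLine(message.Body + " received");
                    link.Accept(message);
                });

                // Block until an AMQP object closes or the caller asks us to stop.
                WaitHandle.WaitAny(new WaitHandle[] { closed, stop.WaitHandle });
            }
            catch (Exception ex)
            {
                Console.WriteLine("Receive loop error: " + ex.Message);
            }
            finally
            {
                // Best-effort cleanup; the connection may already be gone.
                try { connection?.Close(); } catch (Exception) { }
            }

            if (!stop.IsCancellationRequested)
            {
                Thread.Sleep(TimeSpan.FromSeconds(5)); // pause before reconnecting
            }
        }
    }
}
```

Because a single loop owns the whole lifecycle, the event handlers only signal and never recreate anything themselves, which keeps the reconnect path free of concurrency concerns.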

You are right that the Service Bus queue closes idle receivers after 10 minutes but if a receiver has outstanding link credits it is considered active and will not be closed by the idle timer (but it could be closed for other reasons).

To test the Closed event of the receiver, I tried the following code:

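using System;
using System.Threading;
using Amqp;
using Amqp.Framing;

class Program
{
    static void Main(string[] args)
    {
        var a = new Address("amqps://<key>:<value>@<myns>.servicebus.windows.net");
        var c = Connection.Factory.CreateAsync(a).Result;
        var s = new Session(c);
        var r = new ReceiverLink(s, "receiver1", "q1");
        r.Closed += R_Closed;

        // Block forever. Start/Receive is deliberately never called,
        // so the link never issues any credit to the service.
        Thread.Sleep(Timeout.Infinite);
        c.Close();
    }

    // Invoked when the receiver link is closed, e.g. by the service's idle timer.
    private static void R_Closed(AmqpObject sender, Error error)
    {
        Console.WriteLine(sender + " closed");
    }
}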

Note that I did not call Start or Receive so the receiver link does not send any credits to the service. After 10 minutes it is closed and the event handler is fired. Did you subscribe to the Closed event of Connection and Session in your code?

duizendnegen commented 7 years ago

@xinchen10 you are right, I adapted the example to also test still being able to receive new messages (i.e. with outstanding credits) and that works perfectly - I'm not sure why originally I hadn't been receiving the Closed event, but that seems to be entirely on my side.

Just for completeness, the Program.cs I ended up using was

private static Address a;

static void Main(string[] args)
{
    a = new Address("queue-experiment.servicebus.windows.net",
        5671,
        "SendListen",
        "W/w7Kf2g33nf0nDbe9gvLN68lCkGb8ZhQPk2WPB4fzI=");

    Thread publisher = new Thread(PublisherThread);
    Thread consumer = new Thread(ConsumerThread);

    publisher.Start();
    consumer.Start();

    publisher.Join();
    consumer.Join();
}

private static void PublisherThread()
{
    {
        var c = Connection.Factory.CreateAsync(a).Result;
        var s = new Session(c);
        var p = new SenderLink(s, "sender1", "experiment");
        p.Send(new Message("Send 1"));
        Console.WriteLine("Send 1 sent");
    }

    // sleep 60 minutes
    Thread.Sleep(1000 * 60 * 60);

    {
        var c = Connection.Factory.CreateAsync(a).Result;
        var s = new Session(c);
        var p = new SenderLink(s, "sender2", "experiment");
        p.Send(new Message("Send 2"));
        Console.WriteLine("Send 2 sent");
    }
}

private static void ConsumerThread()
{
    var c = Connection.Factory.CreateAsync(a).Result;
    var s = new Session(c);
    var r = new ReceiverLink(s, "receiver1", "experiment");
    r.Closed += R_Closed;

    r.Start(10, R_OnMessage);

    Thread.Sleep(-1);
    c.Close();
}

private static void R_OnMessage(ReceiverLink receiver, Message message)
{
    Console.WriteLine(message.GetBody<string>() + " received");
    receiver.Accept(message);
}

private static void R_Closed(AmqpObject sender, Error error)
{
    Console.WriteLine(sender + " closed");
}

where I do not receive the closed event and still receive a new message after 1 hour, as desired.

brvaland commented 7 years ago

@xinchen10 @duizendnegen - I am a bit confused about how the outstanding credit specified on receiver.Start works. Does Service Bus close the connection once the outstanding credit has been used up?

Let's say you have specified a credit of 10, so you will receive 10 messages without any time restriction. What happens after you have received those 10 messages? Will it reconnect automatically with a new credit of 10?

Could you please clarify?

xinchen10 commented 7 years ago

@brvaland if the client does not issue more credits after the existing credits are used up, the Service Bus service closes the receiver link after 10 minutes (the client is considered idle in this case).

Regarding link credit you can find more details here: https://github.com/Azure/amqpnetlite/blob/master/docs/articles/building_application.md#receiving-messages

As long as there are outstanding credits on the service side, the link will be kept open, unless some networking error or system transient error happens (then the client needs to reconnect).
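To make the two credit styles concrete, here is a small sketch. It assumes the `SetCredit(int credit, bool autoRestore)` overload described in the linked article, and that `Start(credit, callback)` restores credit automatically as messages are settled. `CreditExamples` and both method names are hypothetical.

```csharp
using System;
using Amqp;

// Hypothetical helpers contrasting automatic and manual link credit.
public static class CreditExamples
{
    // Callback mode: credit is topped up by the library as messages are accepted
    // (assumed behavior), so the link keeps outstanding credit and stays active.
    public static void ReceiveWithAutoCredit(ReceiverLink receiver)
    {
        receiver.Start(10, (link, message) => link.Accept(message));
    }

    // Manual mode: the application issues credit itself. If it stops doing so
    // after the credit is used up, the service sees an idle link.
    public static void ReceiveBatchWithManualCredit(ReceiverLink receiver)
    {
        receiver.SetCredit(10, false);
        for (int i = 0; i < 10; i++)
        {
            Message message = receiver.Receive(); // default timeout; null if nothing arrived
            if (message == null)
            {
                break;
            }
            receiver.Accept(message);
        }

        // Issue fresh credit so the link is not left without any.
        receiver.SetCredit(10, false);
    }
}
```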

4integration commented 7 years ago

It would be great if this could be implemented in Amqp.Net Lite as we are aiming at clustered setup of RabbitMQ. @mcmellawatt can you share your code, please?

koosala commented 6 years ago

I understand the approach for auto-reconnect has been discussed here. Is there a recommendation for failover? We plan to use Red Hat AMQ (based on Apache Artemis) as the message broker. AMQ will have a failover cluster: a master is active, backed up by a slave. The master can go down for some reason, and the slave then becomes active. The client can retry against the master, but after a while it should conclude that the master isn't available and try the failover slave.

My question is: is there any event that the AMQP client will receive which informs the consumer that a failover has occurred, and maybe the address of the failover node? Or will this be more trial and error: try the master a few times, fail, "suspect" that a failover has occurred, and start trying to connect to the slave?

I understand the Artemis-supplied Java client does this out of the box, so any approach that would mimic such behavior, or any best practice, will help.

ricardozanini commented 6 years ago

+1 to @koosala's comment. It would be nice to mimic some of the Artemis client's behavior in the .NET AMQP client. Is there any documentation about that implementation, so we can help implement this feature somehow?

jirkadanek commented 6 years ago

> My question is: is there any event that the AMQP client will receive which informs the consumer that a failover has occurred, and maybe the address of the failover node?
>
> [...]
>
> I understand the Artemis-supplied Java client does this out of the box, so any approach that would mimic such behavior, or any best practice, will help.

In general, the client can detect that a failover has happened in one of the following ways:

  1. have existing connection to master fail on TCP error
  2. (more likely, because faster) have the AMQP connection expire on missed heartbeat frames
  3. get a TCP FIN packet from the OS on the machine running the master broker, assuming only broker process died, and the machine is still up
  4. get a clean AMQP connection close frame, with connection:forced reason, as mentioned on http://qpid.2158936.n2.nabble.com/Reconnect-and-amqp-connection-forced-td7659043.html

Options 2 and 1 are universal and dependable (although they depend on timeouts). Anything else requires additional communication, which may not happen if the master goes down because of, say, a cut Ethernet cable on the machine running it.
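Since heartbeat expiry depends on the connection's idle timeout, a client that wants faster detection would typically configure that timeout when connecting. A minimal sketch, assuming the factory exposes the setting as `AMQP.IdleTimeout` in milliseconds; the 30-second value and the `HeartbeatExample` helper are illustrative only.

```csharp
using System;
using System.Threading.Tasks;
using Amqp;

// Hypothetical helper showing where an idle timeout could be configured.
public static class HeartbeatExample
{
    public static async Task<Connection> ConnectWithIdleTimeoutAsync(Address address)
    {
        var factory = new ConnectionFactory();

        // Assumed setting: idle timeout in milliseconds, advertised in the open frame.
        factory.AMQP.IdleTimeout = 30000;

        Connection connection = await factory.CreateAsync(address);

        // Log why the connection ended so the application can decide to fail over.
        connection.Closed += (sender, error) =>
            Console.WriteLine("Connection closed: " + (error == null ? "no error" : error.ToString()));

        return connection;
    }
}
```

How quickly a missed heartbeat actually closes the connection depends on the library version and on the timeout the broker advertises, so the value needs tuning against the real deployment.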

For the same reason, it is too late to communicate the address of the failover node once failover has happened. The mechanism for communicating it is the failover-server-list property in the AMQP open frame, sent when the connection is being created. See https://www.oasis-open.org/committees/download.php/60379/amqp-failover-v1.0-wd01.pdf.

I am not aware of any mechanism by which the slave broker would inform the clients previously connected to the master broker that they should now reconnect. It would be clumsy to implement anyway if a connection from the client to the slave did not exist from the start.

Nor would it speed things up much. The slave takes time to come up, and that time is comparable to usual heartbeat periods, so this approach does not seem to offer any great time gains.

There is a reconnect example in amqpnetlite sources at https://github.com/Azure/amqpnetlite/tree/6c195bed83380eff7d730f8108f39f00b35f3356/Examples/Reconnect/ReconnectSender

HavretGC commented 5 years ago

Failover is currently supported by the Apache ActiveMQ NMS.AMQP client, which is built on top of amqpnetlite. https://github.com/apache/activemq-nms-amqp/pull/4

iron9light commented 5 years ago

@xinchen10 Any update on this feature request? Do we still need to handle reconnection ourselves? And for the receiving case, if we keep calling receive with a 1-minute timeout, will that keep the connection alive?

michaelandrepearce commented 4 years ago

Hi

The Apache ActiveMQ team has now released an official higher-level NMS client (the underlying library is amqpnetlite). It includes failover support, among a number of other higher-level abstractions.

It has been tested with ActiveMQ 5.x and Artemis brokers as well as Solace. Being AMQP, it should also work with any other AMQP 1.0 broker.

You can find more details at https://activemq.apache.org/components/nms/providers/amqp/

Or search Apache.NMS.AMQP on nuget