Azure / azure-event-hubs-node

Node client library for Azure Event Hubs https://azure.microsoft.com/services/event-hubs
MIT License

Reconnect logic does not work for socket timeout after connection is opened #70

Closed dank-arundo closed 6 years ago

dank-arundo commented 6 years ago

Describe the bug

In a long-lived event hub sender, a transport error, such as a socket timeout, that occurs after the connection is established does not trigger the reconnect logic. This is because the rhea-promise library removes the "disconnected" event listener from Rhea when the connection has been opened successfully. However, a socket timeout can occur long after the connection is opened.

To Reproduce

  1. (Optional) Set the environment variable "DEBUG" to "rhea:event" to print events emitted from Rhea.
  2. Open an EventHubSender.
  3. Send messages for several days until a socket timeout occurs.
  4. Note that all subsequent attempts to send a message result in an error message like "cannot send the message right now. Please try later."

Expected behavior

Either the sender should reopen and reconnect, or an event should be emitted to clients so that the client can handle the disconnect as needed. Because the sender is opened during the send call and is not returned to the client, I don't see a way to attach a "disconnected" listener to the Rhea connection. If no listeners are attached to the Rhea connection, Rhea simply writes an error message to stdout and continues, providing no mechanism for a client app to detect such a socket failure.

Package-name: azure-event-hubs | azure-event-processor-host
Package-version: 0.1.2
node.js version: 9.11.1
OS name and version: MacOSX 10.13

dank-arundo commented 6 years ago

Some more details...

Rhea has this logic (in rhea.js lines 467-482):

Connection.prototype._disconnected = function () {
    if (this.heartbeat_out) clearTimeout(this.heartbeat_out);
    if (this.heartbeat_in) clearTimeout(this.heartbeat_in);
    if (!this.is_closed() && this.scheduled_reconnect === undefined) {
        if (!this.dispatch('disconnected', this._context())) {
            console.warn('[' + this.options.id + '] disconnected ');
        }
        if (!this.is_server && !this.transport_error && this.options.reconnect) {
            var delay = this.options.reconnect(this.conn_established_counter);
            if (delay >= 0) {
                log.reconnect('Scheduled reconnect in ' + delay + 'ms');
                this.scheduled_reconnect = setTimeout(this.reconnect.bind(this), delay);
            }
        }
    }
};

A socket timeout causes console.warn() to be called, because no listeners remain for the 'disconnected' event on the connection after it has been opened successfully.
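To make the failure mode concrete, here is a minimal sketch of the listener lifecycle described above. `TinyEmitter` and `fakeConnection` are hypothetical stand-ins written for this illustration, not rhea's or rhea-promise's actual code; the only thing they mirror is the contract that dispatch reports false when nobody is listening.

```typescript
// Minimal stand-in for an event emitter. This is NOT rhea's implementation;
// it only mirrors the "dispatch returns false when nobody listens" contract.
type ListenerFn = (context: unknown) => void;

class TinyEmitter {
  private listeners: Record<string, ListenerFn[]> = {};

  on(eventName: string, fn: ListenerFn): void {
    const list = this.listeners[eventName] || [];
    list.push(fn);
    this.listeners[eventName] = list;
  }

  removeListener(eventName: string, fn: ListenerFn): void {
    const list = this.listeners[eventName] || [];
    this.listeners[eventName] = list.filter((l) => l !== fn);
  }

  // Returns true only if at least one listener received the event.
  dispatch(eventName: string, context: unknown): boolean {
    const list = this.listeners[eventName] || [];
    list.forEach((fn) => fn(context));
    return list.length > 0;
  }
}

const fakeConnection = new TinyEmitter();
const onCloseDuringConnect: ListenerFn = () => {
  // In rhea-promise this is where the connect() promise would be rejected.
};

// While connecting, a listener is attached, so a disconnect is handled.
fakeConnection.on("disconnected", onCloseDuringConnect);
const handledWhileConnecting = fakeConnection.dispatch("disconnected", {});

// After a successful open the listener is removed, so a later socket
// timeout finds no listener and falls through to the console.warn() branch.
fakeConnection.removeListener("disconnected", onCloseDuringConnect);
const handledAfterOpen = fakeConnection.dispatch("disconnected", {});

console.log(handledWhileConnecting, handledAfterOpen); // prints: true false
```

The second dispatch is the case this issue is about: the event fires, but no client code is able to observe it.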

I propose this as a fix. In rhea-promise/index.ts (starting at line 18), add a new event listener that is registered after the connection is opened successfully:

export function connect(options?: ConnectionOptions): Promise<any> {
  return new Promise((resolve, reject) => {
    const connection = rhea.connect(options);

    function removeListeners(connection: any): void {
      connection.removeListener("connection_open", onOpen);
      connection.removeListener("connection_close", onClose);
      connection.removeListener("disconnected", onClose);
      connection.removeListener("disconnected", onTransportError);
    }

    function onOpen(context: any): void {
      removeListeners(connection);
      connection.once("disconnected", onTransportError);
      process.nextTick(() => {
        debug("Resolving the promise with amqp connection.");
        resolve(connection);
      });
    }

    function onTransportError(context: Context): void {
      debug(`Error occurred after establishing amqp connection.`, context.connection.error);
      closeConnection(context.connection).then(() => {
        context.connection = undefined;
      });
    }

    function onClose(context: Context): void {
      removeListeners(connection);
      debug(`Error occurred while establishing amqp connection.`, context.connection.error);
      reject(context.connection.error);
    }

    connection.once("connection_open", onOpen);
    connection.once("connection_close", onClose);
    connection.once("disconnected", onClose);
  });
}

I believe the connection is initialized, if not already set, when send/receive are called, so simply closing the connection should be sufficient.

amarzavery commented 6 years ago

Hi @dank-arundo, thanks for filing the issue and following it up with an investigation. Your approach seems reasonable to me.

However, I thought I had taken care of this scenario. rhea has a method called is_open() on the sender and receiver links. Before sending the message, I check whether the underlying sender link is open over here. If it is not, I try to re-establish the link. I think I messed up in the _init() method, where I check whether the underlying session and sender objects are defined before creating them [over here]. I should instead have checked whether they are open. I made that change in the 0.2.1 version that I published last night.
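The difference between the two checks can be sketched as follows. This is an illustration only: `LinkLike`, `needsInitByDefined`, and `needsInitByOpen` are hypothetical names and do not come from the library's `_init()` code; `is_open()` is the only method taken from the discussion above.

```typescript
// Hypothetical link shape; only is_open() is named in this thread.
interface LinkLike {
  is_open(): boolean;
}

// The older style of check: recreate the link only if no object exists.
function needsInitByDefined(link: LinkLike | undefined): boolean {
  return link === undefined;
}

// The revised style of check: also recreate the link if it is no longer open.
function needsInitByOpen(link: LinkLike | undefined): boolean {
  return link === undefined || !link.is_open();
}

// A link object that still exists although the underlying link has closed.
const staleLink: LinkLike = { is_open: () => false };

const definedCheck = needsInitByDefined(staleLink); // false: the stale link would be reused
const openCheck = needsInitByOpen(staleLink); // true: the link would be re-established

console.log(definedCheck, openCheck); // prints: false true
```

A stale but still defined link passes the first check and is reused, which matches the symptom of every later send failing.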

Can you install that version and let me know if that works?

anthonyvercolano commented 6 years ago

I'm currently working on this section of code in my own port.

When one has a link that is successfully attached, I thought one would need, for senders, the following listeners: error, accepted, rejected, released, sendable, sender_close. I'm also thinking that one would need a listener for 'disconnected'.

error, sender_close, and disconnected would all seem to be reasons to shut down the link and initiate reconnection.

It would seem that testing a link for is_open() shouldn't be necessary, if the 'attached' state has been successfully entered, and the listeners properly set up.

Thoughts?

dank-arundo commented 6 years ago

I've just upgraded to v0.2.1. It looks like it should handle socket timeouts. We've got it running now in our test environment. I'll follow up when we see a socket timeout--usually within a few days.

amarzavery commented 6 years ago

I see. However, you and @anthonyvercolano make a good point about handling the disconnected event for the connection. I think the is_open() check will help you when the sender link was closed but the connection is still up. However, if the connection is down, then I am not completely sure that it will work. Hence, having a disconnected event listener that handles disconnects and recreates all the senders and receivers on the connection would be an ideal approach.
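One possible shape for such a handler is sketched below. `LinkRegistry` and its methods are hypothetical and are not part of azure-event-hubs or rhea; the sketch assumes that reopening the connection and recreating the links can be expressed as plain callbacks, and it leaves out retry delays and error handling that real code would need.

```typescript
// Hypothetical helper, not part of azure-event-hubs or rhea.
type LinkFactory = () => void;

class LinkRegistry {
  private factories: LinkFactory[] = [];
  private reconnecting = false;
  reconnectCount = 0;

  // Remember how to (re)create each sender or receiver.
  register(factory: LinkFactory): void {
    this.factories.push(factory);
  }

  // Intended to be called from a "disconnected" listener on the connection.
  onDisconnected(reopenConnection: () => void): void {
    if (this.reconnecting) {
      return; // ignore duplicate events while a reconnect is in progress
    }
    this.reconnecting = true;
    try {
      reopenConnection();
      this.factories.forEach((create) => create());
      this.reconnectCount += 1;
    } finally {
      this.reconnecting = false;
    }
  }
}

// Usage with stand-in callbacks that only record what happened.
const eventLog: string[] = [];
const registry = new LinkRegistry();
registry.register(() => eventLog.push("sender recreated"));
registry.register(() => eventLog.push("receiver recreated"));
registry.onDisconnected(() => eventLog.push("connection reopened"));

console.log(eventLog.join(", "));
```

Keeping the factories in one place means the disconnect handler does not need to know how many senders and receivers exist on the connection.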

Btw, do you guys know of a mechanism to induce the socket to time out for testing purposes, so that disconnects and reconnects can be tested in a sane way? If there is any other approach to test this, then please let me know about it as well.

dank-arundo commented 6 years ago

You can give it to me--we've got a particularly flaky network :-)

I haven't used this, but it looks like it might work: https://github.com/Shopify/toxiproxy

anthonyvercolano commented 6 years ago

(Not about testing socket timeouts.) On sending with the sendable event and is_sendable():

My sending puts all sends into a queue. When we get into an attached state, we know (because reasons) that we have something in the queue.

We have a send action in a finite state machine's attached state. It just has a while loop that is controlled by the is_sendable() function. If that returns false, we just stop sending. We make the assumption that if is_sendable() returns false, then at some future point, in the absence of errors, a sendable event will be emitted by rhea. The event listener simply calls into the send action of the attached state that has the while loop just mentioned.

I never check for open. My assumption would be that if is_open is false, that could have only happened with an error, which I should have already handled with one of my listeners (sender_close, error, disconnected).
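A rough sketch of that queue-and-drain pattern, under stated assumptions: `SenderLike`, `QueuedSender`, and `FakeSender` are hypothetical names made up for this example, and the `is_sendable()` and `send()` signatures follow the wording of this comment rather than a verified rhea API. The fake sender models link credit with a simple counter.

```typescript
// Hypothetical sender shape based on the description above.
interface SenderLike {
  is_sendable(): boolean;
  send(message: string): void;
}

class QueuedSender {
  private queue: string[] = [];
  private sender: SenderLike;

  constructor(sender: SenderLike) {
    this.sender = sender;
  }

  enqueue(message: string): void {
    this.queue.push(message);
    this.drain();
  }

  // Send action: also meant to be called from the "sendable" event listener.
  drain(): void {
    while (this.queue.length > 0 && this.sender.is_sendable()) {
      this.sender.send(this.queue.shift() as string);
    }
  }

  pending(): number {
    return this.queue.length;
  }
}

// Stand-in sender whose credit counter decides whether it is sendable.
class FakeSender implements SenderLike {
  sent: string[] = [];
  credit: number;

  constructor(credit: number) {
    this.credit = credit;
  }

  is_sendable(): boolean {
    return this.credit > 0;
  }

  send(message: string): void {
    this.credit -= 1;
    this.sent.push(message);
  }
}

const fakeSender = new FakeSender(2);
const queued = new QueuedSender(fakeSender);
["a", "b", "c"].forEach((m) => queued.enqueue(m));
const pendingBeforeSendable = queued.pending(); // 1: credit ran out after "a" and "b"

// Simulate the peer granting credit and the "sendable" event firing.
fakeSender.credit = 1;
queued.drain();
const pendingAfterSendable = queued.pending(); // 0: "c" has now been sent
```

The loop never checks whether the link is open; as described above, that responsibility is left to the sender_close, error, and disconnected listeners.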

amarzavery commented 6 years ago

@dank-arundo - We have published a new version, azure-event-hubs@0.2.4, that handles disconnects for the connection and detach for the sender and receiver links. Please try it out and let us know if you see any issues.

amarzavery commented 6 years ago

Reconnect handling for the disconnected event and for sender/receiver links has been implemented. Please feel free to reopen this issue, or open a new one, if the problem persists.