Foo-Foo-MQ / foo-foo-mq

Abstractions around RabbitMQ
MIT License

RabbitMQ is not able to subscribe to a queue once the RabbitMQ service is restarted #23

Open HarryDev3333 opened 3 years ago

HarryDev3333 commented 3 years ago

Please find my code below. My RabbitMQ config file is as follows:

export const Exchanges = [
  {
    name: 'myExchange',
    passive: true
  }
];

export const Queues = [
  {
    name: 'myQueue',
    passive: true
  }
];

export const Bindings = [
  {
    exchange: 'myExchange',
    target: 'myQueue',
  }
];

export const connectionSettings = {
  connection: {
    name: 'myConnection',
    user: 'myUser',
    pass: 'mypassword',
    server: 'localhost',
    vhost: '/',
    port: 5672,
    heartbeat: 20,
    replyQueue: false,
    clientProperties: {
      connection_name: 'myConnection'
    }
  },
  exchanges: Exchanges,
  queues: Queues,
  bindings: Bindings
}; 

My RabbitMQ code is as follows.

const rabbit = require('foo-foo-mq');
let retryCounter = 0;

class RabbitMQ {
    init() {

        // Configure the rabbit mq connection, queues and configBindings
        rabbit.handle({}, handleMessage);
        rabbit.configure(connectionSettings);

        /**
         * Perform retry in case Rabbit MQ is not reachable.
         */
        rabbit.on('unreachable', () => {
            log.error(`RabbitMq: Host unreachable. Trying again -- ${++retryCounter}`);
            rabbit.retry();
        });

        /**
         * Perform retry in case Rabbit MQ connection is failed.
         */
        rabbit.on('failed', () => {
            log.error(`RabbitMq: Connection failed. Trying again -- ${++retryCounter}`);
            rabbit.retry();
        });

        /**
         * Start processing messages when the RabbitMQ connection is successful.
         */
        rabbit.on('connected', () => {
            log.info('RabbitMq: connected');
            retryCounter = 0;
        });

        // If the queue does not exist, add it, bind it to the exchange, and subscribe
        rabbit.on(connectionSettings.connection.name + '.connection.opened', (c) => {
            log.info('RabbitMq: Connection ' + connectionSettings.connection.name + ' opened');
            Queues.forEach((q, index) => {
                if (!rabbit.getQueue(q.name, connectionSettings.connection.name)) {
                    rabbit.addQueue(q.name, q, connectionSettings.connection.name).then((s) => {
                        rabbit.bindQueue(configBindings[index].exchange, configBindings[index].target, '',
                            connectionSettings.connection.name);
                        rabbit.startSubscription(q.name, false, connectionSettings.connection.name);
                    });
                } else {
                    log.info('RabbitMq: Connection queue already exists');
                }
            });
        });

        // Close connection
        rabbit.on(connectionSettings.connection.name + '.connection.closed', () => {
            log.info('RabbitMq: Connection ' + connectionSettings.connection.name + ' closed');
        });

        // retry on connection failure.
        rabbit.on(connectionSettings.connection.name + '.connection.failed', (c) => {
            log.error('RabbitMq: Connection ' + connectionSettings.connection.name + ' failed');
            Queues.forEach((q) => {
                if (rabbit.getQueue(q.name, connectionSettings.connection.name)) {
                  rabbit.retry();
                }
            });
        });

        rabbit.on(connectionSettings.connection.name + '.connection.configured', (connection) => {
            Object.entries(connection.definitions.bindings).forEach(([key, value]) => {
                const connectionObj = JSON.parse(JSON.stringify(value));
                log.info(`RabbitMq: Queue ${connectionObj.target} bound to exchange ${connectionObj.source}`);
            });
        });
    }
}

After restarting the RabbitMQ service, I get the following logs and nothing happens afterwards.

RabbitMq:  connected
RabbitMq: Connection opened

HarryDev3333 commented 3 years ago

It seems the publish method does not work after reconnection. It fails with the error: failed to create myExchange on connection 'myConnection'. No endpoints could be reached. However, I can see that RabbitMQ is already back in service. This seems to be an issue in the library.

chris-cynation commented 3 years ago

I'm seeing the same issue.

The logs indicate that it has reconnected, but publishing doesn't work.

I can make it reproducible 100% of the time if I start up the client without RabbitMQ and then let it establish a connection.

The logs indicate the connection is OK, but publishing gives the error message "No endpoints could be reached".

zlintz commented 3 years ago

I think I need more information as I can't reproduce with what you have provided.

I took your code, @HarryDev3333, and modified it slightly into something I could actually run (below).

const Exchanges = [
  {
    name: 'myExchange',
    type: 'fanout'
  }
];

const Queues = [
  {
    name: 'myQueue'
  }
];

const Bindings = [
  {
    exchange: 'myExchange',
    target: 'myQueue'
  }
];

const connectionSettings = {
  connection: {
    replyQueue: false,
    clientProperties: {
      connection_name: 'myConnection'
    }
  },
  exchanges: Exchanges,
  queues: Queues,
  bindings: Bindings
};

// const rabbit = require('./src/index');
const rabbit = require('foo-foo-mq');
let retryCounter = 0;
let msgCount = 0;

class RabbitMQ {
  static async init () {
    rabbit.handle('#', (msg) => {
      console.log('Received: ', msg.body);
      msg.ack()
    }, Queues[0].name);
    await rabbit.configure(connectionSettings)
      .then(() => {
        console.log('connected');
      });

    rabbit.on('unreachable', () => {
      console.log(`RabbitMq: Host unreachable. Trying again -- ${++retryCounter}`);
      setTimeout(() => {
        retryCounter++;
        console.log('Calling Retry');
        rabbit.retry();
      }, 5000);
    });

    rabbit.on('failed', () => {
      console.log(`RabbitMq: Connection failed. Trying again -- ${++retryCounter}`);
    });

    rabbit.on('connected', () => {
      console.log('RabbitMq: connected');
      retryCounter = 0;
    });

    rabbit.on(connectionSettings.connection.name + '.connection.opened', (c) => {
      console.log('RabbitMq: Connection ' + connectionSettings.connection.name + ' opened');
    });

    rabbit.on(connectionSettings.connection.name + '.connection.closed', () => {
      console.log('RabbitMq: Connection ' + connectionSettings.connection.name + ' closed');
    });

    rabbit.on(connectionSettings.connection.name + '.connection.failed', (c) => {
      console.log('RabbitMq: Connection ' + connectionSettings.connection.name + ' failed');
    });

    rabbit.on(connectionSettings.connection.name + '.connection.configured', (connection) => {
      console.log('RabbitMq: Connection ' + connectionSettings.connection.name + ' configured');
    });

    rabbit.startSubscription(Queues[0].name);
  }
}

RabbitMQ.init()
  .then(() => {
    console.log('Initialized');
    setInterval(() => {
      console.log('publishing a message')
      rabbit.publish(Exchanges[0].name, "testing", {a : msgCount++});
    }, 2000)
  })
  .catch((err) => {
    console.log('fatal....................');
    console.log(err);
    process.exit(1);
  });

I then did a docker restart on a local RabbitMQ instance; the client reconnected and continued to process messages.

Let me know what I am missing.

zlintz commented 3 years ago

Here is an even more simplified version:

const Exchanges = [{ name: 'myExchange', type: 'fanout' }];
const Queues = [{ name: 'myQueue' }];
const Bindings = [{ exchange: 'myExchange', target: 'myQueue' }];

const connectionSettings = {
  connection: {
    replyQueue: false
  },
  exchanges: Exchanges,
  queues: Queues,
  bindings: Bindings
};

// const rabbit = require('./src/index');
const rabbit = require('foo-foo-mq');
let retryCounter = 0;
let msgCount = 0;

class RabbitMQ {
  static async init () {
    rabbit.handle('#', (msg) => {
      console.log('Received: ', msg.body);
      msg.ack();
    }, Queues[0].name);
    await rabbit.configure(connectionSettings)
      .then(() => {
        console.log('connected');
      });

    rabbit.on('unreachable', () => {
      console.log(`RabbitMq: Host unreachable. Trying again -- ${++retryCounter}`);
      let intervalId = null;
      intervalId = setInterval(() => {
        console.log(`RabbitMq: Host still unreachable. Trying again -- ${++retryCounter}`);
        retryCounter++;
        console.log('Calling Retry');
        rabbit.retry().then(() => {
          clearInterval(intervalId);
        });
      }, 5000);
    });

    rabbit.on('failed', () => {
      console.log(`RabbitMq: Connection failed. Trying again -- ${retryCounter}`);
    });

    rabbit.on('connected', () => {
      console.log('RabbitMq: connected');
      retryCounter = 0;
    });

    rabbit.startSubscription(Queues[0].name);
  }
}

process.on('unhandledRejection', (err) => {
  console.log(err.message);
})

RabbitMQ.init()
  .then(() => {
    console.log('Initialized');
    setInterval(() => {
      console.log('publishing a message');
      rabbit.publish(Exchanges[0].name, 'testing', { a: msgCount++ });
    }, 3000);
  })
  .catch((err) => {
    console.log('fatal....................');
    console.log(err);
    process.exit(1);
  });

This produced the following console output, which shows the client reconnecting after a docker restart of a locally running single RabbitMQ instance:

connected
Initialized
publishing a message
Received:  { a: 0 }
RabbitMq: Connection failed. Trying again -- 0
RabbitMq: Connection failed. Trying again -- 0
RabbitMq: Host unreachable. Trying again -- 1
publishing a message
Publish failed - no exchange myExchange on connection default is defined
publishing a message
Publish failed - no exchange myExchange on connection default is defined
RabbitMq: Host still unreachable. Trying again -- 2
Calling Retry
RabbitMq: Connection failed. Trying again -- 3
RabbitMq: Connection failed. Trying again -- 3
Failed to create exchange 'myExchange' on connection 'default' with 'No endpoints could be reached'
RabbitMq: Connection failed. Trying again -- 3
RabbitMq: Host unreachable. Trying again -- 4
publishing a message
Failed to create exchange 'myExchange' on connection 'default' with 'No endpoints could be reached'
publishing a message
Failed to create exchange 'myExchange' on connection 'default' with 'No endpoints could be reached'
RabbitMq: Host still unreachable. Trying again -- 5
Calling Retry
RabbitMq: connected
RabbitMq: Host still unreachable. Trying again -- 1
Calling Retry
publishing a message
Received:  { a: 5 }
publishing a message
Received:  { a: 6 }
publishing a message
Received:  { a: 7 }
publishing a message
Received:  { a: 8 }

HarryDev3333 commented 3 years ago

I will try your code and get back to you if I run into any issues. One more question: this library automatically creates queues and exchanges if they don't exist. Do we need to set the flag passive: true on both exchanges and queues so that they are not created automatically?

zlintz commented 3 years ago

That is correct: use the passive flag if you don't want anything to be created automatically. At a cursory glance, it looks like I need to add the passive flag to the docs as an option for exchanges, since it isn't explicitly called out.
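
For illustration, the passive setup discussed here might be written as in the sketch below. The `asPassive` helper is hypothetical and not part of foo-foo-mq; it only copies each exchange and queue definition and sets `passive: true`, the flag used in the original config. The exchange type and names are taken from the examples earlier in this thread.

```javascript
// Hypothetical helper: not part of foo-foo-mq.
// Returns copies of the given definitions with passive: true set,
// leaving the original objects untouched.
function asPassive(definitions) {
  return definitions.map((definition) => ({ ...definition, passive: true }));
}

const Exchanges = asPassive([{ name: 'myExchange', type: 'fanout' }]);
const Queues = asPassive([{ name: 'myQueue' }]);
const Bindings = [{ exchange: 'myExchange', target: 'myQueue' }];

// Same settings shape as used earlier in this thread.
const connectionSettings = {
  connection: { replyQueue: false },
  exchanges: Exchanges,
  queues: Queues,
  bindings: Bindings
};
```

The resulting `connectionSettings` object would then be passed to `rabbit.configure` as in the examples above.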

xiic commented 2 years ago

I ran into the same issue. It's easily reproducible: it always happens if configure is called before RabbitMQ is running.

If RabbitMQ is running before the foo-foo-mq startup, everything (including reconnecting) works fine.

As a workaround, I added shutdown, reset, and configure calls in the catch block:

const rabbit = require('foo-foo-mq');
const config = { ... };

function configureAmqp() {
  console.log("AMQP: Configuring...");
  rabbit.configure(config)
  .catch((err) => {
    console.log("AMQP:", err);
    rabbit.shutdown()
    .then(() => {
      rabbit.reset();
      configureAmqp();
    })
  });
}

configureAmqp();

MartianH commented 3 months ago

@zlintz I was wondering if this ticket could be closed. @xiic's solution seems sensible and is one I have come to use as well. Here is my implementation (I hope to add this to the Topology documentation in a PR):

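// ES6
// import * as rabbit from "foo-foo-mq";
import rabbit from "foo-foo-mq";
import { setTimeout } from "timers/promises";

// CommonJS alternative (use instead of the imports above):
// const rabbit = require("foo-foo-mq");
// const { setTimeout } = require("timers/promises");

// Retry configure up to opts.retries times (default 10), optionally
// waiting opts.defer milliseconds before each retry.
async function tryConfigure(settings, opts = {}) {
  // Use ?? rather than || so that a remaining count of 0 is respected
  // and the retries eventually stop.
  const retries = opts.retries ?? 10;
  try {
    await rabbit.configure(settings);
  } catch (error) {
    if (error === 'No endpoints could be reached' && retries > 0) {
      if (opts.defer) await setTimeout(opts.defer);
      await rabbit.shutdown();
      await rabbit.reset();
      // This is a plain function, so recurse by name rather than via `this`.
      await tryConfigure(settings, { ...opts, retries: retries - 1 });
    } else {
      throw error;
    }
  }
  return rabbit;
}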

Edit: updated to be simplified
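
The shutdown, reset, configure workaround from this thread can also be sketched as a small generic helper. This is only a sketch under stated assumptions: `configureWithRetry`, its `retries` and `delayMs` options, and the `createFlakyBroker` stand-in are hypothetical names, and the broker object is passed in as an argument so the example runs without RabbitMQ. It relies only on `configure`, `shutdown`, and `reset`, the calls already used above, and it retries on any configure error rather than matching a specific message.

```javascript
const { setTimeout: sleep } = require('timers/promises');

// Hypothetical helper: retries broker.configure(), calling shutdown() and
// reset() between attempts, as in the workaround discussed in this thread.
async function configureWithRetry(broker, settings, { retries = 10, delayMs = 0 } = {}) {
  for (let attempt = 0; ; attempt++) {
    try {
      await broker.configure(settings);
      return broker;
    } catch (error) {
      // Give up once the retry budget is spent.
      if (attempt >= retries) throw error;
      if (delayMs > 0) await sleep(delayMs);
      await broker.shutdown();
      await broker.reset();
    }
  }
}

// Stand-in broker for demonstration only: configure() fails the given
// number of times and then succeeds. Every call is recorded in `calls`.
function createFlakyBroker(failures) {
  const calls = [];
  return {
    calls,
    async configure() {
      calls.push('configure');
      if (failures-- > 0) throw new Error('No endpoints could be reached');
    },
    async shutdown() { calls.push('shutdown'); },
    reset() { calls.push('reset'); }
  };
}
```

With the real library, a call might look like `configureWithRetry(rabbit, connectionSettings, { retries: 5, delayMs: 2000 })`. A loop is used instead of recursion so that the remaining-attempts count cannot be reset by accident.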