jwalton / node-amqp-connection-manager

Auto-reconnect and round robin support for amqplib.
528 stars 105 forks source link

Unhandled rejection error: Channel ended, no reply will be forthcoming #97

Open vunguyen11295 opened 5 years ago

vunguyen11295 commented 5 years ago

"use strict"; const amqp = require('amqp-connection-manager'); const mqcon = require('./mqConnection'); const AMQP_URL = mqcon.getAMQPURL();

module.exports.success = function success(res, next, data) { _respond(res, next, 'success', data, 200); }

module.exports.failure = function failure(res, next, data, http_code) { _respond(res, next, 'failure', data, http_code); }

module.exports.publish = async function (msgKey, msgPayload) { const connection = await amqp.connect(AMQP_URL); const exch = 'taskch';

// Create a channel wrapper const channelWrapper = await connection.createChannel({ setup: channel => channel.assertExchange(exch, 'direct', { durable: true }) });

await channelWrapper.publish(exch, msgKey, Buffer.from(msgPayload)) .then(function () { // console.log("Message sent", msgPayload); }) .catch(err => { console.log("[ERROR] Message was rejected:", err.stack); channelWrapper.close(); connection.close(); });

};

module.exports.consume = async function (ex, qname, msgKey, invkFn) { const connection = await amqp.connect(AMQP_URL); const onMessage = data => { channelWrapper.ack(data); //call the function to be invoked on receipt of a message invkFn(data); }

// Set up a channel listening for messages in the queue. const channelWrapper = connection.createChannel({ setup: channel => Promise.all([ channel.assertExchange(ex, 'direct', { durable: true }), channel.assertQueue(qname, { exclusive: false }), // , autoDelete: true channel.bindQueue(qname, ex, msgKey), channel.consume(qname, onMessage) ]) });

channelWrapper.waitForConnect() .then(function () { console.log("[INFO] Waiting for messages in %s. To exit press CTRL+C", qname); }); }

This is my code, please help!

nieverbe commented 4 years ago

@vunguyen11295, Do you already have a solution for this? Seeing the same on my implementation..

morigs commented 4 years ago

Try to wrap await channelWrapper.waitForConnect() with try-catch (now you have only then without catch)

Also do not use await in await connection.createChannel. createChannel doesn't return a Promise

dorshay6 commented 1 year ago

I am having a similar problem. It looks like it happens after logs of mirroring of messages inside the cluster, I think the channel state was not updated for some reason