skif48 / nabbitmq

Node.js library for interacting with RabbitMQ based on RxJS streams
MIT License
19 stars 2 forks source link

Should a queue will be created automatically after first message is delivered to an exchange #3

Closed marcio199226 closed 5 years ago

marcio199226 commented 5 years ago

Hi thanks for this awesome.

I have a question because i noticed that when I publish a new message to an queue that does not exists that queue is not created itself at the first message delivered so i have to create them manually from rabbitmq dashobard.

Is this the desired behaviour, isn't it??

P.S might you explain me what is for the publisherConfirms option?

skif48 commented 5 years ago

Hello @marcio199226, thanks for your attention to the project!

Publisher class is abstracted away from queue creation, this class only works with exchanges, because that's the only thing RabbitMQ publishers should really work with - it's rarely a good practice to work directly with queues themselves.

What is your use case? Optional queue assertion and binding during publisher creation might be a nice feature request.

skif48 commented 5 years ago

About publisher confirms you can read here, but to keep it short, it's a way for publisher to identify the fact that RabbitMQ did receive a message we just published.

If publisher confirmation fails for whatever reason, an instance of PublisherConfirmationError is thrown to the publisher actions stream. You can use instanceof to catch it.

marcio199226 commented 5 years ago

Ok thanks for explanation

i have no special use case for not using an exchange.

I have another question when i start to consume messages from a queue new exchange was created an example:

async function main() {
  const connectionFactory = new RabbitMqConnectionFactory();
  connectionFactory.setUri('amqp://localhost:5672');
  const connection = await connectionFactory.newConnection();
  const consumerFactory = new ConsumerFactory(connection);
  consumerFactory.setConfigs({
    noDeadLetterQueue: true,
    autoAck: false,
    prefetch: 1,
    queue: {
      name: 'accounts_input_google',
      durable: true,
    }
  });
  const consumer = await consumerFactory.newConsumer();

  consumer.startConsuming().subscribe({
    next: async (message) => {
      const payload = JSON.parse(message.content);
     // other stuff
    },
    complete: () => console.log('completed'),
    error: (error) => {
      console.error(error);
    }
  });
}

After a message was consumed i see exchange_accounts_input_google.

How can i prevent from creating new exchange?

skif48 commented 5 years ago

Exchange is created by default and bound to your queue. If for some reason, you don't need an exchange, you can leverage a setup function. You can find an example here under With custom setup function section.

Not using an exchange is not a usual use case for RabbitMQ setup, therefore an exchange is created by default when you instantiate a Consumer instance. This exchange is later needed when you're going to publish messages.

marcio199226 commented 5 years ago

I will explain better you my rabbitmq setup.

I would have 1 exchange of direct type which has bindings based on routing keys which will routing those messages to queue with corresponding routing key.

The "issue" is when i have a consumer for queue "bar" it automatically created an exchange bar_exchange but in my case already exists an exchange so i wouldn't to create a new one for each queue the consumer is subscribed to.

Thanks for all replies i will look for custom setup function anyway or rewrite my consumers using raw ampqlib

skif48 commented 5 years ago

@marcio199226 I see. Well, you can provide exchange info within consumer configs. Consumer class uses assertions, these operations are idempotent, meaning, they will either create a required resource or ensure its existence. Therefore if you provide necessary info about your exchanges and bindings to your consumer configs, not only it will work for you, but also you'll be sure that your setup is alright, before you start consuming.

skif48 commented 5 years ago

@marcio199226 If you decide to use consumer configs for your case, please let me know if they don't fit your needs :slightly_smiling_face:

marcio199226 commented 5 years ago

Hi, i have used custom setup function but it will throw an error:

const rabbit                    = require('nabbitmq');
const RabbitMqConnectionFactory = rabbit.RabbitMqConnectionFactory;
const ConsumerFactory           = rabbit.ConsumerFactory;

const connectionFactory = new RabbitMqConnectionFactory();
connectionFactory.setUri('amqp://localhost:5672');
const connection = await connectionFactory.newConnection();
const consumerFactory = new ConsumerFactory(connection);
consumerFactory.setCustomSetupFunction(async (connection) => {
const channel = await connection.createChannel();
await channel.assertExchange(`dynamic_exchange_name_${id}`, 'direct', { durable: true });
const queueMetadata = await channel.assertQueue(`dynamic_queue_name_${id}`, {
durable: true,
});

await channel.bindQueue(queueMetadata.queue, `dynamic_exchange_name_${id}`, `my.routing.key.${id}`);
await channel.prefetch(1);

return {channel, prefetch: 1, autoAck: false};
});
const consumer = await consumerFactory.newConsumer();

consumer.startConsuming().subscribe({
next: async (message) => {
// code
},
complete: () => console.log('completed'),
error: (error) => {
console.error(error);
}
});

After executing i have the following error: node consumer.js (node:24241) UnhandledPromiseRejectionWarning: Error: Cannot read property 'queue' of null at ConsumerFactory.<anonymous> (/home/oskar/projects/instagram/node_modules/nabbitmq/lib/factories/consumer-factory.js:35:23) at Generator.throw (<anonymous>) at rejected (/home/oskar/projects/node_modules/nabbitmq/lib/factories/consumer-factory.js:5:65) at <anonymous> at process._tickCallback (internal/process/next_tick.js:189:7) (node:24241) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1) (node:24241) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

But the exchange is created successfully with right bindings and the queue is created too.

What i'm doing wrong ?

P.S i have the latest version installed 0.1.1

P.S2 seems that while constructing publisher/consumer factory some configs seems are still needed by them but configs & rawConfigs are sets to null, you should set them at least to an empty object i think beacuse of this for example: https://github.com/skif48/nabbitmq/blob/develop/src/models/publisher.ts#L164 when config = null https://github.com/skif48/nabbitmq/blob/develop/src/models/publisher.ts#L74

skif48 commented 5 years ago

Hello again. Working on it @marcio199226, thanks for the snippet.

skif48 commented 5 years ago

@marcio199226 fixed in v 0.1.2. You can go ahead and npm install it. New version introduces a slightly different usage of custom setup function, namely - return object interface. The details can be found under the same section with function usage in readme. Let me know if anything goes wrong. :slightly_smiling_face:

skif48 commented 5 years ago

Hey @marcio199226. Is it still an issue for you? If not, I'll close this one. :slightly_smiling_face:

marcio199226 commented 5 years ago

I noticed that when consuming messages if I try to get message's routing key through message.fields.routingKey I'm obtaining queue name instead of message's routing key

skif48 commented 5 years ago

Hello @marcio199226! Did you use custom setup function or you used consumer configs to set up consuming logic?

P.S. I just released version 1.0.0 which changed consumer and publisher configs a little. You can have a look here and here. Also there are examples updated in README.

marcio199226 commented 5 years ago

I have used a custom setup function

  consumerFactory.setCustomSetupFunction(async (connection) => {
    const channel = await connection.createChannel();
    await channel.assertExchange('exchange', 'topic', { durable: true });
    const queueMetadata = await channel.assertQueue('input', {
      durable: true,
      maxPriority: 10
    });

    await channel.bindQueue(queueMetadata.queue, 'exchange', 'account.*');
    await channel.prefetch(1);

    return {channel, prefetch: 1, autoAck: false, queue: 'input'};
  });

And after consuming some message, message.fields.routingKey returns my queue name "input" insteaf of currently messgae routingKey

I have used 0.1.2v

Best regards

skif48 commented 5 years ago

@marcio199226 thanks for the snippet. Working on that.

skif48 commented 5 years ago

@marcio199226 I think I know where the bug is, will publish a fix soon. Thank you very much!

skif48 commented 5 years ago

I'll close this issue and move this conversation in another one: #7 .