cody-greene / node-rabbitmq-client

RabbitMQ (0-9-1) client library with auto-reconnect & zero dependencies
MIT License
130 stars 9 forks source link

problem to set correlationId #37

Closed ghasemsabery closed 11 months ago

ghasemsabery commented 11 months ago

this my code

async sendRPCMessage(queue: string, messageData: string) {
        const rabbit = new Connection(rabbitmq)
        const rpcClient = rabbit.createRPCClient({ confirm: true })
        const res = await rpcClient.send({ routingKey: queue, correlationId: '123', expiration: '123' }, 'messageData');
        console.log('response:', res.body)
        await rpcClient.close()
        await rabbit.close()
    }

and here is its output

{
  consumerTag: 'amq.ctag--wGfKOLXxFHts5XzcobW_w',
  deliveryTag: 1,
  redelivered: false,
  exchange: '',
  routingKey: 'tmpTest',
  contentType: 'text/plain',
  deliveryMode: 1,
  correlationId: '1',
  replyTo: 'amq.rabbitmq.reply-to.g1h2AA5yZXBseUA1MDAzMzQxNAAAOUkAAAAmZTzZFA==.uEd6YdebP5/BEhi5KtJGaQ==',
  expiration: '30000',
  timestamp: 1701853044,
  durable: false,
  body: 'messageData'
}

as you can see correlationId fild didnt change and it has defult value

cody-greene commented 11 months ago

Do you have any particular reason to set your own correlation ID? The rpc client auto-increments this value and uses it to return a matching response

-------- Original Message -------- On Dec 6, 2023, 1:11 AM, ghasem sabery wrote:

this my code

async sendRPCMessage(queue: string, messageData: string) { const rabbit = new Connection(rabbitmq) const rpcClient = rabbit.createRPCClient({ confirm: true }) const res = await rpcClient.send({ routingKey: queue, correlationId: '123', expiration: '123' }, 'messageData'); console.log('response:', res.body) await rpcClient.close() await rabbit.close() }

and here is its output

{ consumerTag: 'amq.ctag--wGfKOLXxFHts5XzcobW_w', deliveryTag: 1, redelivered: false, exchange: '', routingKey: 'tmpTest', contentType: 'text/plain', deliveryMode: 1, correlationId: '1', replyTo: 'amq.rabbitmq.reply-to.g1h2AA5yZXBseUA1MDAzMzQxNAAAOUkAAAAmZTzZFA==.uEd6YdebP5/BEhi5KtJGaQ==', expiration: '30000', timestamp: 1701853044, durable: false, body: 'messageData' }

as you can see correlationId fild didnt change and it has defult value

— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you are subscribed to this thread.Message ID: @.***>

ghasemsabery commented 11 months ago

Thank you for your response: To prevent specific situations, such as restarting the program for any reason, and to avoid registering duplicate correlation IDs in the queue, if we increase the timeout, we may encounter issues with long responses.

cody-greene commented 11 months ago

There seems to be a misunderstanding here. The correlation id is only useful to the rpc client. The other end, the server queue, should have no need to even look at this value, beyond echoing it back with the reply.

A duplicate correlationId in the queue should therefor be no problem at all. In fact, it should be expected. Messages in this situation are actually uniquely identified by the combination of the "replyTo" property plus the "correlationId" property. The client creates a new anonymous reply queue, with a randomly generated name, whenever it (re)starts. This can be seen in the "replyTo" property. This means that in-flight replies are discarded if the client restarts or loses connection. And thanks to the auto-incrementing correlationId, late replies are discarded as well.

You should expect your "server queue" to sometimes perform wasted work in these cases. You can't know if the reply will actually be seen. If you're actually experiencing a bug in any of these cases then please create a new issue with an example so I can replicate it.