noxdafox / rabbitmq-message-deduplication

RabbitMQ Plugin for filtering message duplicates
Mozilla Public License 2.0
277 stars 34 forks source link

x-cache-ttl ignored #24

Closed citosid closed 5 years ago

citosid commented 5 years ago

We have a node application where we want to use this module, however we cannot make it work with TTL. It de duplicate the headers, but does not matter what TTL we set, it ignores it.

Here is our code:

import { connect, Connection } from 'amqplib';
import { createHmac } from 'crypto';
import { Inject } from '@nestjs/common';
import { Observable } from 'rxjs';
import { sleep } from 'shared';

export class Queue {

  private static connection: Connection;

  get connect(): Promise<void> {
    return new Promise<void>(async resolve => {
      while (!Queue.connection) {
        await sleep(500);
      }

      resolve();
    });
  }

  constructor(
  ) {
    connect(`amqp://${process.env.RABBITMQ_USERNAME}:${process.env.RABBITMQ_PASSWORD}@${process.env.RABBITMQ_HOST}`).then((conn) => {
      Queue.connection = conn;
    }).catch(error => {
    });
  }

  public async publish(queue: string, message: any, deDupHeader?: string | undefined): Promise<void> {
    const signature = createHmac('sha256', 'mySecretKey')
      .update(JSON.stringify(message))
      .digest('base64');

    const messageObj: Message = {
      message: message,
      signature,
    };

    return new Promise<void>(resolve => {
      Queue.connection.createChannel().then(async ch => {
        const exchangeName = `exchange_${queue}`;
        const exchange = await ch.assertExchange(
          exchangeName,
          'x-message-deduplication',
          {
            arguments: {
              'x-cache-size': '100',
              'x-cache-ttl': '1000',
            },
          },
        );
        await ch.assertQueue(queue, {
          durable: false,
        });
        await ch.bindQueue(queue, exchangeName, '*');
        await ch.publish(exchangeName, '', Buffer.from(JSON.stringify(messageObj)), {
          headers: {
            'x-deduplication-header': deDupHeader,
            'x-cache-ttl': 1000,
          },
        });
        // ch.sendToQueue(
        //   queue,
        //   Buffer.from(JSON.stringify(messageObj)),
        //   {
        //     persistent: true,
        //     headers: {
        //       'x-deduplication-header': deDupHeader,
        //       'x-cache-ttl': 5000,
        //     },
        //   },
        // );
        resolve();
      });
    });
  }

  public subscribe(queue: string): Observable<any> {
    return new Observable<any>(observer => {
      Queue.connection.createChannel().then(ch => {
        ch.deleteQueue(queue);
        ch.assertQueue(queue, { durable: false, arguments: { 'x-message-deduplication': true, 'x-cache-ttl': 5000 } });
        ch.consume(queue, message => {
          const content = message!.content.toString();
          const messageObject: Message = JSON.parse(content);
          const calculatedSignatureFromMessage = createHmac('sha256', 'mySecretKey')
            .update(JSON.stringify(messageObject.message))
            .digest('base64');

          if (messageObject.signature !== calculatedSignatureFromMessage) {
            // We do not forward this message, because the
            // signature does not match
            return;
          }

          observer.next(messageObject);
        });
      });
    });
  }
}

interface Message {
  message: any;
  signature: string;
}
noxdafox commented 5 years ago

Hello,

I am not familiar with Javascript but for what I understand you are setting de-duplication both on the exchange and on the queue.

Exchange and queue level de-duplication work differently and the difference is explained in the README. Only the exchange de-duplication allows to set TTL, the queue de-duplicates based on its content.

If you de-duplicate on both queue and exchange you will end with difficult to predict behaviour.

noxdafox commented 5 years ago

Hello,

Is this issue still ongoing?

citosid commented 5 years ago

Sorry for the delay. In the end we were not able to make it work. Will get the code we have and let you know the outcome.

Thanks for the help!

noxdafox commented 5 years ago

Closing due to inactivity. Feel free to re-open if assistance is still needed.