nodefluent / node-sinek

:tophat: Most advanced high level Node.js Kafka client
MIT License

Is there a way to skip messages conditionally without halting the consumer? #156

Closed giri-jeedigunta closed 4 years ago

giri-jeedigunta commented 4 years ago

Here is what I'm trying to do:

I have a topic, recipes (3 partitions), and two consumer groups, sweet and savoury. When I'm working with sweet, I don't want to consume, process, or commit any messages that belong to savoury; I simply want to skip those messages, and vice versa.

Here is my code:

recipesListener({ groupId: 'group-sweet', topicName: 'recipes', groupType: 'sweet' });
recipesListener({ groupId: 'group-savoury', topicName: 'recipes', groupType: 'savoury' });
const { NConsumer } = require("sinek");
const { logger } = require("../logger");

const recipesListener = ({ groupId, topicName, groupType }) => {
  const consumerConfig = {
    logger: {
      debug: (msg) => logger.debug(msg),
      info: (msg) => logger.info(msg),
      warn: (msg) => logger.warn(msg),
      error: (msg) => logger.error(msg),
    },
    noptions: {
      "metadata.broker.list": "localhost:9092",
      "group.id": groupId,
      "socket.keepalive.enable": true,
      "queued.min.messages": 1000,
      "queued.max.messages.kbytes": 5000,
      "fetch.message.max.bytes": 524288,
    },
    tconf: {
      "auto.offset.reset": "latest",
    },
  };

  (async () => {
    const consumer = new NConsumer([topicName], consumerConfig);

    consumer.on("error", (error) => logger.error(error));
    consumer.on("ready", () =>
      logger.info(`Consumer ready to consume ${topicName} for ${groupId}`)
    );

    await consumer.connect();

    consumer.consume(
      async (messages, callback) => {
        /*
         *   messages are produced with headers that can have either sweet or savoury
         */
        if (messages.headers !== groupType) {
          return;
        }

        callback();
      },
      true,
      false,
      { noBatchCommits: true }
    );
  })().catch(logger.error);
};

module.exports = {
  recipesListener,
};

Currently, the lines below halt the consumer completely. Is there a workaround for this? Please let me know.

        /*
         *   messages are produced with headers that can have either sweet or savoury
         */
        if (messages.headers !== groupType) {
          return;
        }

I'm using:

Node v12.8.2
"node-rdkafka": "^2.9.1",
"sinek": "^9.1.0",

I'm a Kafka noob. Please let me know if any more details are needed.

giri-jeedigunta commented 4 years ago

I've figured this out. Hence closing.
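
The thread does not record what the fix was. One likely cause, assuming sinek's one-message-at-a-time consume mode waits for `callback` before handing over the next message, is that the early `return` never calls it. A minimal sketch of a handler that skips foreign messages but still acknowledges them is below. The header name `type`, the helper names `headerValue` and `makeHandler`, and the header layout (an array of single-key objects with Buffer values, as node-rdkafka is understood to deliver them) are assumptions for illustration, not something confirmed in this issue.

```javascript
// Hypothetical helper: reads one header value from a message.
// Assumes headers look like [{ type: Buffer.from("sweet") }];
// the header name "type" is an assumption, not taken from the issue.
function headerValue(message, name) {
  for (const header of message.headers || []) {
    if (Object.prototype.hasOwnProperty.call(header, name)) {
      return String(header[name]); // Buffer or string -> string
    }
  }
  return undefined;
}

// Hypothetical factory: builds a (message, callback) handler that
// processes matching messages and skips the rest, but always calls
// callback so the consumer can move on to the next message.
function makeHandler(groupType, processMessage, onError) {
  return async (message, callback) => {
    try {
      if (headerValue(message, "type") === groupType) {
        await processMessage(message);
      }
    } catch (error) {
      onError(error); // report the failure without stalling the consumer
    } finally {
      callback(); // called for processed and skipped messages alike
    }
  };
}

// Possible wiring inside recipesListener (handleRecipe is hypothetical):
// consumer.consume(
//   makeHandler(groupType, handleRecipe, (e) => logger.error(e)),
//   true,
//   false,
//   { noBatchCommits: true }
// );
```

Since each Kafka consumer group tracks its own offsets, moving past a skipped message in group-sweet does not affect what group-savoury receives.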