Blizzard / node-rdkafka

Node.js bindings for librdkafka
MIT License

Best consume method for handling back-pressure #877

Open Crispy1975 opened 3 years ago

Crispy1975 commented 3 years ago

Hi. I've been using node-rdkafka for a little while and overall I am very pleased with how it works, great lib! I do have a question, however, that I am not 100% sure of the answer to. I have a consumer process that needs to handle back-pressure effectively, as I am doing ETL into a slower database cluster and don't want the consumer to be overwhelmed and go OOM. However, I am getting OOM issues periodically... I was looking through the library code on the JS side and saw this comment: https://github.com/Blizzard/node-rdkafka/blob/master/lib/kafka-consumer.js#L382

This suggests that using the consume() method is going to result in OOMs in situations like mine. I have set librdkafka settings such as queued.min.messages so it only grabs a smaller number of messages in the background. I've also built an internal queue in my process with a pause/resume mechanism to allow for more control. For the most part the OOMs have been much reduced; however, as mentioned, I still do get them. Below is a skeleton version of what I have running; comments on any issues or possible reasons for the OOM would be greatly appreciated.

import * as Kafka from 'node-rdkafka';
import { EventEmitter } from 'events';

let isPaused = false;
let consumerRunCheck: ReturnType<typeof setInterval>;

try {
    // Kafka consumer tracking
    let processingQueue: Array<Kafka.Message> = []; // Internal message queue
    const queueStatus = new EventEmitter();  // Init internal batch queue emitter
    let lastBatchTime: number = Date.now();

    const onRebalance = async function onRebalance(err: Kafka.LibrdKafkaError, assignments: Array<Kafka.Assignment>): Promise<void> {
        try {
            if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
                await consumer.assign(assignments);

            } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
                processingQueue = [];
                await consumer.unassign();

            } else {
                console.error(`Re-balance error: ${err.message}`);
            }

        } catch (err) {
            handleErrors(err);
        }
    };

    // Create the Kafka consumer
    const consumer: Kafka.KafkaConsumer = new Kafka.KafkaConsumer({
        ...consumerConfig,
        'rebalance_cb': onRebalance
    }, topicConfig);

    // Connect to the Kafka broker(s)
    consumer.connect();

    consumer.on('ready', () => {
        consumer.subscribe(transformConfig.topics);
        consumer.setDefaultConsumeTimeout(transformConfig.defaultConsumeTimeout);
        consumer.consume();
    });

    // Data event detected, push to the internal processing queue
    consumer.on('data', (data: Kafka.Message) => {
        processingQueue.push(data);
        // Check to see if internal queue is full and trigger batch processing
        if (isPaused === false && processingQueue.length >= transformConfig.internalQueueMax) {
            consumer.pause(consumer.assignments());
            isPaused = true;
            queueStatus.emit('batchReady');
        }
    });

    // Batch read for processing event detected
    queueStatus.on('batchReady', () => {
        // Process the batch of messages
        return messageProcessing(processingQueue).catch((err) => {
            handleErrors(err);

        }).finally(() => {
            // Free the internal queue memory for the next batch
            processingQueue.length = 0;
            // Set the last batch time
            lastBatchTime = Date.now();
            // Commit offsets
            consumer.commit();
            // Resume consuming messages
            consumer.resume(consumer.assignments());
            isPaused = false;
        });
    });

    // Check to see if we need to restart things
    consumerRunCheck = setInterval(() => {
        console.log(`Internal processing queue has ${processingQueue.length} messages waiting (isPaused: ${isPaused}).`);
        // Calculate the last time we saw a batch
        const lastBatchDiff: number = Math.floor((Date.now() - lastBatchTime)/1000);
        if (isPaused && lastBatchDiff >= 60) {
            console.log(`Consumer appears to be stuck, unpausing.`);
            // Resume consuming messages
            consumer.resume(consumer.assignments());
            isPaused = false;
        }
    }, 60000);

} catch (err) {
    console.error(err.message);
    process.exit(1);
}

The consumer settings are as follows:

export const consumerConfig: ConsumerGlobalConfig = {
    'client.id': `my-client`,
    'group.instance.id': `my-client-instance-${Date.now()}`,
    'metadata.broker.list': brokers,
    'group.id': `my-events`,
    'session.timeout.ms': 30000,
    'heartbeat.interval.ms': 3000,
    'enable.auto.commit': false,          // offsets are committed manually after each batch
    'queued.min.messages': 10,            // keep librdkafka's local pre-fetch queue small
    'queued.max.messages.kbytes': 65536,  // cap (in KB) on the local pre-fetch queue
    'fetch.message.max.bytes': 1048576,   // max bytes per topic+partition per fetch
    'fetch.max.bytes': 1048576            // max bytes per fetch request
};

I am considering switching from the style listed above to using the callback on the consume() method, but before doing that it would be interesting to know whether there is something obvious I am doing (or not doing) in the above code.
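
For reference, the callback-based (non-flowing) style I am considering would look roughly like this — a minimal sketch only, reusing the consumer and handleErrors from above, with batchSize and processBatch as hypothetical placeholders:

// Minimal sketch of the non-flowing style; batchSize and processBatch
// are hypothetical placeholders, not actual code from this process.
consumer.on('ready', () => {
    consumer.subscribe(transformConfig.topics);
    consumer.setDefaultConsumeTimeout(transformConfig.defaultConsumeTimeout);

    const poll = (): void => {
        // Ask for at most batchSize messages; the callback fires once they
        // arrive or the consume timeout elapses (possibly with fewer messages).
        consumer.consume(batchSize, (err: Kafka.LibrdKafkaError, messages: Kafka.Message[]) => {
            if (err) {
                handleErrors(err);
                return poll();
            }
            processBatch(messages)               // hypothetical async batch processor
                .then(() => consumer.commit())   // commit only after the batch succeeds
                .catch(handleErrors)
                .finally(poll);                  // fetch the next batch only when done
        });
    };

    poll();
});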

syahiaoui commented 3 years ago

Hi,

In our company we are using the consumer in non-flowing mode and it works fine for handling back-pressure (with or without the async module).

    consumerOnReady(consumer) {
        logger.debug(`[Consumer] - consumerEvents - consumer ready.`);
        logger.info(`[Consumer] - consumerEvents - Subscribing to Topic: '${kafka.topic.split(",")}'`);
        consumer.subscribe(kafka.topic.split(","));
        consumer.consume(this.maxPollRecords, this.onData);
        this.start(consumer);
        consumerInstance = consumer;
    }

    onDataCallback(consumer, err, msg) {
        try {
            if (err) {
                if (err.message === failuresMessage.CONSUMER_NOT_CONNECTED) process.exit(1);
                else logger.error(`[Consumer] - onDataCallback - error: ${err}`);
            }
            if (msg && msg.length) {
                logger.debug(`[Consumer] - onDataCallback - poll returns: ${msg.length} Records`);
                msg.forEach(record => this.notifyStartProcessing(record));
                this.q.push(msg);
            }
            if (this.q.length() > queue.asyncMaxQueueSize) {
                consumer.pause(consumer.assignments());
                this.paused = true;
                logger.debug(`[Consumer] - onDataCallback - consumer paused`);
            } else {
                if (this.paused) {
                    logger.debug(`[Consumer] - onDataCallback - consumer resumed`);
                    this.paused = false;
                    consumer.resume(consumer.assignments());
                }
                consumer.consume(this.maxPollRecords, this.onData);
            }
        } catch (error) {
            logger.error(`[Consumer] - onDataCallback - handle error: ${error}`);
        }
    }

    queueOnDrain(consumer) {
        logger.debug(`[Consumer] - consumerEvents - queue drain`);
        logger.debug(`[Consumer] - consumerEvents - is paused ${this.paused}`);
        if (this.paused) {
            this.paused = false;
            consumer.resume(consumer.assignments());
            logger.debug(`[Consumer] - consumerEvents - consumer resumed`);
        }
        consumer.consume(this.maxPollRecords, this.onData);
    }

    /**
     * Kafka consumer event listener; pushes every event received from Kafka onto the queue.
     * The queue is used to handle back-pressure: if its length is greater than asyncMaxQueueSize
     * the consumer is paused, and it is resumed only when the queue is drained (all events
     * processed successfully or errored).
     */
    consumerEvents() {
        const consumer = new rdKafka.KafkaConsumer(ConsumerConfig.globalConfig(this), ConsumerConfig.topicConfig());
        consumer.setDefaultConsumeTimeout(kafka.consumerDefaultTimeout);
        consumer.connect({ timeout: kafka.connectionTimeout }, (err) => this.consumerOnConnect(err));
        consumer.on('ready', () => this.consumerOnReady(consumer));
        consumer.on('warning', warn => logger.warn(`[Consumer] - consumerEvents - warning ${JSON.stringify(warn)}`));
        consumer.on('event.log', log => logger.warn(`[Consumer] - consumerEvents - event.log ${JSON.stringify(log)}`)); // logs debug messages, if debug is enabled
        consumer.on('event.error', err => logger.error(`[Consumer] - consumerEvents - event.error ${JSON.stringify(err)}`)); // logs all errors
        consumer.on('disconnected', (arg) => logger.info(`[Consumer] - consumerEvents - consumer disconnected: ${JSON.stringify(arg)}`));
        this.onData = (err, msg) => this.onDataCallback(consumer, err, msg);
        this.q.drain(() => this.queueOnDrain(consumer));
        this.q.error((err, task) => logger.error(`[Consumer] - consumerEvents - async queue error: ${err}`, { task, stackTrace: err.stack }));
    }
Crispy1975 commented 3 years ago

Thanks for the example @syahiaoui - I suspect I have a leak somewhere else after the consumer code... I will dig into that.

stale[bot] commented 3 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.

sathyarajagopal commented 3 years ago

@syahiaoui I think if you start using the streams API instead of the traditional consumer, you don't need to handle this back-pressure manually. Please correct me if I am wrong.

syahiaoui commented 3 years ago

@sathyarajagopal The consumer's stream API extends the native Readable class, but in the past there was a problem where it did not stop reading messages when the internal buffer reached the highWaterMark threshold. (I don't know if this has been fixed.)
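
For illustration, the streams API under discussion is used roughly like this — a minimal sketch, where consumerConfig, the topic name, and writeToDatabase are assumptions:

import * as Kafka from 'node-rdkafka';
import { Writable } from 'stream';

// createReadStream starts the consumer in streaming mode; piping into a
// Writable lets Node's stream machinery signal back-pressure via
// highWaterMark, subject to the caveat described above.
const stream = Kafka.KafkaConsumer.createReadStream(consumerConfig, {}, {
    topics: ['my-events']
});

const sink = new Writable({
    objectMode: true,
    highWaterMark: 16,  // hold at most ~16 messages before signalling back-pressure
    write(message, _encoding, done) {
        writeToDatabase(message)   // hypothetical slow ETL step
            .then(() => done())
            .catch(err => done(err));
    }
});

stream.pipe(sink);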

stale[bot] commented 2 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.

iklotzko commented 2 years ago

@syahiaoui do you have the full code for the great example you provided of the consumer (non-flowing mode)? Thanks again for this, it's very elegant!

iklotzko commented 2 years ago

@syahiaoui I was able to get it working, thanks, it's a very clear solution for our problem!