SOHU-Co / kafka-node

Node.js client for Apache Kafka 0.8 and later.
MIT License
2.66k stars 627 forks source link

Cannot consume messages from beginning when put consume.on('message,callback) inside another callback #1301

Open leanhvu1989 opened 5 years ago

leanhvu1989 commented 5 years ago

Questions?

I push messages from kafka consumer to mongodb.

Bug Report

Environment

For specific cases also provide

Include Sample Code to reproduce behavior

// include code here
const kafka = require('kafka-node');
const mongo = require('mongodb');
const assert = require('assert');

const { Consumer, Offset, KafkaClient } = kafka;
const { MongoClient } = mongo;

const topic = 'testprocessing';

const url = 'mongodb://localhost:27017';
const dbName = 'test_kafka_processing';
let mongodb;

const client = new KafkaClient({kafkaHost: 'localhost:9092'});

const topics = [{
    topic: 'testprocessing',
    offset: 0,
    partition: 0
}];

const options = {
    autoCommit: false,
    fetchMaxWaitMs: 1000,
    fetchMaxBytes: 1024 * 1024,
    fromOffset: true
};

MongoClient.connect(url, function(err, client) {
    assert.equal(null, err);
    mongodb = client.db(dbName);

    consumer.on('message', (message) => {
        const collection = mongodb.collection('transaction');
        // Insert some documents
        collection.insertOne(message.value,
            function(err, result) {
                assert.equal(err, null);
                console.log("Inserted message into the collection");
                callback(result);
        });
    });

    consumer.on('error', (err) => {
        console.log('error', err);
    });
});

const consumer = new Consumer(client, topics, options);

Include output with Debug turned on

Thanks for your contribution!

ghost commented 4 years ago

Hi, @leanhvu1989 couple observations: