SOHU-Co / kafka-node

Node.js client for Apache Kafka 0.8 and later.
MIT License

How to get last offset Number directly or to filter the offsets #225

Open HafsaAsif opened 9 years ago

HafsaAsif commented 9 years ago

Hi, I am using a simple consumer and producer with kafka-node. My producer sends messages and my consumer receives them without problems; the code for both is below. In the consumer, I expected offset.fetch() to give me all of today's offsets, but it does not. Could you explain how to get results from this method, and is there a method that directly returns the last offset of a topic for a given partition? I would also like to know how to filter offsets in the incoming stream: for example, how can I get only the last 20 messages in my consumer?

My producer is:

    var kafka = require('kafka-node');
    var Producer = kafka.Producer;
    var Client = kafka.Client;
    var client = new Client('localhost:2181');
    var producer = new Producer(client);

    producer.on('ready', function () {
        producer.send([
            { topic: 'test', key: 'key1', partition: 0, attributes: 0,
              messages: ['banana', 'carrot', 'lemon', 'apple', 'melon', 'kiwi', 'mango', 'avacado'] }
        ], function (err, result) {
            console.log(err || result);
            process.exit();
        });
    });

My consumer is:

    var kafka = require('kafka-node');
    var Consumer = kafka.Consumer;
    var client = new kafka.Client('localhost:2181');
    var offset = new kafka.Offset(client);

    offset.fetch([{ topic: 'test' }], function (err, data) {
        console.log(data);
    });

    var consumer = new Consumer(client, [{ topic: 'test', partition: 0 }], {
        autoCommit: false, autoCommitIntervalMs: 5000, fetchMaxWaitMs: 100,
        fromOffset: true, fetchMinBytes: 1, fetchMaxBytes: 1024 * 10
    });

    consumer.on('message', function (message) {
        console.log(message);
    });

anandpathak commented 9 years ago

When you fetch offsets, you can specify the time from which you want them (time) as well as how many offsets you want back (maxNum):

offset.fetch([
        { topic: 't', partition: 0, time: Date.now(), maxNum: 1 }
    ], function (err, data) {
        // data 
        // { 't': { '0': [999] } } 
    });

Here the time is the current time. If you change it to 20 minutes earlier (Date.now() - 20 * 60 * 1000), you get the offset of the messages that arrived around 20 minutes ago; in Kafka 0.8 this time lookup works at log-segment granularity, so the offset is approximate rather than exact. You can then pass this offset to the consumer (as the offset field of the payload, with fromOffset: true) to read messages from that point on. Keep in mind that the returned object is keyed by topic and then by partition number, so store the partition number in a variable and read the offset as data[topic][partition][0], e.g. data['t'][0][0] for the example above.
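To address the original question about the last offset and the last 20 messages, here is a minimal sketch, assuming the ZooKeeper-based kafka.Client used in this thread and the kafka-node README's description of offset.fetch, where time: -1 asks for the latest offset. In Kafka that latest value is the log-end offset, i.e. the offset the next message will receive, so the last n messages start at latest - n. The names readOffset, startOffsetForLastN and consumeLastN are hypothetical helpers, not part of kafka-node, and clamping to 0 assumes no old segments have been deleted by retention.

```javascript
'use strict';

// Pull the first offset out of an Offset.fetch() result shaped like
// { topic: { partition: [offset, ...] } }. JavaScript coerces the numeric
// partition to a string key, so data[topic][0] and data[topic]['0'] are the same.
function readOffset(data, topic, partition) {
  const byPartition = data && data[topic];
  const offsets = byPartition && byPartition[partition];
  if (!offsets || offsets.length === 0) {
    throw new Error('no offset returned for ' + topic + '/' + partition);
  }
  return offsets[0];
}

// With time: -1 the broker reports the log-end offset (the offset the *next*
// message will get), so the last n messages are latest - n .. latest - 1.
// Clamp to the earliest available offset (assumed 0 unless you pass one).
function startOffsetForLastN(latest, n, earliest = 0) {
  return Math.max(earliest, latest - n);
}

// Hypothetical wiring with kafka-node's ZooKeeper-based Client, as used in
// this thread. require() only runs when this function is actually called.
function consumeLastN(zkHost, topic, partition, n, onMessage) {
  const kafka = require('kafka-node');
  const client = new kafka.Client(zkHost);
  const offset = new kafka.Offset(client);

  offset.fetch([{ topic: topic, partition: partition, time: -1, maxNum: 1 }], function (err, data) {
    if (err) { return console.error(err); }
    const start = startOffsetForLastN(readOffset(data, topic, partition), n);
    const consumer = new kafka.Consumer(
      client,
      [{ topic: topic, partition: partition, offset: start }],
      { autoCommit: false, fromOffset: true } // fromOffset: start at the payload offset
    );
    consumer.on('message', onMessage);
    consumer.on('error', console.error);
    // Emitted if start is older than what the broker still retains.
    consumer.on('offsetOutOfRange', console.error);
  });
}

// Usage (needs a running ZooKeeper and Kafka broker):
// consumeLastN('localhost:2181', 'test', 0, 20, console.log);
```

The consumer starts 20 messages back but keeps streaming anything produced afterwards; it does not stop on its own after those 20.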