SOHU-Co / kafka-node

Node.js client for Apache Kafka 0.8 and later.
MIT License
2.66k stars 628 forks source link

Problem with Producer/Consumer - why doesn't it work #252

Open petergdoyle opened 9 years ago

petergdoyle commented 9 years ago

this code works (reads from the consumer and sends to the new topic using producer)


var argv = require('optimist')
    .usage('Usage: $0 --topic=[kafka-topic] --partition=[kafka-partition] --attrs=[kafka-attributes] --autoCommit=[true|false]')
    .demand(['topic'])
    .argv;

var consumer_topic = argv.topic;
var consumer_partition = argv.partition || 0;
var consumer_attrs = argv.attrs || 0;
var consumer_autoCommit = argv.autoCommit || false;

var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.Client('kafka_zk_0:2181'),
consumer = new Consumer(
    client,
    [
        { topic: consumer_topic, partition: consumer_partition }
    ],
    {
        autoCommit: consumer_autoCommit
    }
);

consumer.on('message', function (message) {
  var v = message.value
  .replace(/[^\w\s]|_/g, "")
  .replace(/\s+/g, " ")
  .split(' ');

  var producer = new kafka.Producer(new kafka.Client('kafka_zk_0:2181'),{ requireAcks: 1 });
  producer_partition = 0;
  producer_attrs = 0;
  producer_message = v.length.toString()+message.value;
  producer_topic = (v.length % 2 ) ? "odd-word-count" : "even-word-count";

  producer.on('ready', function () {
    console.log("emitting to ",producer_topic);
    producer.send([
        { topic: producer_topic, partition: producer_partition, messages: [producer_message], attributes: producer_attrs }
      ], function (err, result) {
          console.log(err || result);
      });
  });
});

This code doesn't. it reads from the consumer and never sends with the producer.


var argv = require('optimist')
    .usage('Usage: $0 --topic=[kafka-topic] --partition=[kafka-partition] --attrs=[kafka-attributes] --autoCommit=[true|false]')
    .demand(['topic'])
    .argv;

var consumer_topic = argv.topic;
var consumer_partition = argv.partition || 0;
var consumer_attrs = argv.attrs || 0;
var consumer_autoCommit = argv.autoCommit || false;

var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.Client('kafka_zk_0:2181'),
consumer = new Consumer(
    client,
    [
        { topic: consumer_topic, partition: consumer_partition }
    ],
    {
        autoCommit: consumer_autoCommit
    }
);

var producer = new kafka.Producer(new kafka.Client('kafka_zk_0:2181'),{ requireAcks: 1 });
producer_partition = 0;
producer_attrs = 0;

consumer.on('message', function (message) {
  var v = message.value
  .replace(/[^\w\s]|_/g, "")
  .replace(/\s+/g, " ")
  .split(' ');

  producer_message = v.length.toString()+message.value;
  producer_topic = (v.length % 2 ) ? "odd-word-count" : "even-word-count";

  producer.on('ready', function () {
    console.log("emitting to ",producer_topic);
    producer.send([
        { topic: producer_topic, partition: producer_partition, messages: [producer_message], attributes: producer_attrs }
      ], function (err, result) {
          console.log(err || result);
      });
  });
});

I should not have to create a new connection every time I need to send a message every consumer.on('message')!!!

petergdoyle commented 9 years ago

... And should it not be possible to use the same kaka.Client for both producer and consumer? the second approach still uses multiple connects rather than one. It is a better than a connection per send and might be good for high-volume traffic (what i need this to scale to). ... Also still not clear to me whether to use the regular Consumer and Producer or the HighLevel versions of those? Thanks !