nodefluent / kafka-streams

equivalent to kafka-streams :octopus: for nodejs :sparkles::turtle::rocket::sparkles:
https://nodefluent.github.io/kafka-streams/
MIT License

Calling .consume() is not required in streaming mode. #91

Closed gilesbradshaw closed 5 years ago

gilesbradshaw commented 5 years ago

Any ideas?

Calling .consume() is not required in streaming mode.

    Error: Calling .consume() is not required in streaming mode.
        at NConsumer.consume (/Users/giles/Projects/msd-2-0/src/apps/api/node_modules/sinek/lib/librdkafka/NConsumer.js:496:29)
        at consumer.connect.then (/Users/giles/Projects/msd-2-0/src/apps/api/node_modules/kafka-streams/lib/client/NativeKafkaClient.js:112:35)
        at processImmediate (timers.js:632:19)
    From previous event:
        at NativeKafkaClient.once (/Users/giles/Projects/msd-2-0/src/apps/api/node_modules/kafka-streams/lib/client/NativeKafkaClient.js:103:69)
        at Object.onceWrapper (events.js:276:13)
        at NativeKafkaClient.emit (events.js:188:13)
        at NativeKafkaClient.start (/Users/giles/Projects/msd-2-0/src/apps/api/node_modules/kafka-streams/lib/client/NativeKafkaClient.js:121:19)
        at KStream._start (/Users/giles/Projects/msd-2-0/src/apps/api/node_modules/kafka-streams/lib/dsl/KStream.js:106:20)
        at KStream.start (/Users/giles/Projects/msd-2-0/src/apps/api/node_modules/kafka-streams/lib/dsl/KStream.js:61:21)

krystianity commented 5 years ago

This is very likely related to the Kafka config that you have passed to KStreams. Can you post it here, please?

gilesbradshaw commented 5 years ago

Hi, thanks for your reply. Here it is:

{
    "noptions": {
      "metadata.broker.list": "192.168.0.51:9092, 192.168.0.51:9093, 192.168.0.51:9094",
      "group.id": "API",
      "client.id": "api-2",
      "event_cb": true,
      "api.version.request": true,
      "compression.codec": "snappy",

      "socket.keepalive.enable": true,
      "socket.blocking.max.ms": 100,

      "enable.auto.commit": true,
      "auto.commit.interval.ms": 1000,

      "heartbeat.interval.ms": 1000,
      "retry.backoff.ms": 250,

      "fetch.min.bytes": 100,
      "fetch.message.max.bytes": 2097152,
      "queued.min.messages": 100,

      "fetch.error.backoff.ms": 100,
      "queued.max.messages.kbytes": 50,

      "fetch.wait.max.ms": 1000,
      "queue.buffering.max.ms": 1000,

      "batch.num.messages": 10000
    }
  }
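For context, a config shaped like this is what kafka-streams forwards (via sinek) down to node-rdkafka. A minimal sketch of the typical wiring, assuming the usual KafkaStreams constructor pattern; the topic name and the commented-out stream usage are illustrative only, since they need a running broker:

```javascript
// Sketch: only the "noptions" object is taken from the config above
// (abbreviated); the KafkaStreams wiring below is the typical usage
// pattern and is shown commented out because it requires a broker.
const config = {
  noptions: {
    "metadata.broker.list": "192.168.0.51:9092,192.168.0.51:9093,192.168.0.51:9094",
    "group.id": "API",
    "client.id": "api-2",
    "enable.auto.commit": true,
  },
};

// Typical wiring (illustrative):
// const { KafkaStreams } = require("kafka-streams");
// const kafkaStreams = new KafkaStreams(config);
// const stream = kafkaStreams.getKStream("my-topic");
// stream.forEach(message => console.log(message));
// stream.start();

console.log(config.noptions["group.id"]); // → API
```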
gilesbradshaw commented 5 years ago

It seems to me that this is a bug: when you connect with asStream = true and then call consume(), it raises this error.

    if (withBackPressure) {
        this.consumer.consume((message, done) => {
            super.emit("message", message);
            done();
        }, false, false, this.batchOptions).catch(e => kafkaErrorCallback(e));
    } else {
        this.consumer.consume().catch(e => kafkaErrorCallback(e));
        this.consumer.on("message", message => {
            super.emit("message", message);
        });
    }
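In other words, when the client connects with asStream = true, sinek's NConsumer is already consuming internally, so the explicit .consume() call in the else branch throws. The decision can be modeled as a small pure function; this is a sketch of the branching only, and the function name and return values are illustrative, not part of kafka-streams:

```javascript
// Sketch of the branch above as a pure function. "already-streaming"
// marks the combination that currently throws
// "Calling .consume() is not required in streaming mode."
function chooseConsumeMode(asStream, withBackPressure) {
  if (asStream && !withBackPressure) {
    // NConsumer's stream mode already pulls messages;
    // a second explicit .consume() call throws.
    return "already-streaming";
  }
  return withBackPressure ? "consume-with-callback" : "consume-then-listen";
}

console.log(chooseConsumeMode(true, false)); // → already-streaming
```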

However, if I call it with backpressure enabled, I get:

Warning: a promise was created in a handler at /home/giles/project/msd-2-0/src/packages/msd-kafka/node_modules/kafka-streams/lib/client/NativeKafkaClient.js:108:35 but was not returned from it, see http://goo.gl/rRqMUw

zzswang commented 5 years ago

Same warning here:

(node:67712) Warning: a promise was created in a handler at /Users/zzs/Workspace/src/github.com/36node/bus-messenger-js/node_modules/kafka-streams/lib/client/NativeKafkaClient.js:107:35 but was not returned from it, see http://goo.gl/rRqMUw
    at new Promise (/Users/zzs/Workspace/src/github.com/36node/bus-messenger-js/node_modules/bluebird/js/release/promise.js:79:10)
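This bluebird warning is generic: it fires whenever a handler creates a promise without returning it, so rejections and completion ordering escape the chain. A minimal sketch of the anti-pattern and the fix, with illustrative names not taken from kafka-streams:

```javascript
// Anti-pattern (triggers bluebird's warning):
//   somePromise.then(() => { startWork(); });  // promise created, never returned
// Fix: return the inner promise so the chain tracks it:
//   somePromise.then(() => startWork());

function startWork() {
  return Promise.resolve("done");
}

Promise.resolve()
  .then(() => startWork())                 // returned, so no warning
  .then((result) => console.log(result));  // → done
```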
krystianity commented 5 years ago

Merged and released.