narcisoguillen / kafka-node-avro

ISC License

addConsumer with 'simple:true' #10

Closed vadymrybak closed 4 years ago

vadymrybak commented 4 years ago

I'm trying to call addConsumer with a topic name and the option simple: true so that the schema is ignored.

const consumer = kafka.addConsumer("nodejs-topic", {simple: true});

But I still get this error:

RangeError: Index out of range
    at checkOffset (buffer.js:977:11)
    at Buffer.readUInt32BE (buffer.js:1051:5)
    at Consumer.parse (C:\Users\vadymr\Documents\provisioning\node-inventory\node_modules\kafka-node-avro\lib\consumer.js:24:36)
    at emitOne (events.js:116:13)
    at ConsumerGroup.emit (events.js:211:7)
    at C:\Users\vadymr\Documents\provisioning\node-inventory\node_modules\kafka-node\lib\baseClient.js:68:16
    at decodeMessageSet (C:\Users\vadymr\Documents\provisioning\node-inventory\node_modules\kafka-node\lib\protocol\protocol.js:308:15)
    at enqueue (C:\Users\vadymr\Documents\provisioning\node-inventory\node_modules\kafka-node\lib\protocol\protocol.js:407:15)
    at C:\Users\vadymr\Documents\provisioning\node-inventory\node_modules\async\dist\async.js:3880:24
    at replenish (C:\Users\vadymr\Documents\provisioning\node-inventory\node_modules\async\dist\async.js:1011:17)

It seems that even with simple: true there is still a piece of code in consumer.js that tries to read the schemaId:

let schemaId = message.value.readUInt32BE(1);

Please advise.
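For reference, the RangeError in the trace can be reproduced with Node's Buffer alone. Reading a 32-bit integer at offset 1 is consistent with the Confluent wire format (one magic byte followed by a 4-byte schema ID), and it needs at least 5 bytes of payload. The sketch below isolates only that read; `tryReadSchemaId` is a hypothetical helper written for illustration, not part of kafka-node-avro, and the sample payloads are assumptions.

```javascript
// Hypothetical helper (not from kafka-node-avro): attempts the same read
// quoted above and reports null when the payload is too short.
function tryReadSchemaId(value) {
  const buffer = Buffer.isBuffer(value) ? value : Buffer.from(String(value));
  try {
    // Needs bytes 1..4, so the buffer must hold at least 5 bytes.
    return buffer.readUInt32BE(1);
  } catch (error) {
    if (error instanceof RangeError) {
      return null; // same error class as in the stack trace
    }
    throw error;
  }
}

// A 4-byte payload is too short for the read.
console.log(tryReadSchemaId('abcd'));
// A 5-byte payload is long enough; the result is just whatever bytes 1..4 hold.
console.log(tryReadSchemaId(Buffer.from([0, 0, 0, 0, 7])));
```

A successful read only means the payload was long enough; the number returned is not necessarily a real schema ID.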

narcisoguillen commented 4 years ago

Hi, thanks for using kafka-node-avro, and sorry for the delay; I had a lot of work. When you send the event, is the producer simple as well? Here is some code I tried that worked. If this is a bug, I would like to reproduce and fix it. Thanks.

KafkaAvro.init(settings).then( kafka => {
  const consumer = kafka.addConsumer("nodejs-topic", {simple : true});

  consumer.on('error', error =>{
    console.log(error);
  });

  consumer.on('message', message => {
    console.log(message);
  });

  setInterval(function(){
    kafka.send({
      simple   : true,
      topic    : 'nodejs-topic',
      messages : {
        foo : 'hello',
        bar : 'world'
      }
    }).then( success => {
      console.log(`Message sent : ${JSON.stringify(success)}`)
    }, error => {
      console.log(error);
    });
  }, 1000)

} , error => {
  console.log(error);
});

vadymrybak commented 4 years ago

Sorry, this was actually my mistake. I was submitting messages to Kafka as a plain string like "test". As soon as I started submitting strings in JSON format, like {msg:'test'}, everything started to work. We can close this one. Thanks for your reply!
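A minimal sketch of that workaround, for anyone who lands here with the same error: wrap plain values in an object and serialize them before sending. `toJsonPayload` and the `msg` key are hypothetical names chosen for illustration. Whether kafka.send with simple: true serializes objects on its own is not confirmed in this thread, so treat this as an assumption about the producer side only.

```javascript
// Hypothetical helper (not part of kafka-node-avro): turns any value into a
// JSON string, wrapping non-object values under an assumed "msg" key.
function toJsonPayload(message) {
  const isObject = message !== null && typeof message === 'object';
  const body = isObject ? message : { msg: message };
  return JSON.stringify(body);
}

console.log(toJsonPayload('test'));
// prints {"msg":"test"}
console.log(toJsonPayload({ foo: 'hello', bar: 'world' }));
// prints {"foo":"hello","bar":"world"}
```

Note that {msg:'test'} as written above is a JavaScript object literal; the string that actually goes on the wire after JSON.stringify is {"msg":"test"}.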