tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Consumer showing values as buffer #259

Closed jafri closed 5 years ago

jafri commented 5 years ago

Producer:

const { Kafka, CompressionTypes, CompressionCodecs } = require('kafkajs')
const SnappyCodec = require('kafkajs-snappy')
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec

// ... skipping initialization, etc
this.producer.send({
  topic: 'traces',
  idempotent: true,
  messages: messages.map(message => ({ value: JSON.stringify(message) })),
  compression: CompressionTypes.Snappy
})

Consumer:

const { Kafka, CompressionTypes, CompressionCodecs } = require('kafkajs')
const SnappyCodec = require('kafkajs-snappy')
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec

this.abisConsumer.run({
  eachMessage: async ({ /*topic, partition,*/ message }) => {
    console.log({
      key: message.key,
      value: message.value
    })
  },
})

Result:

{ key: null,
  value:
   <Buffer 7b 22 70 72 65 73 65 6e 74 22 3a 74 72 75 65 2c 22 62 6c 6f 63 6b 5f 6e 75 6d 22 3a 31 30 38 39 2c 22 61 63 63 6f 75 6e 74 22 3a 22 65 6f 73 69 6f 22 ... > }

I get the correct results using another tool, such as kafkacat.

jafri commented 5 years ago

Using String(message.value) works, but should that be necessary?

tulios commented 5 years ago

Hi @jafri, Kafka can carry any payload, so we can't assume that you are publishing strings. For example, you could be using Avro, which has a binary payload. Your messages will always arrive with a Buffer on key and value (key is null when none was set, as in your output). You can use value.toString() to convert it to a string. Using your example:

this.abisConsumer.run({
  eachMessage: async ({ /*topic, partition,*/ message }) => {
    console.log({
      key: message.key,
      value: message.value.toString() // <- toString here
    })
  },
})
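
Since the producer above serializes each message with JSON.stringify, the consumer side can go one step further and parse the string back into an object. The following is a minimal sketch of that idea, not part of kafkajs: decodeJsonMessage is a hypothetical helper, the sample payload is made up, and it assumes every value on the topic is UTF-8 encoded JSON.

```javascript
// Hypothetical helper (not a kafkajs API): decodes a consumed message
// whose value was produced with JSON.stringify.
function decodeJsonMessage(message) {
  return {
    // key is null when the producer did not set one, so guard before decoding
    key: message.key == null ? null : message.key.toString('utf8'),
    // Buffer -> UTF-8 string -> object; JSON.parse throws on non-JSON payloads
    value: JSON.parse(message.value.toString('utf8'))
  }
}

// Stand-in for the `message` that eachMessage receives (made-up sample data)
const sample = {
  key: null,
  value: Buffer.from(JSON.stringify({ present: true, block_num: 1089 }))
}

const decoded = decodeJsonMessage(sample)
console.log(decoded)
```

Inside eachMessage this would be called as decodeJsonMessage(message). If the topic can also carry non-JSON payloads, wrapping the JSON.parse call in a try/catch is probably the safer choice.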