narcisoguillen / kafka-node-avro


decoding of types #13

Closed · paulcull closed this issue 4 years ago

paulcull commented 4 years ago

Hi,

I've got a schema with a bytes field (logicalType: decimal), but when I'm reading the message it's not decoding that buffer. I can't seem to find anything in the docs.

Is there a way to apply the schema to the message so that these logical types get decoded?

Schema:

```json
{
  "type": "record",
  "name": "Payment",
  "namespace": "com.landoop.data.generator.domain.payments",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "time",
      "type": "string"
    },
    {
      "name": "amount",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 38,
        "scale": 18
      }
    },
    {
      "name": "currency",
      "type": "string"
    },
    {
      "name": "creditCardId",
      "type": "string"
    },
    {
      "name": "merchantId",
      "type": "long"
    }
  ]
}
```

Response:

```js
{
  topic: 'cc_payments',
  value: Payment {
    id: 'txn1576511768154',
    time: '2019-12-16T15:56:08.154Z',
    amount: <Buffer 00 f8 7f 15 de 31 27 db 00 00>,
    currency: 'USD',
    creditCardId: '5390713494347532',
    merchantId: 51
  },
  offset: 76933,
  partition: 0,
  highWaterOffset: 76934,
  key: '5390713494347532',
  timestamp: 2019-12-16T15:56:08.154Z
}
```

Code:

```js
const KafkaAvro = require('kafka-node-avro');

const Settings = {
  "kafka": {
    "kafkaHost": "127.0.0.1:9092"
  },
  "schema": {
    "registry": "http://127.0.0.1:8081"
  }
};

KafkaAvro.init(Settings).then(kafka => {

  console.log(' HERE');
  let consumer = kafka.addConsumer('cc_payments', { kafkaHost: "127.0.0.1:9092", groupId: "TestGroupOne", fromOffset: "latest" });

  consumer.on('error', error => {
    console.log(error);
  });

  consumer.on('message', message => {
    console.log(message);
  });

}, error => {
  console.log(error);
});
```

narcisoguillen commented 4 years ago

Hey Paul, thanks for using kafka-node-avro. This package uses avsc's fromBuffer mechanism to decode messages.
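For reference, a minimal sketch of what that decode step looks like with avsc directly (abbreviated Payment schema; the sample record and buffer are just for illustration):

```js
const avro = require('avsc');

// Build an avsc type from an abbreviated Payment schema. By default avsc
// falls back to the underlying type when it has no implementation for a
// logical type, so "amount" stays a plain bytes field (a Buffer).
const type = avro.Type.forSchema({
  type: 'record',
  name: 'Payment',
  fields: [
    { name: 'id', type: 'string' },
    { name: 'amount', type: { type: 'bytes', logicalType: 'decimal', precision: 38, scale: 18 } }
  ]
});

// Round-trip a sample record: toBuffer encodes, fromBuffer decodes
const encoded = type.toBuffer({ id: 'txn1', amount: Buffer.from([0x01]) });
const record  = type.fromBuffer(encoded);
console.log(record.amount); // <Buffer 01> -- still raw bytes
```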

I did some research but had no luck finding out why avsc is not processing this field. You can overwrite the default decoder like this:

```js
const KafkaAvro = require('kafka-node-avro');

const Settings = {
  "kafka": {
    "kafkaHost": "127.0.0.1:9092"
  },
  "schema": {
    "registry": "http://127.0.0.1:8081",
    "topics": [{ "name": "cc_payments" }]
  }
};

KafkaAvro.init(Settings).then(kafka => {
  kafka.schemas.getByName('cc_payments').then(schema => {

    schema.decode = function(message){
      return schema.parser.fromBuffer(message.slice(5)); // Default <-- Overwrite
    };

    let consumer = kafka.addConsumer('cc_payments', { kafkaHost: "127.0.0.1:9092", groupId: "TestGroupOne", fromOffset: "latest" });

    consumer.on('message', message => {
      console.log(message);
    });

    consumer.on('error', error => {
      console.log(error);
    });

  }, error => {
    console.log(error);
  });

}, error => {
  console.log(error);
});
```

In the override:

```js
schema.decode = function(message){
  return schema.parser.fromBuffer(message.slice(5)); // Default <-- Overwrite
};
```

message: the raw Avro-encoded event, as a Buffer (the slice(5) skips the 5-byte Confluent wire-format header, see the sketch below).

schema:

```js
{
  id         : Number,
  name       : String,
  version    : Number,
  key_fields : Array,
  definition : String, // raw response from the schema registry
  parser     : avro.Type.forSchema
}
```
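For context, the 5-byte header being skipped is the Confluent wire format; assuming message is the Buffer passed to decode, the layout looks like this:

```js
// Confluent wire format:
//   byte 0     : magic byte (0x00)
//   bytes 1-4  : big-endian schema registry id
//   bytes 5... : Avro-encoded payload
const schemaId = message.readInt32BE(1); // registry id of the writer schema
const payload  = message.slice(5);       // what parser.fromBuffer actually decodes
```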

NOTE: just make sure to return the decoded value.

Example:

```js
schema.decode = function(message){
  const decoded = schema.parser.fromBuffer(message.slice(5));
  decoded.amount = parseInt(decoded.amount);
  return decoded;
};
```
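One caveat on that example: parseInt on a Buffer stringifies it first and will typically return NaN. Per the Avro spec, a decimal is stored as a big-endian two's-complement unscaled integer, with value = unscaled × 10^-scale, so a conversion along these lines may work better (decodeDecimal is a hypothetical helper, with the scale of 18 taken from the schema above):

```js
function decodeDecimal(buf, scale){
  // Read the buffer as a big-endian two's-complement integer
  let unscaled = 0n;
  for (const byte of buf) unscaled = (unscaled << 8n) | BigInt(byte);
  // Sign-extend when the high bit of the first byte is set (negative value)
  if (buf.length && (buf[0] & 0x80)) unscaled -= 1n << BigInt(8 * buf.length);
  // Apply the scale; Number may lose precision near the 38-digit limit
  return Number(unscaled) / 10 ** scale;
}

schema.decode = function(message){
  const decoded = schema.parser.fromBuffer(message.slice(5));
  decoded.amount = decodeDecimal(decoded.amount, 18); // scale from the schema
  return decoded;
};
```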