narcisoguillen / kafka-node-avro

ISC License

Unable to send tombstone message to an AVRO topic #20

Closed hassankishk closed 4 years ago

hassankishk commented 4 years ago

Hi, is it possible to send a tombstone message (null value) to an AVRO topic using kafka-node-avro? I am able to send a null message with a valid key using a client (Conduktor), but not with kafka-node-avro.

When I try to send null (code below), I get the error "TypeError: Cannot read property '[object Array]' of null".

`const kafkaMsg = {
  topic: topic,
  messages: null,
  key: id 
}

this.kafkaAvroConnection.send(kafkaMsg).then(success => {
  this.logger.info('Event %s published on topic %s of the broker!', id, topic)
}, error => {
  this.logger.error(error)
})`

Thanks, Hassan

narcisoguillen commented 4 years ago

Hey Hassan, thanks for using kafka-node-avro. So far this lib expects the message to be a JSON object or an array of JSON objects. Is it an option for you to send an empty JSON object? Like:

const kafkaMsg = {
  topic: topic,
  messages: {},
  key: id 
}

this.kafkaAvroConnection.send(kafkaMsg).then(success => {
  this.logger.info('Event %s published on topic %s of the broker!', id, topic)
}, error => {
  this.logger.error(error)
})

If not, I assume the issue is in genKey, an internal method of the schema lib. You can overwrite it with:

const deleteGetKeyPlugin = function(core){
  core.Schema.prototype.genKey = function(data){
    return '';
  };
};

KafkaAvro
  .use(deleteGetKeyPlugin)
  .init(Settings).then( kafka => {
    kafka.send({
      topic    : topic,
      messages : null,
      key      : id
    });
} , error => {
  console.error(error);
});

You do not have to worry about the key: since you are sending it explicitly, that is the key the message will end up with.

hassankishk commented 4 years ago

Hi Narciso, First of all, thanks a lot for your answer. I tried both solutions.

narcisoguillen commented 4 years ago

Hey, you are right, my guess was wrong. I debugged the issue and found a problem in the errorHook used when checking type.isValid() for null values.

I fixed it from

return reject(`Invalid Field '${path}' type ${type.toString()} : ${payload[path]}`);

to

return reject(`Invalid Field '${path}' type ${type.toString()} : ${any}`);

The issue was in ${payload[path]}: when the payload is null, indexing it throws the TypeError.

Nice catch!

Try version 4.2.2 and let me know. (Stay safe.)
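
For readers who want to see why the old message failed on a tombstone, here is a minimal self-contained sketch. It does not use kafka-node-avro or its schema library: `isValid`, `describeOld`, and `describeFixed` are hypothetical stand-ins written for illustration, and the `(path, any, type)` hook arguments are an assumption based on the names in the fix above.

```javascript
// Hypothetical stand-in for a schema type's isValid(): it reports an invalid
// value by calling errorHook(path, any, type). Not the library's real code.
function isValid(value, errorHook) {
  if (value === null || typeof value !== 'object') {
    errorHook([], value, 'record'); // empty path: the top-level value is invalid
    return false;
  }
  return true;
}

// Old message: indexes the payload, which throws when the payload is null.
function describeOld(payload) {
  let message = null;
  isValid(payload, (path, any, type) => {
    message = `Invalid Field '${path}' type ${type} : ${payload[path]}`;
  });
  return message;
}

// Fixed message: uses the offending value handed to the hook instead.
function describeFixed(payload) {
  let message = null;
  isValid(payload, (path, any, type) => {
    message = `Invalid Field '${path}' type ${type} : ${any}`;
  });
  return message;
}

// A null payload makes the old version throw inside the hook;
// the fixed version produces a readable validation message.
try {
  describeOld(null);
} catch (err) {
  console.log('old:', err.name);
}
console.log('fixed:', describeFixed(null));
```

In this sketch the old version never gets to report the validation error, because building the message itself throws. The fixed version only touches the value passed into the hook, so a null payload is reported instead of crashing.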