kafka-node-avro

Node.js bindings for kafka-node with avsc schema serialization.

This library combines kafka-node and avsc to produce and consume validated, serialized messages.

Requirements

kafka-node is a peer dependency; make sure to install it. Tested on kafka-node 5.0.0.

 npm install kafka-node

Install

 npm install kafka-node-avro

Test

 npm test

Options

See sample options.
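
As a rough sketch of a fuller settings object: the schema.topics entry and its key_fields below are assumptions inferred from the send section further down, so check the sample options file for the exact shape.

const Settings = {
  "kafka" : {
    "kafkaHost" : "localhost:9092"
  },
  "schema" : {
    "registry" : "http://schemaregistry.example.com:8081",
    // Assumed shape: pre-declared topics whose key_fields are used to build message keys
    "topics"   : [{
      "name"       : "my.cool.topic",
      "key_fields" : ["foo", "bar"]
    }]
  }
};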

API

init

This package will not fulfill the promise if it is not able to connect to the Kafka brokers or reach the schema registry defined in the settings:

const KafkaAvro = require('kafka-node-avro');
const Settings  = {
  "kafka" : {
    "kafkaHost" : "localhost:9092"
  },
  "schema": {
    "registry" : "http://schemaregistry.example.com:8081"
  }
};

KafkaAvro.init(Settings).then( kafka => {
  // ready to use
} , error => {
  // something went wrong
});

use

Ability to build custom plugins. This method allows you to modify existing core implementations by direct overwrites, or to build new mechanisms.

A plugin must be a function; this function will receive the core of kafka-node-avro as its argument.

const myCustomPlugin1 = function(core){
  // Overwrite : default registry uri builder for allVersions
  core.Registry.endpoints.allVersions = function(id, name, version){
    console.log('Look ma !, fetching all versions');
    return `subjects/${name}-value/versions`;
  };
};

const myCustomPlugin2 = function(core){
  // Overwrite : default consumer parser
  core.Consumer.prototype.parse = function(message){
    console.log('Working on this -> ', message);
    return this.emit('message', message); // emit to consumers
  };
};

const myCustomPlugin3 = function(core){
  // Create new mechanism
  core.Mechanisms.myFunction = function(){
    // logic
  };
};

Plugging in

KafkaAvro
  .use(myCustomPlugin1) // change how to build uri to fetch a schema by all versions
  .use(myCustomPlugin2) // change how to parse an incoming message
  .use(myCustomPlugin3) // add a new `myFunction`
  .init(Settings).then( kafka => {
    kafka.myFunction(); // new method by plugin
} , error => {
  // ..
});

schemas

Fetch schemas from the schema registry. This package will fetch schemas from the schema registry based on the initial settings.

Once a schema has been fetched from the registry, it is kept in memory to be reused.

Schema format

{
    id : Number,
    name : String,
    version : Number,
    key_fields : Array,
    definition : String, // raw response from the schema registry.
    parser : avro.Type.forSchema
}

schemas.getById

Get an avro schema by id

KafkaAvro.init(Settings).then( kafka => {
  kafka.schemas.getById(1).then( schema => {
    // we got the schema from the registry by the id
  } , error => {
    // something went wrong
  });
} , error => {
  // something went wrong
});
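
Since the resolved schema's parser is an avsc Type (see the schema format above), you can, as a sketch, encode and decode records with it directly; toBuffer and fromBuffer are standard avsc calls, and the foo/bar fields below assume a record schema like the one used in the send example further down. Normally send and addConsumer do this for you.

KafkaAvro.init(Settings).then( kafka => {
  kafka.schemas.getById(1).then( schema => {
    // schema.parser is an avsc Type; toBuffer/fromBuffer are plain avsc (de)serialization
    const encoded = schema.parser.toBuffer({ foo : 'hello', bar : 'world' });
    const decoded = schema.parser.fromBuffer(encoded);
    console.log(decoded); // { foo: 'hello', bar: 'world' }
  } , error => {
    // something went wrong
  });
} , error => {
  // something went wrong
});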

schemas.getByName

Get an avro schema by name

KafkaAvro.init(Settings).then( kafka => {
  kafka.schemas.getByName('my.cool.topic').then( schema => {
    // we got the schema from the registry by the name
  } , error => {
    // something went wrong
  });
} , error => {
  // something went wrong
});

send(<message>)

This package will auto-encode the message using the Avro schema. If the schema was not provided in the initial settings, it will fetch it from the schema registry and use it from then on.

Message Format

If key_fields were provided when building the package, they will be used to build the key the messages are sent with; in this example the key will be hello/world.

KafkaAvro.init(Settings).then( kafka => {
  kafka.send({
    topic    : 'my.cool.topic',
    messages : {
      foo : 'hello',
      bar : 'world'
    }
  }).then( success => {
    // Message was sent encoded with Avro Schema
  }, error => {
    // something went wrong
  });
} , error => {
  // something went wrong
});

If an invalid payload is provided for the Avro schema, the error will look like: Invalid Field 'FIELD' type "TYPE" : VALUE
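
For instance, with a hypothetical payload that assumes foo is declared as a string in the schema, sending a number where a string is expected would reject with an error of that shape (inside the init callback from the example above):

kafka.send({
  topic    : 'my.cool.topic',
  messages : {
    foo : 123,        // schema expects a string here
    bar : 'world'
  }
}).then( success => {
  // not reached
}, error => {
  // e.g. Invalid Field 'foo' type "string" : 123
});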

addProducer([options], [customPartitioner])

kafka-node-avro has a global producer with the default kafka-node settings for the HighLevelProducer. This mechanism allows creating HighLevelProducers on demand, with the ability to set options and a customPartitioner; see the kafka-node documentation for more info.
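
A sketch of creating a producer with options and a custom partitioner; the option names (requireAcks, partitionerType) and the (partitions, key) partitioner signature come from kafka-node's HighLevelProducer rather than from this README, so treat them as assumptions.

KafkaAvro.init(Settings).then( kafka => {
  // kafka-node HighLevelProducer options; partitionerType 4 selects the custom partitioner
  const options = { requireAcks : 1, partitionerType : 4 };

  // kafka-node custom partitioner: picks one of the available partitions based on the key
  const myPartitioner = (partitions, key) => {
    return partitions[ String(key).length % partitions.length ];
  };

  const producer = kafka.addProducer(options, myPartitioner);
  // producer.send works the same as the global kafka.send (see below)
} , error => {
  // something went wrong
});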

When creating a new producer, the send mechanism is the same as the global producer's: send will auto-encode the message using the Avro schema, and if the schema was not provided in the initial settings it will fetch it from the schema registry and use it from then on.

Message Format

KafkaAvro.init(Settings).then( kafka => {
  const producer = kafka.addProducer();

  producer.send({
    topic    : 'my.cool.topic',
    messages : {
      foo : 'hello',
      bar : 'world'
    }
  }).then( success => {
    // Message was sent encoded with Avro Schema
  }, error => {
    // something went wrong
  });
} , error => {
  // something went wrong
});

Close

Ability to close the producer

WARNING : closing the producer will close the Kafka client; this is part of the kafka-node baseProducer definition.

producer.close( closed => {
  // Connection is closed
});

addConsumer(<TopicName>, [Options])

This package will auto-decode the message before emitting it on the message event; the message will be in JSON format.

Options

KafkaAvro.init(Settings).then( kafka => {
  let consumer = kafka.addConsumer("my.cool.topic");

  consumer.on('message', message => {
   // we got a decoded message
  });
} , error => {
  // something went wrong
});
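
A sketch passing consumer options: this assumes the options object is handed straight to the underlying kafka-node consumer, so groupId and autoCommit below are standard kafka-node consumer options rather than anything specific to this package.

KafkaAvro.init(Settings).then( kafka => {
  // assumed: options are forwarded to the underlying kafka-node consumer
  let consumer = kafka.addConsumer("my.cool.topic", {
    groupId    : 'my-consumer-group',
    autoCommit : true
  });

  consumer.on('message', message => {
    // decoded message, already in JSON format
  });
} , error => {
  // something went wrong
});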