mtth / avsc

Avro for JavaScript :zap:
MIT License
1.26k stars 144 forks source link

Interoperability with confluent kafka + schema registry + container header #22

Closed cromestant closed 8 years ago

cromestant commented 8 years ago

Hello, I'm working on using a confluent kafka deployment, and using avsc to be the publisher in many cases. However I am running into some problems when trying to use schema validation and the schema registry. I am wondering if I have not found the correct way to do this in avsc or if it is not yet compatible, but in essence, if you look at this You will see that along with the serialized data they include in the payload the schema, so that the client can then query the schema registry for the latest version of the schema to deserialize.

also looking at this it mentions that:

When Avro data is read, the schema used when writing it is always present. This permits each datum to be written with no per-value overheads, making serialization both fast and small. This also facilitates use with dynamic, scripting languages, since data, together with its schema, is fully self-describing.

If I run using the java samples included in the distribution, I get no problems, however trying to serialize with AVSC I only get the serialized payload, and not the schema.(thus getting magic byte missing error)

Is this something avsc is compatible with? or , is there a way to have avsc include the schema in the header as described here and here

in advance, thank you for your reply

mtth commented 8 years ago

I'm not sure I follow what you're asking.

You can generate container files using a BlockEncoder. This will generate the appropriate magic bytes and header. I'm unsure how this will help for your use-case though, since including this in each message will significantly increase message size.

How are you talking to Kafka? Are you using the REST proxy?

cromestant commented 8 years ago

Thanks for your reply, For talking to kafka I'm using kafka-node , this is not using the rest proxy If i'm understanding correctly but direct TCP protocol. It does look like the BlockEncoder can help, I'll try it. Im still trying to grasp all of the intricacies of using Confluent with avro , as it seems that the schema registry is not as automated as first described. Right now I'm only validating a POC in which I want to write from avsc to the confluent distro and be able to decode from the sample java app. right now this is just giving the magic byte problem. Will test if this helps. The overhead is something I am concerned with, so I will try to omit it in production , so will have to configure other schema inferring mechanism.

mtth commented 8 years ago

I see. I'm not familiar with kafka-node but I'll try to take a look to understand what input the library expects.

Edit: Could you share the code you have so far?

cromestant commented 8 years ago

I might not be explaining myself. The error is on the receiving end. If you encode/ send/ decode with the Java apps included in confluent it works. Changing he producer with node app that uses avsc to serialize produces the error. Have not been able to test yet. Will report back asap

On Wednesday, December 16, 2015, Matthieu Monsch notifications@github.com wrote:

I see. I'm not familiar with kafka-node but I'll try to take a look to understand what input the library expects.

— Reply to this email directly or view it on GitHub https://github.com/mtth/avsc/issues/22#issuecomment-165160139.

MSc. Charles M. Romestant F.

Merci de penser à l'environnement avant d'imprimer cet e-mail Please think about the environment before you print this e-mail Por favor piense en el medio ambiente antes de imprimir este e-mail

gabrielnau commented 8 years ago

Confluent stack will add a header to a single Avro message:

< magic byte > < schema id (4 bytes) > < Avro blob >

You can get an idea of how deserialization works here.

As far as I know, Avsc is only about Avro and if you need to integrate with Confluent's stack, you need to handle yourself the specific header.

So I may be mistaken but I think Avro's "Object Container Files" is another topic.

cromestant commented 8 years ago

Awesome thanks!

On Monday, December 21, 2015, Gabriel Nau notifications@github.com wrote:

Confluent stack will add a header to a single Avro message:

< magic byte > < schema id (4 bytes) > < Avro blob >

You can get an idea of how deserialization works here https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L41 .

As far as I know, Avsc is only about Avro and if you need to integrate with Confluent's stack, you need to handle yourself the specific header.

So I may be mistaken but I think Avro's "Object Container Files" is another topic.

— Reply to this email directly or view it on GitHub https://github.com/mtth/avsc/issues/22#issuecomment-166350702.

MSc. Charles M. Romestant F.

Merci de penser à l'environnement avant d'imprimer cet e-mail Please think about the environment before you print this e-mail Por favor piense en el medio ambiente antes de imprimir este e-mail

mtth commented 8 years ago

Thanks @gabrielnau , this makes sense.

@cromestant - you should then be able to generate a valid message for example as follows:

/**
 * Encode an Avro value into a message, as expected by Confluent's Kafka Avro
 * deserializer.
 *
 * @param val {...} The Avro value to encode.
 * @param type {Type} Your value's Avro type.
 * @param schemaId {Integer} Your schema's ID (inside the registry).
 * @param length {Integer} Optional initial buffer length. Set it high enough
 * to avoid having to resize. Defaults to 1024.
 *
 */
function toMessageBuffer(val, type, schemaId, length) {
  var buf = new Buffer(length || 1024);
  buf[0] = 0; // Magic byte.
  buf.writeInt32BE(schemaId, 1);

  var pos = type.encode(val, buf, 5);
  if (pos < 0) {
    // The buffer was too short, we need to resize.
    return getMessageBuffer(type, val, schemaId, length - pos);
  }
  return buf.slice(0, pos);
}

Sample usage:

var type = avsc.parse('string');
var buf = toMessageBuffer('hello', type, 1); // Assuming 1 is your schema's ID.
cromestant commented 8 years ago

You guys are awesome. I'll test all of this in January when back from holidays

On Tuesday, December 22, 2015, Matthieu Monsch notifications@github.com wrote:

Thanks @gabrielnau https://github.com/gabrielnau , this makes sense.

@cromestant https://github.com/cromestant - you should then be able to generate a valid message for example as follows:

/* * Encode an Avro value into a message, as expected by Confluent's Kafka Avro * deserializer. * * @param val {...} The Avro value to encode. * @param type {Type} Your value's Avro type. * @param schemaId {Integer} Your schema's ID (inside the registry). * @param length {Integer} Optional initial buffer length. Set it high enough * to avoid having to resize. Defaults to 1024. * /function toMessageBuffer(val, type, schemaId, length) { var buf = new Buffer(length || 1024); buf[0] = 0; // Magic byte. buf.writeInt32BE(schemaId, 1);

var pos = type.encode(val, buf, 5); if (pos < 0) { // The buffer was too short, we need to resize. return getMessageBuffer(type, val, schemaId, length - pos); } else { return buf.slice(0, pos); } }

Sample usage:

var type = avsc.parse('string');var buf = toMessageBuffer('hello', type, 1); // Assuming 1 is your schema's ID.

— Reply to this email directly or view it on GitHub https://github.com/mtth/avsc/issues/22#issuecomment-166670111.

MSc. Charles M. Romestant F.

Merci de penser à l'environnement avant d'imprimer cet e-mail Please think about the environment before you print this e-mail Por favor piense en el medio ambiente antes de imprimir este e-mail

zcei commented 8 years ago

Found this while investigating the hassle to switch to Avro, and made me smile, very nice work :+1:

Our Java guys are already using Avro and they love it, especially together with the schema registry. That integration into avsc looks really easy!

Would be nice to here from @cromestant how it worked out, maybe you want to share some insights?

cromestant commented 8 years ago

Sorry. The truth is that we have not been able to pick it up yet. The beginning of the new year has left little time for this. Hope to pick it up again soon

On Monday, January 18, 2016, Stephan Schneider notifications@github.com wrote:

Found this while investigating the hassle to switch to Avro, and made me smile, very nice work [image: :+1:]

Our Java guys are already using Avro and they love it, especially together with the schema registry. That integration into avsc looks really easy!

Would be nice to here from @cromestant https://github.com/cromestant how it worked out, maybe you want to share some insights?

— Reply to this email directly or view it on GitHub https://github.com/mtth/avsc/issues/22#issuecomment-172510889.

MSc. Charles M. Romestant F.

Merci de penser à l'environnement avant d'imprimer cet e-mail Please think about the environment before you print this e-mail Por favor piense en el medio ambiente antes de imprimir este e-mail

cromestant commented 8 years ago

I've finally been able to test this, and it works like a charm. You have to use a keyed message, here is the relevant code using the methodsupplied by @mtth :

var type = avsc.parse('./schemas/charz.avsc')
var pet = {kind: 'CATbert', name: 'my other cat'}
producer.on('ready', function () {

        console.log("Ready to send!");
//        var buf = type.toBuffer(pet); // Serialized object.
        var buf = toMessageBuffer(pet, type, 1)

        var km = new KeyedMessage("event",buf);

       payloads = [
            { topic: 'pets', messages: km },
        ];
        console.log(payloads)
        producer.send(payloads, function (err, data) {
   //     res.send(data);
    });

});

marking this as closed as it is working.