riferrei / srclient

Golang Client for Schema Registry
Apache License 2.0

Protobuf Producer Example #17

Closed FredrikBakken closed 2 years ago

FredrikBakken commented 4 years ago

Hello,

I am currently facing some issues trying to create a Kafka Producer for passing in Protobuf data. As a reference point, I tried to adapt the Producer example application (the AVRO example worked for me without issues) as follows:

package main

import (
    "encoding/binary"
    "encoding/json"
    "fmt"
    "io/ioutil"

    "github.com/google/uuid"
    "github.com/riferrei/srclient"
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

type ComplexType struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
}

func main() {

    topic := "myTopic"

    // 1) Create the producer as you would normally do using Confluent's Go client
    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
    if err != nil {
        panic(err)
    }
    defer p.Close()

    go func() {
        for event := range p.Events() {
            switch ev := event.(type) {
            case *kafka.Message:
                message := ev
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Error delivering the message '%s'\n", message.Key)
                } else {
                    fmt.Printf("Message '%s' delivered successfully!\n", message.Key)
                }
            }
        }
    }()

    // 2) Fetch the latest version of the schema, or create a new one if it is the first
    schemaRegistryClient := srclient.CreateSchemaRegistryClient("http://localhost:8081")
    schema, err := schemaRegistryClient.GetLatestSchema(topic, false)
    if schema == nil {
        schemaBytes, err := ioutil.ReadFile("example/complexType.proto")
        if err != nil {
            panic(fmt.Sprintf("File not found %s", err))
        }

        schema, err = schemaRegistryClient.CreateSchema(topic, string(schemaBytes), srclient.Protobuf, false)
        if err != nil {
            panic(fmt.Sprintf("Error creating the schema %s", err))
        }
    }
    schemaIDBytes := make([]byte, 4)
    binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID()))

    // 3) Serialize the record using the schema provided by the client,
    // making sure to include the schema id as part of the record.
    newComplexType := ComplexType{ID: 1, Name: "Gopher"}
    value, _ := json.Marshal(newComplexType)
    native, _, _ := schema.Codec().NativeFromTextual(value)
    valueBytes, _ := schema.Codec().BinaryFromNative(nil, native)

    var recordValue []byte
    recordValue = append(recordValue, byte(0))
    recordValue = append(recordValue, schemaIDBytes...)
    recordValue = append(recordValue, valueBytes...)

    key, _ := uuid.NewUUID()
    p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic: &topic, Partition: kafka.PartitionAny},
        Key: []byte(key.String()), Value: recordValue}, nil)

    p.Flush(15 * 1000)

}

The protobuf schema looks like this and can be found in example/complexType.proto:

syntax = "proto3";

package example;

message ComplexType {
    int64 id = 1;
    string name = 2;
}

When I try to run this application, I get the following error message displayed:

>>> go run app_srclient.go
panic: Error creating the schema 422 : Either the input schema or one its references is invalid

goroutine 1 [running]:
main.main()
        <PATH>/app_srclient.go:55 +0x879
exit status 2

In advance, I'd like to thank anyone who could help out or provide examples to solve this issue!

shivakumarss commented 4 years ago

Hi @FredrikBakken

  1. You will have to compile your protobuf file with the protobuf compiler, which will generate a .go file. Reference for generating the .go file and compiling: https://developers.google.com/protocol-buffers/docs/gotutorial

  2. Using that generated .go file, you have to create an object; you cannot use newComplexType := ComplexType{ID: 1, Name: "Gopher"}

  3. Once the object is created, use out, err := proto.Marshal(object) to get the bytes of the object.

  4. Then follow the wire format to publish the bytes to Confluent (sketched below): https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-format
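
A minimal sketch of steps 2-4, assuming your schema is already registered (buildRecordValue is a hypothetical helper; the single 0 byte after the schema ID is the protobuf wire-format shortcut for the first message in the schema, per the link above):

import (
    "encoding/binary"

    "google.golang.org/protobuf/proto"
)

// buildRecordValue frames a marshalled protobuf in the Confluent wire format:
// magic byte 0, schema ID as a big-endian uint32, message-index array, payload.
func buildRecordValue(schemaID int, msg proto.Message) ([]byte, error) {
    payload, err := proto.Marshal(msg)
    if err != nil {
        return nil, err
    }
    record := make([]byte, 0, 6+len(payload))
    record = append(record, byte(0)) // magic byte
    idBytes := make([]byte, 4)
    binary.BigEndian.PutUint32(idBytes, uint32(schemaID))
    record = append(record, idBytes...)
    record = append(record, byte(0)) // shortcut for message index array [0]
    return append(record, payload...), nil
}

// usage: recordValue, err := buildRecordValue(schema.ID(), &example.ComplexType{Id: 1, Name: "Gopher"})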

Thanks Shiva Kumar SS

FredrikBakken commented 4 years ago

Hi Shiva Kumar SS (@shivakumarss),

Thank you so much for your explanations!

I have tried a little bit more on this to get it to work, but I still seem to be getting the same error message. Added details below to showcase what changes I have made:

  1. Compiled the .proto file to generate the .go file with: protoc --go_out=. example/*.proto. This created a new .go file within the ./example directory named complexType.pb.go.
  2. Created an object by replacing the line: newComplexType := ComplexType{ID: 1, Name: "Gopher"} with newComplexType := &example.ComplexType{Id: 1, Name: "Gopher"}.
  3. Got the object bytes by adding: value, err := proto.Marshal(newComplexType)
  4. Not 100% sure about how to publish the bytes according to the definition by Confluent. Is it according to the schema details or the data (key-value) itself? I have tried reading both the complexType.proto and complexType.pb.go into the schemaBytes variable.

After trying these suggestions I've also tried to debug the code itself a little bit more and it seems to be related to the process of creating a new schema (step 2 in the example). I am able to find the files without issues, but the following line returns an error: schema, err = schemaRegistryClient.CreateSchema(topic, string(schemaBytes), srclient.Protobuf, false).

This is the latest version of the edited example code:

package main

import (
    "encoding/binary"
    "fmt"
    "io/ioutil"

    "google.golang.org/protobuf/proto"
    "<project-name>/example"

    "github.com/google/uuid"
    "github.com/riferrei/srclient"
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

    topic := "myTopicProto"

    // 1) Create the producer as you would normally do using Confluent's Go client
    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
    if err != nil {
        panic(err)
    }
    defer p.Close()

    go func() {
        for event := range p.Events() {
            switch ev := event.(type) {
            case *kafka.Message:
                message := ev
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Error delivering the message '%s'\n", message.Key)
                } else {
                    fmt.Printf("Message '%s' delivered successfully!\n", message.Key)
                }
            }
        }
    }()

    // 2) Fetch the latest version of the schema, or create a new one if it is the first
    schemaRegistryClient := srclient.CreateSchemaRegistryClient("http://localhost:8081")
    schema, err := schemaRegistryClient.GetLatestSchema(topic, false)
    if schema == nil {
        schemaBytes, err := ioutil.ReadFile("example/complexType.proto")
        if err != nil {
            panic(fmt.Sprintf("File not found %s", err))
        }

        schema, err = schemaRegistryClient.CreateSchema(topic, string(schemaBytes), srclient.Protobuf, false)
        if err != nil {
            panic(fmt.Sprintf("Error creating the schema %s", err))
        }
    }
    schemaIDBytes := make([]byte, 4)
    binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID()))

    // 3) Serialize the record using the schema provided by the client,
    // making sure to include the schema id as part of the record.
    newComplexType := &example.ComplexType{Id: 1, Name: "Gopher"}
    value, err := proto.Marshal(newComplexType)
    valueBytes, _ := schema.Codec().BinaryFromNative(nil, value)

    var recordValue []byte
    recordValue = append(recordValue, byte(0))
    recordValue = append(recordValue, schemaIDBytes...)
    recordValue = append(recordValue, valueBytes...)

    key, _ := uuid.NewUUID()
    p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic: &topic, Partition: kafka.PartitionAny},
        Key: []byte(key.String()), Value: recordValue}, nil)

    p.Flush(15 * 1000)

}

Thank you for help on this!

Fredrik Bakken

shivakumarss commented 4 years ago

Hi @FredrikBakken

I had a look at this API, but unfortunately this is not supported as of today.

[screenshot: the CreateSchema method in mockSchemaRegistryClient.go]

Reference https://github.com/riferrei/srclient/blob/master/mockSchemaRegistryClient.go#L54

Regards, Shiva Kumar SS

dineshgowda24 commented 3 years ago

@FredrikBakken were you able to get it working? If you have a working example will you be able to share?

Kronmannen commented 3 years ago

Had the same problem. The solution was to remove all newlines after reading the proto schema from the file. Example based on @FredrikBakken's code.

if schema == nil {
    schemaBytes, err := ioutil.ReadFile("example/complexType.proto")
    if err != nil {
        panic(fmt.Sprintf("File not found %s", err))
    }
    formattedSchema := strings.ReplaceAll(string(schemaBytes), "\n", "") // added this line
    schema, err = schemaRegistryClient.CreateSchema(topic, formattedSchema, srclient.Protobuf, false)
    if err != nil {
        panic(fmt.Sprintf("Error creating the schema %s", err))
    }
}

EDIT: Might have spoken too soon. I only get it to work when I declare the entire schema as a string in the code itself; I still can't get it to work when reading the string from a file. This works:

stringSchema := `syntax = "proto3";package main;message FavoriteEvent {  string id = 1;  string type = 2;  string url = 3;  string userId = 4;  string organizationId = 5;  string favoriteName = 6;}`
schema, err = schemaRegistryClient.CreateSchema(topic, stringSchema, srclient.Protobuf, false)

Will update if I figure it out.

EDIT2: The newline thing turned out not to be important. When reading from a file to a string using ioutil.ReadFile there were some invisible characters at the beginning of the string. When I skipped the first three characters it worked fine. Here is the code:

schemaBytes, _ := ioutil.ReadFile("file.proto")
schema, err = schemaRegistryClient.CreateSchema(topic, string(schemaBytes)[3:], srclient.Protobuf, false)

timme04 commented 3 years ago

@Kronmannen Sounds like you have a UTF-8 byte-order-marker (BOM) in your proto-file.
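
A small sketch of how to guard against that (readSchemaFile is a hypothetical helper; bytes.TrimPrefix drops the BOM only if it is present):

import (
    "bytes"
    "io/ioutil"
)

// readSchemaFile reads a .proto file and strips a UTF-8 BOM
// (0xEF 0xBB 0xBF) if an editor left one at the start of the file.
func readSchemaFile(path string) (string, error) {
    schemaBytes, err := ioutil.ReadFile(path)
    if err != nil {
        return "", err
    }
    return string(bytes.TrimPrefix(schemaBytes, []byte{0xEF, 0xBB, 0xBF})), nil
}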

ideasculptor commented 3 years ago

Did you ever get this working? The code you provided definitely doesn't look correct to me. The bytes output by proto.Marshal() are the final binary form of the protobuf. There is no reason to call schema.Codec().BinaryFromNative(nil, bytes) when serializing a protobuf: that's an Avro-specific function. In fact, the codec variable is an instance of goavro.Codec.

However, according to the confluent documentation (https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-format ), the wire format for serialized protobufs includes an extra field after the schema ID. Because more than one protobuf message type can be declared in a single schema file and because they can be nested, you have to provide an array of message indexes which specifies the actual message type you are looking for. The array is encoded as the length of the array, followed by the values within it. In the case where there is only one message declared in the schema, you provide an array with the index 0 - [0], written as 1,0. But if the top level message has 2 messages nested within it and we are serializing the 2nd of those, then you need to provide the array [0, 1], written as 2,0,1 - which is an array of length 2 representing the second embedded message (index=1) of the first message (index=0) type in the schema. To top it off, all of those values are to be ZigZag encoded (https://pkg.go.dev/google.golang.org/protobuf@v1.27.1/encoding/protowire#EncodeZigZag ). Additionally, there is a special case for instances where you are using the first message in the schema: instead of an array of length 1 with value 0, you can just send the value 0. A ZigZag encoded 0 is 1 byte long, with the value 0.
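
To illustrate (a small sketch; note that Go's binary.PutVarint already applies the ZigZag transform before writing the varint):

package main

import (
    "encoding/binary"
    "fmt"
)

func main() {
    // message index array [0, 1]: second nested message of the first
    // top-level message. Write the array length, then each element.
    indexes := []int64{0, 1}
    buf := make([]byte, (1+len(indexes))*binary.MaxVarintLen64)
    n := binary.PutVarint(buf, int64(len(indexes)))
    for _, idx := range indexes {
        n += binary.PutVarint(buf[n:], idx)
    }
    fmt.Printf("% x\n", buf[:n]) // prints: 04 00 02 (zigzag: 2->04, 0->00, 1->02)
}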

I'm still trying to generate a working example, but I'll post one here if I ever get one functional.

ideasculptor commented 3 years ago

OK, here's a working protobuf example. Please note that my code doesn't register a schema if one isn't found. Because we use external references to other protobufs in our schemas, it is just easier to do registration via the Confluent schema-registry maven plugin, with explicit references and guaranteed registration order, so this code assumes it will find a schema already registered.

I can confirm that it works because the protobuf is correctly rendered for the topic in the Confluent Control Center UI, with all the individual fields displayed in a type-specific manner.

package kafka

import (
    "encoding/binary"
    "fmt"

    "github.com/riferrei/srclient"
    "google.golang.org/protobuf/proto"
)

type SchemaRegistryClient interface {
    GetSchema(schemaID int) (*srclient.Schema, error)
    GetLatestSchema(subject string, isKey bool) (*srclient.Schema, error)
    CreateSchema(subject string, schema string, schemaType srclient.SchemaType, isKey bool, references ...srclient.Reference) (*srclient.Schema, error)
    CreateSchemaWithArbitrarySubject(subject string, schema string, schemaType srclient.SchemaType, references ...srclient.Reference) (*srclient.Schema, error)
    IsSchemaCompatible(subject, schema, version string, schemaType srclient.SchemaType, isKey bool) (bool, error)
}

type ProtobufSerializer struct {
    client        SchemaRegistryClient
    topic         string
    valueSchema   *srclient.Schema
    schemaIDBytes []byte
    msgIndexBytes []byte
}

func NewProtobufSerializer(schemaRegistryClient SchemaRegistryClient, topic string) *ProtobufSerializer {

    valueSchema, err := schemaRegistryClient.GetLatestSchema(topic, false)
    if err != nil {
        panic(fmt.Sprintf("Error fetching the value schema %s", err))
    }
    schemaIDBytes := make([]byte, 4)
    binary.BigEndian.PutUint32(schemaIDBytes, uint32(valueSchema.ID()))

    // 10 bytes is sufficient for 64 bits in zigzag encoding, but array indexes cannot possibly be
    // 64 bits in a reasonable protobuf, so let's just make a buffer sufficient for a reasonable
    // array of 4 or 5 elements, each with relatively small index
    varBytes := make([]byte, 16)
    // array length 1
    length := binary.PutVarint(varBytes, 1)
    // index 0 element.  We could write array length 0 with no subsequent value, which is equivalent to writing 1, 0
    length += binary.PutVarint(varBytes[length:], 0)

    return &ProtobufSerializer{
        client:        schemaRegistryClient,
        topic:         topic,
        valueSchema:   valueSchema,
        schemaIDBytes: schemaIDBytes,
        msgIndexBytes: varBytes[:length],
    }
}

func (ps *ProtobufSerializer) Serialize(pb proto.Message) ([]byte, error) {
    bytes, err := proto.Marshal(pb)
    if err != nil {
        fmt.Printf("failed serialize: %v", err)
        return nil, err
    }

    var msgBytes []byte
    // schema serialization protocol version number
    msgBytes = append(msgBytes, byte(0))
    // schema id
    msgBytes = append(msgBytes, ps.schemaIDBytes...)
    // zig zag encoded array of message indexes preceded by length of array
    msgBytes = append(msgBytes, ps.msgIndexBytes...)

    fmt.Printf("msgBytes is of length %d before proto\n", len(msgBytes))
    msgBytes = append(msgBytes, bytes...)

    return msgBytes, nil
}

func (ps *ProtobufSerializer) GetTopic() *string {
    return &ps.topic
}

My Deserializer was just kludged together and doesn't yet match this. I'll come back and add it once it is complete, but it should be reasonably obvious: just use proto.Unmarshal(). Note that I have not yet figured out a mechanism to ensure that the object being serialized is actually compatible with the schema fetched for the subject. The generated protobuf objects don't have a schema - the generated code IS the schema. So it is something of an article of faith that everything is compatible. Stuff will break if it isn't.

ideasculptor commented 3 years ago

Here's a working Deserializer, too:

package kafka

import (
    "encoding/binary"
    "fmt"

    "github.com/riferrei/srclient"
    "google.golang.org/protobuf/proto"
)

type ProtobufDeserializer struct {
    client      SchemaRegistryClient
    topic       string
    valueSchema *srclient.Schema
}

func NewProtobufDeserializer(schemaRegistryClient SchemaRegistryClient, topic string) *ProtobufDeserializer {

    valueSchema, err := schemaRegistryClient.GetLatestSchema(topic, false)
    if err != nil {
        panic(fmt.Sprintf("Error fetching the value schema %s", err))
    }

    return &ProtobufDeserializer{
        client:      schemaRegistryClient,
        topic:       topic,
        valueSchema: valueSchema,
    }
}

func (ps *ProtobufDeserializer) Deserialize(bytes []byte, pb proto.Message) error {
    // decode the number of elements in the array of message indexes
    arrayLen, bytesRead := binary.Varint(bytes[5:])
    if bytesRead <= 0 {
        err := fmt.Errorf("Unable to decode message index array")
        return err
    }
    totalBytesRead := bytesRead
    msgIndexArray := make([]int64, arrayLen)
    // iterate arrayLen times, decoding another varint
    for i := int64(0); i < arrayLen; i++ {
        idx, bytesRead := binary.Varint(bytes[5+totalBytesRead:])
        if bytesRead <= 0 {
            err := fmt.Errorf("Unable to decode value in message index array")
            return err
        }
        totalBytesRead += bytesRead
        msgIndexArray[i] = idx
    }
    // we have msgIndexArray populated.  If we had a parsed schema, we could
    // look up the actual message type with that.  Or use it as a key into a
    // table of closures, each of which returns proto.Message of the correct
    // type.  Then unmarshall into that and return it. User can cast proto.Message
    // to the actual type safely. But for now, we simply deserialize into the
    // proto that was passed in.
    err := proto.Unmarshal(bytes[5+totalBytesRead:], pb)
    if err != nil {
        fmt.Printf("failed deserialize: %v", err)
        return err
    }
    return nil
}

func (ps *ProtobufDeserializer) GetTopic() *string {
    return &ps.topic
}

jacobpeebles commented 2 years ago

Hey guys, did anyone try the code above? The writer seems to be partially working (but the Confluent UI does not render all the values), but the consumer code, unfortunately, doesn't seem to work. Maybe someone has a more complete version?

ideasculptor commented 2 years ago

I'm just stepping away from my desk, but I use that code (or whatever it has evolved into) quite a bit. When I get back this afternoon, I'll make sure the most recent changes are reflected above. I copied that stuff in here very early in the process, so it's entirely possible that there are bug fixes that haven't been incorporated.

Note that if you run the most recent confluent platform in docker containers, the confluent UI should display fully parsed messages correctly. Confluent cloud control center has not been updated to render protobufs correctly, even when they are encoded correctly, though streams, ksqldb, and connectors will correctly parse them. So you cannot count on the confluent cloud control center to show you that things are being encoded correctly. You've got to send to a local instance of confluent 6.2 or better - but I found that their docker-compose file works great for doing that.

jacobpeebles commented 2 years ago

@ideasculptor thank you for that, I'm tagging in @renevall who is equally interested in this (we work together).

ideasculptor commented 2 years ago

Sorry for the awkwardness of the exchange. The company I'm working for wants to open source this work, but they don't really have a process for doing so, and it hasn't really been my highest priority to work my way through that. I need to get our code into a github repo and then actually intend to write it up on medium, but for now, you'll have to live with copy and paste here. Sorry. I actually have a whole client wrapper around the official kafka client which does schema-based serialization and deserialization, with more or less identical functionality in a typescript client, too, since we have both typescript and golang services reading and writing to kafka and neither language has working support for schema registry and protobufs.

jacobpeebles commented 2 years ago

@ideasculptor no need to apologize! Responding is already enough, thanks so much. Any guidance you can provide will help us a great deal. @renevall

ideasculptor commented 2 years ago

Here's the serializer

import (
    "encoding/binary"
    "encoding/json"
    "fmt"

    "github.com/riferrei/srclient"
    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/reflect/protoreflect"
)

// Simplified interface for srclient.SchemaRegistryClient
type SchemaRegistryClient interface {
    GetSchema(schemaID int) (*srclient.Schema, error)
    GetLatestSchema(subject string) (*srclient.Schema, error)
    SetCredentials(username string, password string)
}

// SchemaResolver is an interface that can resolve a schema registry schema from
// destination topic and the entity being serialized. It is analogous to the
// SubjectNameStrategy in confluent clients, but also performs the registry schema
// lookup.
type SchemaResolver interface {
    ResolveSchema(topic string, msg proto.Message) (*srclient.Schema, error)
}

// SerializationType is a type alias for representing Key and Value serialization
// types
type SerializationType int

const (
    KeySerialization SerializationType = iota
    ValueSerialization
)

// TopicNameSchemaResolver is an instance of SchemaResolver which uses the topic
// name as the subject when looking up schema via schema registry
type TopicNameSchemaResolver struct {
    serializationType SerializationType
    client            SchemaRegistryClient
}

// NewTopicNameSchemaResolver is a constructor for TopicNameSchemaResolver.
// Receives a SchemaRegistryClient, which should have caching enabled as schema
// is resolved for every serialization performed by a serializer, as well as a
// SerializationType, which specifies whether to resolve a key or value schema
// for the topic
func NewTopicNameSchemaResolver(
    client SchemaRegistryClient,
    serializationType SerializationType,
) *TopicNameSchemaResolver {
    return &TopicNameSchemaResolver{
        serializationType: serializationType,
        client:            client,
    }
}

// ResolveSchema using the TopicNameStrategy, which uses the topic name as the
// subject. Ensure the schema registry client that was passed to the constructor
// has caching enabled or this will be slow to execute
func (ls *TopicNameSchemaResolver) ResolveSchema(
    topic string,
    msg proto.Message,
) (*srclient.Schema, error) {
    if msg == nil {
        return nil, fmt.Errorf(
            "cannot resolve schema for nil message and topic: %s",
            topic,
        )
    }
    return ls.client.GetLatestSchema(ls.constructSubject(topic))
}

func (ls *TopicNameSchemaResolver) constructSubject(topic string) string {
    if ls.serializationType == KeySerialization {
        return topic + "-key"
    }
    return topic + "-value"
}

// SerializationFunc is a type that describes the function that is ultimately
// used to serialize a protobuf.
type SerializationFunc = func([]byte, proto.Message) ([]byte, error)

// InitializationFunc is a type that describes a function to be used to initialize
// a message prior to serialization.
type InitializationFunc = func(proto.Message)

// ProtobufSerializer is an instance of Serializer which serializes protobufs
// according to the confluent schema registry line protocol
type ProtobufSerializer struct {
    schemaResolver SchemaResolver
    headerCache    map[int]map[string][]byte
    marshal        SerializationFunc
    initialize     InitializationFunc
}

// VTMarshal is an interface that will be satisfied by any protobuf that has had
// the protoc-gen-go-vtproto plugin applied to it with the marshal and size
// options. If a proto satisfies this interface, the Marshal function will apply
// the much more efficient MarshalToVT serialization
type VTMarshal interface {
    SizeVT() int
    MarshalToVT(data []byte) (int, error)
}

// Marshal is a wrapper around proto which will use MarshalToVT if that
// method is available in the proto, which serializes much more rapidly
// than the reflection-based proto.Marshal
func Marshal(header []byte, msg proto.Message) ([]byte, error) {
    switch m := msg.(type) {
    case VTMarshal:
        // Whenever available, use VTMarshal for MUCH faster serialization
        size := len(header) + m.SizeVT()
        buffer := make([]byte, 0, size)
        buffer = append(buffer, header...)
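        // buffer has len == len(header) but cap == size, so the zero-length
        // slice below still has SizeVT() bytes of capacity for MarshalToVT
        // to write into (reslicing past len is legal up to cap)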
        bytesWritten, err := m.MarshalToVT(buffer[len(header):])
        return buffer[:len(header)+bytesWritten], err
    default:
        bytes, err := proto.Marshal(msg)
        header = append(header, bytes...)
        return header, err
    }
}

// NewProtobufSerializer is a constructor function for ProtobufSerializer.
// Receives a SchemaResolver as parameter.
func NewProtobufSerializer(
    schemaLookupStrategy SchemaResolver,
    initialize InitializationFunc,
    serializationFunc ...SerializationFunc,
) *ProtobufSerializer {
    // marshal via Marshal by default
    marshal := Marshal
    if len(serializationFunc) > 0 {
        marshal = serializationFunc[0]
    }

    return &ProtobufSerializer{
        schemaResolver: schemaLookupStrategy,
        headerCache:    make(map[int]map[string][]byte),
        marshal:        marshal,
        initialize:     initialize,
    }
}

// Serialize encodes a protobuf for the specified topic.
func (ps *ProtobufSerializer) Serialize(
    topic string,
    thing interface{},
) ([]byte, error) {
    if thing == nil {
        // It is legitimate to serialize nil to nil
        return nil, nil
    }

    // ensure thing is a protobuf
    var msg proto.Message = nil
    switch t := thing.(type) {
    case proto.Message:
        msg = t
    default:
        return nil, fmt.Errorf(
            "serialization target must be a protobuf. Got '%v'",
            t,
        )
    }

    schema, err := ps.schemaResolver.ResolveSchema(topic, msg)
    if err != nil {
        return nil, err
    }

    // initialize(msg) is a user-provided function which can initialize fields in the empty protobuf:
    // a timestamp or a source address or anything else your particular use case may require.
    // We have a Metadata message type in all our protos which is always in a top-level field
    // called 'metadata' which we populate reflectively via initialize(). It contains information
    // about the source process. We could put that info in headers instead of a field of the
    // message.
    if ps.initialize != nil {
        ps.initialize(msg)
    }

    // avoid recomputing message indexes and encoding header on every request
    buf := ps.lookupHeader(schema.ID(), msg)
    if buf == nil {
        msgIndexes := computeMessageIndexes(msg.ProtoReflect().Descriptor(), 0)
        buf = encodePayloadHeader(schema.ID(), msgIndexes)

        ps.storeHeader(schema.ID(), msg, buf)
    }

    bytes, err := ps.marshal(buf, msg)
    if err != nil {
        return nil, err
    }
    return bytes, nil
}

func (ps *ProtobufSerializer) lookupHeader(id int, msg proto.Message) []byte {
    messageTypes := ps.headerCache[id]
    if messageTypes == nil {
        return nil
    }
    return messageTypes[string(msg.ProtoReflect().Descriptor().FullName())]
}

func (ps *ProtobufSerializer) storeHeader(
    id int,
    msg proto.Message,
    header []byte,
) {
    // populate the cache of pre-encoded headers
    messageTypes := ps.headerCache[id]
    if messageTypes == nil {
        messageTypes = make(map[string][]byte)
        ps.headerCache[id] = messageTypes
    }
    messageTypes[string(msg.ProtoReflect().Descriptor().FullName())] = header
}

// protobuf line protocol for kafka has protocol version number (0 as byte),
// then schema id (uint32), then an array of message indexes that eventually
// identifies exactly which message within a schema file the proto in question
// actually is. If proto is 3rd message nested within message that is 4th
// message within first message in schema file, array would be [0, 3, 2].
// First message in schema is [0]
func computeMessageIndexes(
    descriptor protoreflect.Descriptor,
    count int,
) []int {
    index := descriptor.Index()
    switch v := descriptor.Parent().(type) {
    case protoreflect.FileDescriptor:
        // parent is FileDescriptor, we reached the top of the stack, so we are
        // done. Allocate an array large enough to hold count+1 entries and
        // populate first value with index
        msgIndexes := make([]int, count+1)
        msgIndexes[0] = index
        return msgIndexes[0:1]
    default:
        // parent is another MessageDescriptor.  We were nested so get that
        // descriptor's indexes and append the index of this one
        msgIndexes := computeMessageIndexes(v, count+1)
        return append(msgIndexes, index)
    }
}

// encodePayloadHeader writes the line protocol header for protobufs, which
// consists of the protocol version (0 as byte), the schema id (uint32),
// followed by the length of the message index array (variable, zigzag
// encoded) and then each element of that array (variable, zigzag encoded).
func encodePayloadHeader(schemaId int, msgIndexes []int) []byte {
    // allocate buffer with 5 bytes for version and schemaId, and sufficient
    // space for msgIndexes in zigzag encoding plus length of array
    buf := make([]byte, 5+((1+len(msgIndexes))*binary.MaxVarintLen64))

    // write version of protobuf line protocol
    buf[0] = byte(0)

    // write schema id
    binary.BigEndian.PutUint32(buf[1:5], uint32(schemaId))
    length := 5

    // write length of indexes array
    length += binary.PutVarint(buf[length:], int64(len(msgIndexes)))

    // Now write each array value
    for _, element := range msgIndexes {
        length += binary.PutVarint(buf[length:], int64(element))
    }

    return buf[0:length]
}

type StringSerializer struct {
}

func (s *StringSerializer) Serialize(topic string, thing interface{}) ([]byte, error) {
    return []byte(thing.(string)), nil
}

type JsonSerializer struct {
}

func (s *JsonSerializer) Serialize(topic string, thing interface{}) ([]byte, error) {
    return json.Marshal(thing)
}

ideasculptor commented 2 years ago

and here's the deserializer:

import (
    "encoding/binary"
    "fmt"

    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/reflect/protoreflect"
)

// ProtobufResolver is an interface which can resolve a protobuf
// MessageDescriptor from topic name and the info contained in the
// message header and instantiate an instance of the message described
// by the MessageDescriptor
type ProtobufResolver interface {
    ResolveProtobuf(topic *string, schemaId uint32, msgIndexes []int64) (proto.Message, error)
}

// ProtobufRegistry is the minimum interface of protoregistry.Types registry
// needed to resolve MessageType from topic name (plus a registration function,
// for convenience)
type ProtobufRegistry interface {
    RangeMessages(f func(protoreflect.MessageType) bool)
    FindMessageByName(message protoreflect.FullName) (protoreflect.MessageType, error)
}

// DeserializationType is a type alias for representing Key and Value
// deserialization types
type DeserializationType int

const (
    KeyDeserialization DeserializationType = iota
    ValueDeserialization
)

// ProtoregistryTopicNameProtobufResolver is a concrete implementation of
// ProtobufResolver which uses topic name in combination with protoregistry
// to resolve a protoreflect.MessageType that can then be used to instantiate
// a new instance of that type
type ProtoregistryTopicNameProtobufResolver struct {
    deserializationType DeserializationType
    registry            ProtobufRegistry
}

// NewProtoregistryTopicNameProtobufResolver is a constructor
func NewProtoregistryTopicNameProtobufResolver(
    registry ProtobufRegistry,
    deserializationType DeserializationType,
) *ProtoregistryTopicNameProtobufResolver {
    return &ProtoregistryTopicNameProtobufResolver{
        deserializationType: deserializationType,
        registry:            registry,
    }
}

// ResolveProtobuf uses topic name in combination with protoregistry to find
// a protoreflect.MessageType that matches. It then instantiates a new instance of
// that type and returns it.
func (reg *ProtoregistryTopicNameProtobufResolver) ResolveProtobuf(
    topic *string,
    schemaId uint32,
    msgIndexes []int64,
) (proto.Message, error) {
    var mt protoreflect.MessageType
    reg.registry.RangeMessages(func(messageType protoreflect.MessageType) bool {
        if string(messageType.Descriptor().Name()) == *topic {
            mt = messageType
            return false
        }
        return true
    })
    if mt != nil {
        pb := mt.New()
        return pb.Interface(), nil
    }
    return nil, fmt.Errorf("Unable to find MessageType for topic: %s", *topic)
}

// DeserializationFunc is a type that describes the function that is ultimately used to
// deserialize a protobuf.
type DeserializationFunc = func([]byte, proto.Message) error

// ProtobufDeserializer hydrates a []byte into a Protobuf which is resolved via
// a ProtobufResolver
type ProtobufDeserializer struct {
    protobufResolver ProtobufResolver
    unmarshal        DeserializationFunc
}

// VTUnmarshal is an interface satisfied by any protobuf that has been built with
// the protoc-gen-go-vtproto tool to generate an efficient unmarshal method
type VTUnmarshal interface {
    UnmarshalVT(data []byte) error
}

// Unmarshal is a wrapper around proto.Unmarshal which will use UnmarshalVT when
// deserializing any proto that has been modified by protoc-gen-go-vtproto with
// the unmarshal option
func Unmarshal(bytes []byte, msg proto.Message) error {
    switch m := msg.(type) {
    case VTUnmarshal:
        return m.UnmarshalVT(bytes)
    default:
        return proto.Unmarshal(bytes, msg)
    }
}

// NewProtobufDeserializer is a constructor that takes a SchemaRegistryClient
// and a ProtobufResolver, which are used to determine schema and resolve an
// empty protobuf that data can be unmarshalled into.
func NewProtobufDeserializer(
    protobufResolver ProtobufResolver,
    deserializationFunc ...DeserializationFunc,
) *ProtobufDeserializer {
    // unmarshal via Unmarshal by default
    unmarshal := Unmarshal
    if len(deserializationFunc) > 0 {
        unmarshal = deserializationFunc[0]
    }

    return &ProtobufDeserializer{
        protobufResolver: protobufResolver,
        unmarshal:        unmarshal,
    }
}

// Deserialize hydrates an []byte into a protobuf instance which is resolved
// from the topic name and schemaId by the ProtobufResolver
func (ps *ProtobufDeserializer) Deserialize(
    topic *string,
    bytes []byte,
) (interface{}, error) {
    if bytes == nil {
        return nil, nil
    }

    bytesRead, schemaId, msgIndexes, err := decodeHeader(topic, bytes)
    if err != nil {
        return nil, err
    }

    // resolve an empty instance of correct protobuf
    pb, err := ps.protobufResolver.ResolveProtobuf(topic, schemaId, msgIndexes)
    if err != nil {
        return nil, err
    }

    // unmarshal into the empty protobuf after the header in bytes
    err = ps.unmarshal(bytes[bytesRead:], pb)
    if err != nil {
        return nil, err
    }
    return pb, nil
}

func decodeHeader(
    topic *string,
    bytes []byte,
) (totalBytesRead int, schemaId uint32, msgIndexes []int64, err error) {

    if bytes[0] != byte(0) {
        err = fmt.Errorf("invalid protobuf wire protocol.  Received version: %v", bytes[0])
        return
    }
    // we should actually validate the schemaId against the topic in some way, but note that it
    // only needs to be compatible with the latest schema, not equal to it.
    schemaId = binary.BigEndian.Uint32(bytes[1:5])

    // decode the number of elements in the array of message indexes
    arrayLen, bytesRead := binary.Varint(bytes[5:])
    if bytesRead <= 0 {
        err = fmt.Errorf("Unable to decode message index array")
        return
    }
    totalBytesRead = 5 + bytesRead
    msgIndexes = make([]int64, arrayLen)
    // iterate arrayLen times, decoding another varint
    for i := int64(0); i < arrayLen; i++ {
        idx, bytesRead := binary.Varint(bytes[totalBytesRead:])
        if bytesRead <= 0 {
            err = fmt.Errorf("Unable to decode value in message index array")
            return
        }
        totalBytesRead += bytesRead
        msgIndexes[i] = idx
    }
    return
}

type StringDeserializer struct{}

func (s *StringDeserializer) Deserialize(topic *string, bytes []byte) (interface{}, error) {
    return string(bytes), nil
}

type JsonDeserializer struct {
    // because json.Unmarshal needs an actual instance to deserialize into,
    // delegate the work to the provided unmarshal function
    unmarshal func(topic *string, data []byte) (interface{}, error)
}

func NewJsonDeserializer(unmarshal func(topic *string, data []byte) (interface{}, error)) *JsonDeserializer {
    return &JsonDeserializer{
        unmarshal: unmarshal,
    }
}

func (s *JsonDeserializer) Deserialize(topic *string, bytes []byte) (interface{}, error) {
    return s.unmarshal(topic, bytes)
}

ideasculptor commented 2 years ago

Note that the code here does not do any parsing of protobuf schemas at runtime - Instead, it passes the wire protocol header info to a delegate and the delegate returns an empty instance of the correct type which the unmarshal function will then populate. One could write a delegate that actually parses raw schema syntax fetched from the schema registry, if necessary, but our use cases are simpler than that and it is easy enough to look up the message type in the schema registry by name, greatly simplifying the code. My assumption is that most implementations will have a relatively simple mapping from topic to message types, whether the topic name actually matches the message type or not, and that the topic name, schema id, and message index array will be sufficient to do a direct lookup of some kind, so parsing schemas should be unnecessary for the vast majority of users. The provided resolver just assumes the topic name matches the message name for simplicity. It should be a simple matter to implement a resolver that functions according to your specific needs.

Also, the code supports (but does not require) the VTMarshal protoc plugin for high performance serialization and deserialization.

ideasculptor commented 2 years ago

One last thing - the wire protocol has 3 fields - the version (1 byte), the schemaId (4 bytes, network order), and then the array of indexes to a particular message type, where the array is written as a length n and then n integers, zigzag encoded. So the first message in a schema would always be sent as 1,0 (length = 1, value = 0, which would be zigzag encoded as 2,0 if you are looking at raw bytes in a buffer), but the wire protocol allows that to be shortened to sending just a 0, since a naive parser would parse that correctly anyway (read a 0 length and then don't read any more because the length is 0). This implementation NEVER uses the shortcut, always writing at least 2 bytes: the shortcut just seems like an unnecessary complication that saves a single byte on the wire but could lead to bugs or confusion.
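
As a quick check against the encodePayloadHeader function above (schema ID 100, first message in the schema):

header := encodePayloadHeader(100, []int{0})
fmt.Printf("% x\n", header) // 00 00 00 00 64 02 00 - version, schema id, then the [0] index array zigzag encoded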

Additionally, the python implementation of the registry client is broken and doesn't handle message indexes correctly, anyway (it doesn't use zigzag encoding correctly, if memory serves), so if you are using the python client anywhere, you can only use the python client, because it isn't compatible with the java client OR this client. This code is confirmed to be compatible with the java client. For some crazy reason, confluent doesn't think that fixing the python client warrants bumping the version of the wire protocol up to 1 from 0, so when they do fix the python client, stuff will just break depending on whether you have messages serialized with the broken client, and there will be no way to detect the issue, since messages encoded with the fixed client will have the same wire protocol version number as messages encoded with the broken client. Basically, if you are using python, caveat emptor. If you need this code to work with the broken python client, your only choice is to always use the first message definition in the schema file, and then always use the shortcut of sending just a 0, instead of 1,0 as the message index array, because 0 is the same whether zigzag encoded or not, so the python client will parse it correctly.

bigkraig commented 2 years ago

@ideasculptor this was very helpful & thanks a lot.

For others who are trying to leverage proto schemas, this ProtobufResolver implementation allows me (so far in my testing) to parse the proto returned from the schema registry and match it against the generated .proto loaded into my consumer.

import (
    "fmt"
    "io"
    "strconv"
    "strings"

    "github.com/jhump/protoreflect/desc"
    "github.com/jhump/protoreflect/desc/protoparse"
    "github.com/riferrei/srclient"
    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/reflect/protoreflect"
)

// SchemaRegistryProtobufResolver
type SchemaRegistryProtobufResolver struct {
    schemaRegistry      SchemaRegistryClient
    protobufRegistry    ProtobufRegistry
    deserializationType DeserializationType
}

// NewSchemaRegistryProtobufResolver
func NewSchemaRegistryProtobufResolver(
    schemaRegistry SchemaRegistryClient,
    protobufRegistry ProtobufRegistry,
    deserializationType DeserializationType,
) *SchemaRegistryProtobufResolver {
    return &SchemaRegistryProtobufResolver{
        schemaRegistry:      schemaRegistry,
        protobufRegistry:    protobufRegistry,
        deserializationType: deserializationType,
    }
}

// This should probably exist in srclient
func (reg *SchemaRegistryProtobufResolver) parseSchema(schemaId int) (*desc.FileDescriptor, error) {
    parser := protoparse.Parser{
        Accessor: func(filename string) (io.ReadCloser, error) {
            var schema *srclient.Schema
            var err error

            // filename is a schema id, fetch it directly
            if schemaId, err = strconv.Atoi(filename); err == nil {
                schema, err = reg.schemaRegistry.GetSchema(schemaId)
            } else {
                // otherwise its likely an import and we look it up by its filename
                schema, err = reg.schemaRegistry.GetLatestSchema(filename)
            }

            if err != nil {
                return nil, err
            }
            if *(schema.SchemaType()) != srclient.Protobuf {
                return nil, fmt.Errorf("schema %v is not a Protobuf schema", schemaId)
            }
            return io.NopCloser(strings.NewReader(schema.Schema())), nil
        },
    }

    fileDescriptors, err := parser.ParseFiles(strconv.Itoa(schemaId))
    if err != nil {
        return nil, err
    }

    if len(fileDescriptors) != 1 {
        return nil, fmt.Errorf("unexpected schema from schema registry")
    }
    return fileDescriptors[0], nil
}

// ResolveProtobuf
func (reg *SchemaRegistryProtobufResolver) ResolveProtobuf(
    schemaId int,
    msgIndexes []int,
) (proto.Message, error) {

    fileDescriptor, err := reg.parseSchema(schemaId)
    if err != nil {
        return nil, err
    }

    msg := resolveDescriptorByIndexes(msgIndexes, fileDescriptor)

    var mt protoreflect.MessageType
    reg.protobufRegistry.RangeMessages(func(messageType protoreflect.MessageType) bool {
        if string(messageType.Descriptor().Name()) == msg.GetName() {
            mt = messageType
            return false
        }
        return true
    })
    if mt != nil {
        pb := mt.New()
        return pb.Interface(), nil
    }
    return nil, fmt.Errorf("unable to find MessageType for messageIndex %v inside schema %v", msgIndexes, schemaId)
}

func resolveDescriptorByIndexes(msgIndexes []int, descriptor desc.Descriptor) desc.Descriptor {
    if len(msgIndexes) == 0 {
        return descriptor
    }

    index := msgIndexes[0]
    msgIndexes = msgIndexes[1:]

    switch v := descriptor.(type) {
    case *desc.FileDescriptor:
        return resolveDescriptorByIndexes(msgIndexes, v.GetMessageTypes()[index])
    case *desc.MessageDescriptor:
        if len(msgIndexes) > 0 {
            return resolveDescriptorByIndexes(msgIndexes, v.GetNestedMessageTypes()[index])
        } else {
            return v.GetNestedMessageTypes()[index]
        }
    default:
        fmt.Printf("no match: %v\n", v)
        return nil
    }
}

Example usage:

    schemaRegistryClient := srclient.CreateSchemaRegistryClient(lib.SchemaRegistryUrl)
    schemaRegistryClient.SetCredentials(lib.SchemaRegistryUsername, lib.SchemaRegistryPassword)
    protobufResolver := lib.NewSchemaRegistryProtobufResolver(schemaRegistryClient, protoregistry.GlobalTypes, lib.ValueDeserialization)
    deserializer := lib.NewProtobufDeserializer(protobufResolver)

    for {
        msg, err := c.ReadMessage(60 * time.Second)
        if err == nil {
            value, err := deserializer.Deserialize(msg.Value)
            if err != nil {
                sugar.Fatal(err)
            }

            switch v := value.(type) {
            case *schema.SampleRecord:
                sugar.Infof("Here is the sample record: (%s), headers (%v)", v.String(), msg.Headers)
            case *schema.OtherRecord_NestedRecord:
                sugar.Infof("Here is the nested record: (%s), headers (%v)", v.String(), msg.Headers)
            case *schema.OtherRecord:
                sugar.Infof("Here is the other record: (%s), headers (%v)", v.String(), msg.Headers)
            default:
                sugar.Infof("unrecognized message type: %T", v)
            }
        } else {
            sugar.Infof("Error consuming the message: %v (%v)", err, msg)
        }
    }

The schema used in this example:

syntax = "proto3";
package com.mycorp.mynamespace;

message SampleRecord {
  int32 my_field1 = 1;
  double my_field2 = 2;
  string my_field3 = 3;
  string my_field4 = 4;
}
message OtherRecord {
  string field = 1;

  message NestedRecord {
    string nestedfield = 1;
  }
}

ericliu1990 commented 2 years ago

Hello @bigkraig, thank you for sharing the sample code. I'm trying the code, but it seems the type of protobufResolver does not match the parameter type of NewProtobufDeserializer. Could you please share the full solution? Thank you!

protobufResolver := NewSchemaRegistryProtobufResolver(schemaRegistryClient, protoregistry.GlobalTypes, ValueDeserialization)
deserializer := NewProtobufDeserializer(protobufResolver)

rverma-dev commented 2 years ago

@ericliu1990 I am able to use this with the above code snippet and a slight change; have a look here: https://github.com/rverma-nsl/kafka-schemaregistry-protobuf. One thing which was confusing to me is that the local protobuf registry creates registry entries by name, whereas the Confluent schema registry does it by schema ID. I needed to create a static map to hold this transformation; see the sketch below. However, I think if you create your protobuf files as schemaID.proto that won't be required.
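
Something along these lines (a hypothetical sketch; the IDs are made up, and the message names are taken from the example schema above):

// static mapping from Confluent schema ID to the protobuf message
// full name registered in the local protoregistry
var schemaIDToMessage = map[int]protoreflect.FullName{
    1: "com.mycorp.mynamespace.SampleRecord",
    2: "com.mycorp.mynamespace.OtherRecord",
}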

ericliu1990 commented 2 years ago

Thanks @rverma-nsl, I have tried your solution and it looks good, thank you. But this error was encountered when I consume the event from Kafka and deserialize it: invalid protobuf wire protocol. Received version: 10. Any idea?

bigkraig commented 2 years ago

Thanks @rverma-nsl, I have tried your solution and it looks good, thank you. But this error was encountered when I consume the event from Kafka and deserialize it: invalid protobuf wire protocol. Received version: 10. Any idea?

Try adding the change in green. I had noticed a bug in decodeHeader when dealing with messages encoded with the Python library; if this is the same issue, this is how I had fixed it.

[screenshot of the suggested fix in decodeHeader]
ericliu1990 commented 2 years ago

Hello @bigkraig, thanks for the reply. I think my code returned at the very beginning of the decodeHeader function :(

[screenshot of the error output]
bigkraig commented 2 years ago

Short on time this morning folks, but I put a PR up with an example of the producer: https://github.com/riferrei/srclient/pull/85

bigkraig commented 2 years ago

Hello @bigkraig, thanks for the reply. I think my code returned at the very beginning of the decodeHeader function :(

[screenshot of the error output]

That is probably a message that wasn't properly serialized.

AtakanColak commented 2 years ago

Closing as per comment on https://github.com/riferrei/srclient/issues/16#issuecomment-1218278323