kafkajs / confluent-schema-registry

A library that makes it easier to interact with the Confluent Schema Registry.
https://www.npmjs.com/package/@kafkajs/confluent-schema-registry
MIT License

Fix protobuf wire encoding/decoding #258

Open davidgrisham opened 10 months ago

davidgrisham commented 10 months ago

Fixes https://github.com/kafkajs/confluent-schema-registry/issues/152

Specifically, this updates the Protobuf schema registry implementation to match Confluent's wire format spec. In the current release this library can only be used when it is both the serializer and deserializer of Protobuf messages -- this branch brings the Protobuf implementation into alignment with the spec so it can be used with other schema registry clients. The primary update is that message indexes are properly encoded and decoded so the correct message type can be specified from the Protobuf schema's namespace.
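For readers unfamiliar with the spec: Confluent's protobuf wire format frames each record as a 0x00 magic byte, a 4-byte big-endian schema ID, the message indexes (zigzag-encoded varints identifying which message inside the .proto file was used), and then the serialized protobuf. A minimal sketch of that framing, based on the public spec rather than this PR's actual code:

```javascript
// Sketch of Confluent's protobuf wire-format framing (an illustration of the
// spec, not this PR's implementation):
//   [0x00 magic][4-byte BE schema id][message indexes][protobuf payload]

function zigzagEncode(n) {
  // Standard zigzag mapping for 32-bit signed ints: 0,-1,1,-2 -> 0,1,2,3
  return (n << 1) ^ (n >> 31);
}

function writeVarint(value, bytes) {
  // Base-128 varint, least-significant group first
  let v = value >>> 0;
  while (v > 0x7f) {
    bytes.push((v & 0x7f) | 0x80);
    v >>>= 7;
  }
  bytes.push(v);
}

function encodeMessageIndexes(indexes) {
  // Spec optimization: the common case of a lone index 0 is a single 0 byte
  if (indexes.length === 1 && indexes[0] === 0) return Buffer.from([0]);
  const bytes = [];
  writeVarint(zigzagEncode(indexes.length), bytes); // count first
  for (const i of indexes) writeVarint(zigzagEncode(i), bytes);
  return Buffer.from(bytes);
}

function frame(schemaId, indexes, payload) {
  const header = Buffer.alloc(5);
  header.writeUInt8(0, 0);           // magic byte
  header.writeUInt32BE(schemaId, 1); // registry schema id
  return Buffer.concat([header, encodeMessageIndexes(indexes), payload]);
}
```

A client that omits the index bytes (as this library did before this PR) produces frames that other registry clients cannot parse, which is the incompatibility this branch fixes.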

I adapted much of this from Confluent's implementation in Go, along with the variable-integer encoding from Go's internal libraries.

This is currently working for several local test cases. We'll continue ensuring this works for us and I'll update this branch accordingly; any additional help with (or recommendations for) testing/implementation would be greatly appreciated.

This PR depends on a few updates to protobuf.js that I made to ensure we have the correct message ordering for extracting indexes: https://github.com/protobufjs/protobuf.js/pull/1957

cc @Nevon

davidgrisham commented 9 months ago

Bumping this -- curious who else might make sense to ping aside from @Nevon; maybe @odselsevier?

xav-ie commented 8 months ago

I am also very interested in getting this merged in. I read through the code and it all looks good to me. The only thing I would change is that the comments could be JSDoc style so you get LSP hints:

/**
 * like this
 */

// instead of
// this

mkoiev commented 6 months ago

Hello, when are you going to merge this PR?

hood commented 6 months ago

Any news? Is this repo still maintained? This PR is crucial to my use case: currently I basically can't send protobuf anywhere using Kafka over Confluent, since the encoding is proprietary and, it seems, not implemented in a way that ensures compatibility between Node.js and other clients.

maximkoev commented 6 months ago

> Any news? Is this repo still maintained? This PR is crucial to my use case: currently I basically can't send protobuf anywhere using Kafka over Confluent, since the encoding is proprietary and, it seems, not implemented in a way that ensures compatibility between Node.js and other clients.

You can encode the message using protobufjs if you have the .proto file. That works in my case. The only data I need from the registry is the schema ID; just set it in the message header.
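A sketch of the producing side of this workaround, assuming a kafkajs producer. The header name `schema-id` is my own placeholder, not a convention from this thread, so agree on a name with your consumers:

```javascript
// Hypothetical helper: pair a protobufjs-encoded value with the registry
// schema id carried as a plain Kafka message header (header name is an
// assumption, not part of any wire format).
function toKafkaMessage(schemaId, encodedValue) {
  return {
    value: encodedValue,                        // Buffer from Message.encode(...).finish()
    headers: { 'schema-id': String(schemaId) }, // kafkajs header values are strings/Buffers
  };
}

// Producing would then look like (kafkajs producer assumed to exist):
// await producer.send({ topic: 'events', messages: [toKafkaMessage(123, value)] });
```

Note the caveat raised later in this thread: consumers must be built around the same convention, since this is not the registry wire format.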

hood commented 6 months ago

> > Any news? Is this repo still maintained? This PR is crucial to my use case: currently I basically can't send protobuf anywhere using Kafka over Confluent, since the encoding is proprietary and, it seems, not implemented in a way that ensures compatibility between Node.js and other clients.
>
> You can encode the message using protobufjs if you have the .proto file. That works in my case. The only data I need from the registry is the schema ID; just set it in the message header.

Hi, and thank you for your tip! Could you please give me a brief example, if possible, of what you mean by "message header"? Are we talking about the headers of the message emitted by the producer, or headers attached to the protobuf encoding? I've never heard of the second, and have no idea how to name the header field in the first case.

mkoiev commented 6 months ago

> > > Any news? Is this repo still maintained? This PR is crucial to my use case: currently I basically can't send protobuf anywhere using Kafka over Confluent, since the encoding is proprietary and, it seems, not implemented in a way that ensures compatibility between Node.js and other clients.
> >
> > You can encode the message using protobufjs if you have the .proto file. That works in my case. The only data I need from the registry is the schema ID; just set it in the message header.
>
> Hi, and thank you for your tip! Could you please give me a brief example, if possible, of what you mean by "message header"? Are we talking about the headers of the message emitted by the producer, or headers attached to the protobuf encoding? I've never heard of the second, and have no idea how to name the header field in the first case.

I am talking about the header when you produce a message. If you only need to consume, you don't need that. Anyway, if you are able to download the .proto file into your repository, it could help you: try the protobufjs library. Here is a simple example:

const root = loadSync('file_path');
const EventMetadata = root.lookupType('packagename.message');
const decodedMessage = EventMetadata.decode(encodedMessage);

Here, encodedMessage is the message.value from the Kafka consumer.

ktatarchukkt commented 5 months ago

@mkoiev Hello! Just wanted to clarify regarding your answer. If you put the schema ID into a header, it will only work if you build both encoding and decoding around that approach. Other implementations of the wire protocol (Java, for example) and Confluent Cloud Schema Registry itself won't be able to parse the serialized data, because it is missing the message-indexes bytes.

Am I right?
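To make the concern above concrete: a spec-compliant reader consumes message-index bytes between the schema ID and the payload, so data written without them is misparsed. Below is a hedged sketch of such a reader, my own illustration of the wire-format spec rather than code from any particular client:

```javascript
// Sketch of spec-compliant consumer-side parsing:
//   [0x00 magic][4-byte BE schema id][message indexes][protobuf payload]

function zigzagDecode(n) {
  // Inverse of the zigzag mapping: 0,1,2,3 -> 0,-1,1,-2
  return (n >>> 1) ^ -(n & 1);
}

function readVarint(buf, offset) {
  // Base-128 varint; returns the value and the position after it
  let result = 0, shift = 0, pos = offset;
  for (;;) {
    const b = buf[pos++];
    result |= (b & 0x7f) << shift;
    if ((b & 0x80) === 0) break;
    shift += 7;
  }
  return [result >>> 0, pos];
}

function parseFrame(buf) {
  if (buf[0] !== 0) throw new Error('missing magic byte');
  const schemaId = buf.readUInt32BE(1);
  let [count, pos] = readVarint(buf, 5);
  count = zigzagDecode(count);
  const indexes = [];
  for (let i = 0; i < count; i++) {
    let raw;
    [raw, pos] = readVarint(buf, pos);
    indexes.push(zigzagDecode(raw));
  }
  // A count of 0 is the spec's single-byte shorthand for indexes [0]
  return {
    schemaId,
    indexes: count === 0 ? [0] : indexes,
    payload: buf.subarray(pos),
  };
}
```

A frame whose protobuf payload starts immediately after the schema ID would have its first payload bytes consumed here as index varints, which is why the header-only workaround breaks interoperability with other clients.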

mkoiev commented 5 months ago

@ktatarchukkt Hello! You need to add the schema ID to the header only if the consumer requires it. In my case it is required because we use Confluent, and without the schema the consumer will not decode my message. Decoding and encoding with confluent-schema-registry does not work for me, so I am using protobufjs to decode/encode messages. If you hit a wire-type issue while decoding, as I did: mine was caused by using google.protobuf.Timestamp, and I had to patch protobufjs with a fix, then deploy the patched library to the npm registry in a private Artifactory. If you have any other questions, you can reach me on LinkedIn: https://www.linkedin.com/in/maksym-koiev/