confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Use of Confluent.SchemaRegistry.Serdes.ProtobufDeserializer with Kafka Connect converter io.confluent.connect.protobuf.ProtobufConverter throws Google.Protobuf.InvalidProtocolBufferException #1684

Closed · CooperWolfe closed this issue 3 years ago

CooperWolfe commented 3 years ago

Description

When Kafka Connect is configured with the io.confluent.connect.protobuf.ProtobufConverter converter and my consumer is constructed with Confluent.SchemaRegistry.Serdes.ProtobufDeserializer, consuming throws a Google.Protobuf.InvalidProtocolBufferException.
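
Roughly, the consumer is built like this (a minimal sketch; the broker address, group id, topic name, and the generated OutboxEntry type are placeholders for my actual setup):

using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry.Serdes;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",       // placeholder
    GroupId = "outbox-consumer",               // placeholder
    AutoOffsetReset = AutoOffsetReset.Earliest
};

// OutboxEntry is the C# class generated from the .proto shown further down.
using var consumer = new ConsumerBuilder<string, OutboxEntry>(config)
    .SetValueDeserializer(new ProtobufDeserializer<OutboxEntry>().AsSyncOverAsync())
    .Build();

consumer.Subscribe("outbox.events");           // placeholder topic

// Throws Google.Protobuf.InvalidProtocolBufferException for messages
// written by Kafka Connect's ProtobufConverter.
var result = consumer.Consume();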

How to reproduce

Kafka itself doesn't log anything interesting beyond registering my service's failure and booting it from the group. Zookeeper has nothing of interest either. Meanwhile, Kafka Connect (or more specifically, Debezium) seems to be producing its messages without a hitch.

Versions

Confluent.Kafka NuGet version: 1.7.0
Scala version: 2.12
Apache Kafka version: 2.8.0
OS: Windows 10 Home


Thank you to any and all who can help.

mhowlett commented 3 years ago

I suspect you might be hitting the framing incompatibility between the .NET and Java serdes. The format was changed mid-development of this feature in Schema Registry, and the message didn't get through to the clients team, so the non-Java clients implemented the originally proposed format. We have changed this in v1.8 (not yet released).

https://github.com/confluentinc/confluent-kafka-dotnet/pull/1628/files#diff-635a634d86028a5efbc4d6913293a861c4ff487f5983e10684f56228560a85d5R42
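
For reference, the framing that precedes the protobuf payload is: a zero magic byte, a 4-byte big-endian schema id, then the message-index path into the .proto file encoded as varints. The varint encoding is exactly where the clients diverged: the final Java serdes zigzag-encode the indices, while the original proposal (and so the pre-1.8 non-Java clients) wrote plain varints. A rough sketch of reading the final format (an illustration of the framing, not the library code):

using System;
using System.IO;

static class ProtobufFraming
{
    // Sketch only: parses the header written by the final, Java-compatible
    // protobuf serdes format, assuming zigzag-encoded varints.
    public static (int SchemaId, int[] Indexes) ReadHeader(Stream s)
    {
        if (s.ReadByte() != 0)
            throw new InvalidDataException("missing magic byte");

        var id = new byte[4];                  // 4-byte big-endian schema id
        s.Read(id, 0, 4);
        if (BitConverter.IsLittleEndian) Array.Reverse(id);
        int schemaId = BitConverter.ToInt32(id, 0);

        // Message-index path: a varint count, then that many varints.
        // (A count of 0 is shorthand for the common single path [0].)
        int count = (int)ReadZigZagVarint(s);
        var indexes = count == 0 ? new[] { 0 } : new int[count];
        for (int i = 0; i < count; i++)
            indexes[i] = (int)ReadZigZagVarint(s);

        return (schemaId, indexes);            // the protobuf message follows
    }

    static long ReadZigZagVarint(Stream s)
    {
        ulong raw = 0;
        int shift = 0, b;
        do
        {
            b = s.ReadByte();
            raw |= (ulong)(b & 0x7f) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (long)(raw >> 1) ^ -(long)(raw & 1);
    }
}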

CooperWolfe commented 3 years ago

Thank you for the quick answer @mhowlett. I will try building and using a v1.8 package locally tomorrow from the latest master.

CooperWolfe commented 3 years ago

I cloned this repository and packed a local NuGet package (bumping the version to 1.8.0 to avoid collisions with the nuget.org source). Even with that package, the error is unchanged. I'm trying another converter in the meantime. I would imagine the community has used this client with Kafka Connect before; perhaps someone has had success with another converter, or even this same one?

CooperWolfe commented 3 years ago

I did some intense testing. I got a console application up in my environment to produce an "identical" message, that is, a message created from the same .proto file that was used to put the data into the database Debezium is listening to. After some creative use of kafka-console-consumer, I was able to capture the following two byte streams:

The Kafka Connect byte stream: [screenshot of the hex dump]

The Confluent.Kafka producer's byte stream: [screenshot of the hex dump]
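
(If you'd rather dump the raw bytes from .NET than wrangle kafka-console-consumer, a byte[] consumer works too; a quick sketch, placeholders as before:)

using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",   // placeholder
    GroupId = "byte-dump",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

// No value deserializer: read the payload as raw bytes and hex-dump it.
using var consumer = new ConsumerBuilder<Ignore, byte[]>(config).Build();
consumer.Subscribe("outbox.events");       // placeholder topic
var result = consumer.Consume();
Console.WriteLine(BitConverter.ToString(result.Message.Value));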

For the record, this was the .proto:

...

import "google/protobuf/timestamp.proto";

message OutboxEntry {
    string payload = 1;
    string id = 2;
    string aggregateId = 3;
    string eventType = 4;
    google.protobuf.Timestamp createdAt = 5;
}

Without even breaking down the bytes, you can see that in the first byte stream, the value of createdAt was serialized as a string, not a google.protobuf.Timestamp. Seems likely to be the cause, but just to be sure: is there any reason this would NOT be the cause?
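
(An aside on why this is visible in a dump at all: string and embedded-message fields share the same length-delimited wire type, so field 5 carries the tag byte 0x2A either way; the difference is that a string payload is readable ASCII, while a google.protobuf.Timestamp payload is two binary varint fields, seconds and nanos. Easy to check by serializing a Timestamp on its own:)

using System;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;

// A lone Timestamp serializes to compact binary varints, nothing readable:
// tag 0x08 then the seconds varint (plus tag 0x10 and nanos when nonzero).
var ts = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow);
Console.WriteLine(BitConverter.ToString(ts.ToByteArray()));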

CooperWolfe commented 3 years ago

Looking in Schema Registry, it seems the schema produced by Kafka Connect was: [screenshot]

While the schema produced by the Confluent.Kafka producer was: [screenshot]
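
(Spelled out in proto terms, a reconstruction from the bytes above rather than a copy of the registered schema, the difference comes down to createdAt:)

// Registered by Kafka Connect (Debezium) - reconstruction, not the actual screenshot:
message OutboxEntry {
    string payload = 1;
    string id = 2;
    string aggregateId = 3;
    string eventType = 4;
    string createdAt = 5;   // the timestamp rendered as a string
}

// The schema registered by the Confluent.Kafka producer matches the .proto
// above: identical, except createdAt is a google.protobuf.Timestamp.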

I'm all but certain this is my issue, but I will adjust and confirm.

CooperWolfe commented 3 years ago

It seems Debezium by default converts timestamp columns in Postgres to strings, which clashes with the google.protobuf.Timestamp field already registered in Schema Registry. The solution I went with was to grit my teeth and change my proto to use a string for the time being. Once I did that, I reverted to version 1.7.0 and all was still well. Learned a lot from this; thanks again @mhowlett for your answer.