confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0

Question - Using Confluent Schema Registry and AVRO: How to Serialize and Send a Message with Unknown Structure? #1269

Open msempere opened 1 month ago

msempere commented 1 month ago

Description

I'm currently working on a project where I need to serialize and send messages to Confluent Kafka using the Confluent Schema Registry and AVRO. The challenge I'm facing is that the messages have an unknown structure, and I only have the schema ID and topic.

I've tried declaring the message as interface{} and as map[string]interface{}, but ser.Serialize(topic, &record) fails either way and I'm unable to serialize the message. One of the errors I get is unknown type interface.

Detailed Scenario

I have a log file with JSON records that contain the schema ID, the topic, and a JSON message to be sent to Confluent Kafka. The JSON message is arbitrary, and I don't know its structure in advance. Here is the struct I decode each record into:

type Record struct {
    Topic    string                 `json:"topic"`
    SchemaID int                    `json:"id"`
    Msg      map[string]interface{} `json:"msg"`
}
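A small, self-contained check of what this struct actually holds after decoding may help frame the problem. The log line below is made up for illustration (topic, id, and field names are hypothetical), and parseRecord is a hypothetical helper. It shows that encoding/json stores every JSON number inside Msg as a float64, since that is the Go type it uses for numbers decoded into an interface{}.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Record mirrors the struct from the issue.
type Record struct {
	Topic    string                 `json:"topic"`
	SchemaID int                    `json:"id"`
	Msg      map[string]interface{} `json:"msg"`
}

// parseRecord decodes one log line into a Record (hypothetical helper).
func parseRecord(line []byte) (Record, error) {
	var r Record
	err := json.Unmarshal(line, &r)
	return r, err
}

func main() {
	// Hypothetical log line; the values are illustrative only.
	line := []byte(`{"topic":"orders","id":42,"msg":{"count":3,"name":"widget"}}`)
	r, err := parseRecord(line)
	if err != nil {
		panic(err)
	}
	// The JSON integer 3 arrives as float64, not int.
	fmt.Printf("%s %d %T %T\n", r.Topic, r.SchemaID, r.Msg["count"], r.Msg["name"])
	// prints: orders 42 float64 string
}
```

The top-level SchemaID field decodes to int because the struct declares it as int; only values stored behind interface{} lose their integer type.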

The Issue

I'm unable to serialize the Msg without knowing its structure. Here is the code I'm using:

configuration := avrov2.NewSerializerConfig()
configuration.UseSchemaID = record.SchemaID // serialize against the existing schema ID
configuration.AutoRegisterSchemas = false   // never register a new schema

ser, err := avrov2.NewSerializer(schemaRegistryClient, serde.ValueSerde, configuration)
if err != nil {
    return err
}
serializedMsg, err := ser.Serialize(record.Topic, &record.Msg)
if err != nil {
    return err
}

This fails with an unknown type interface error whether Msg is declared as map[string]interface{} or as interface{}.

Alternative Attempt

As an alternative, I tried using github.com/hamba/avro/v2 to serialize the Msg with the schema retrieved from schemaRegistryClient.GetBySubjectAndID, but it didn't work either:

metadata, err = schemaRegistryClient.GetBySubjectAndID(schemaValue, record.SchemaID)
avroSchema, err = avro.Parse(metadata.Schema)
msg, err = avro.Marshal(avroSchema, record.Msg)

This fails with a float64 is unsupported for Avro int error, even for a Msg that contains just a single int value.
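The float64 error follows from the decoding step: encoding/json stores JSON numbers in a map[string]interface{} as float64, while an Avro int field expects a Go integer. One possible fix is to walk the registered schema and convert each number to the Go type its field declares before calling avro.Marshal. The sketch below is dependency-free and makes strong assumptions: the schema is a flat record whose fields have primitive types (unions, nested records, arrays, and logical types are passed through unchanged), and it parses the schema JSON with encoding/json rather than hamba/avro. coerceToSchema is a hypothetical helper name.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math"
)

// avroField is the subset of an Avro record field definition used here.
type avroField struct {
	Name string          `json:"name"`
	Type json.RawMessage `json:"type"`
}

type avroRecord struct {
	Type   string      `json:"type"`
	Fields []avroField `json:"fields"`
}

// coerceToSchema converts float64 values produced by encoding/json into the
// Go types an Avro encoder expects for primitive int, long and float fields.
func coerceToSchema(schemaJSON string, msg map[string]interface{}) (map[string]interface{}, error) {
	var rec avroRecord
	if err := json.Unmarshal([]byte(schemaJSON), &rec); err != nil {
		return nil, err
	}
	if rec.Type != "record" {
		return nil, fmt.Errorf("expected a record schema, got %q", rec.Type)
	}
	out := make(map[string]interface{}, len(msg))
	for k, v := range msg {
		out[k] = v // shallow copy so the caller's map is not modified
	}
	for _, f := range rec.Fields {
		var typeName string
		if json.Unmarshal(f.Type, &typeName) != nil {
			continue // complex type (union, nested record, ...): not handled here
		}
		num, ok := out[f.Name].(float64)
		if !ok {
			continue
		}
		switch typeName {
		case "int":
			if num != math.Trunc(num) || num < math.MinInt32 || num > math.MaxInt32 {
				return nil, fmt.Errorf("field %q: %v is not a valid Avro int", f.Name, num)
			}
			out[f.Name] = int32(num)
		case "long":
			// float64(math.MaxInt64) rounds up to 2^63, hence >= for the upper bound.
			if num != math.Trunc(num) || num < math.MinInt64 || num >= math.MaxInt64 {
				return nil, fmt.Errorf("field %q: %v is not a valid Avro long", f.Name, num)
			}
			out[f.Name] = int64(num)
		case "float":
			out[f.Name] = float32(num)
		}
	}
	return out, nil
}

func main() {
	// Illustrative schema and message; not taken from the issue.
	schema := `{"type":"record","name":"Example","fields":[{"name":"count","type":"int"}]}`
	coerced, err := coerceToSchema(schema, map[string]interface{}{"count": float64(3)})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%T %v\n", coerced["count"], coerced["count"]) // prints: int32 3
}
```

In the alternative attempt, the coerced map would take the place of record.Msg in the avro.Marshal call. Because float64 represents integers exactly only up to 2^53, decoding with json.Decoder's UseNumber and converting via json.Number.Int64 would be the safer route for large long values.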

Question

How can I accomplish serializing an arbitrary JSON message using AVRO and the schema registry? Any help or guidance would be greatly appreciated.

Thanks!


How to reproduce

See code above. I'm happy to share the full code if needed.


chiang8 commented 3 weeks ago

I have the same requirement. I found that using map[string]interface{} breaks an assumption of the library, i.e. it only supports structs. Reasons:

It would be nice if this library could support map[string]interface{}, even at the expense of losing some features.
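Until then, a possible workaround is to serialize the map yourself (for example with hamba/avro, as in the alternative attempt from the original post) and add the Schema Registry framing by hand. Confluent's serializers prefix each payload with a magic byte 0 followed by the schema ID as a 4-byte big-endian integer, and consumers using the Schema Registry deserializers expect that header. The sketch assumes the Avro-encoded payload is already available as a byte slice; frameConfluent and parseConfluentHeader are hypothetical helper names.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// magicByte is the first byte of every Confluent Schema Registry payload.
const magicByte byte = 0

// frameConfluent prepends the Confluent wire-format header
// (magic byte + 4-byte big-endian schema ID) to an Avro-encoded payload.
func frameConfluent(schemaID uint32, avroPayload []byte) []byte {
	out := make([]byte, 5, 5+len(avroPayload))
	out[0] = magicByte
	binary.BigEndian.PutUint32(out[1:5], schemaID)
	return append(out, avroPayload...)
}

// parseConfluentHeader is the inverse: it splits off the schema ID and payload.
func parseConfluentHeader(b []byte) (uint32, []byte, error) {
	if len(b) < 5 || b[0] != magicByte {
		return 0, nil, errors.New("not a Confluent-framed message")
	}
	return binary.BigEndian.Uint32(b[1:5]), b[5:], nil
}

func main() {
	payload := []byte{0x06} // illustrative: Avro int 3, zigzag-encoded, is 0x06
	framed := frameConfluent(42, payload)
	fmt.Printf("% x\n", framed) // prints: 00 00 00 00 2a 06
	id, body, err := parseConfluentHeader(framed)
	fmt.Println(id, body, err) // prints: 42 [6] <nil>
}
```

With the hamba/avro output from the alternative attempt, the call would look like frameConfluent(uint32(record.SchemaID), msg), and the result can be used as the message value when producing to record.Topic.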