riferrei / srclient

Golang Client for Schema Registry
Apache License 2.0

Avro serialization junk value in confluent cloud #15

Closed: shivakumarss closed this issue 4 years ago

shivakumarss commented 4 years ago

I am using this client to create the schema and send Avro messages to a topic. This works fine on my local machine (macOS) and on my GCP compute instance, where a downloaded version of Confluent is running.

After the trials I got Confluent Cloud access and tried the same thing, but basic Avro serialization does not seem to work as expected. The value section is always prefixed with junk characters, unlike the behaviour on my local machine and my single GCP machine.

Here are the details, the code snippet, and the process that was followed.

Golang Code snippet for sending the data to confluent cloud

func MethodPOCAvro() {

    type ComplexType struct {
        ID   int    `json:"id"`
        Name string `json:"name"`
    }

    sampleSchema := `{
        "doc": "Sample schema to help you get started...",
        "fields": [
          {
            "doc": "The int type is a 32-bit signed integer.",
            "name": "id",
            "type": "int"
          },
          {
            "doc": "The string is a unicode character sequence.",
            "name": "name",
            "type": "string"
          }
        ],
        "name": "my_sample_schema_three",
        "namespace": "com.mycorp.mynamespace.local.three",
        "type": "record"
      }`
    topic := "sample_topic_003"
    kafkaSchemaRegistryURL := "https://xxx.gcp.confluent.cloud"

    /*** Start registering schemas in the confluent schema registry ***/
    schemaRegistryClient := srclient.CreateSchemaRegistryClient(kafkaSchemaRegistryURL)
    schemaRegistryClient.SetCredentials("xxx", "xxx")

    schema, err := schemaRegistryClient.GetLatestSchema(topic, false)
    if schema == nil {
        log.Print("Schema not found registering now ")
        schema, err = schemaRegistryClient.CreateSchema(topic, sampleSchema, srclient.Avro, false)
        if schema != nil {
            log.Print("Register successful ")
        }
        if err != nil {
            panic(fmt.Sprintf("Error creating the schema  %s", err))
        }
    }

    // Init producer
    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "xxx.confluent.cloud:9092",
        "sasl.mechanisms":   "PLAIN",
        "security.protocol": "SASL_SSL",
        // "debug":                               "all",
        "sasl.username":                       "xxx",
        "enable.ssl.certificate.verification": "false",
        "sasl.password":                       "xxx"})
    if err != nil {
        panic(fmt.Sprintf("Error creating the producer %s", err))
    }

    time.Sleep(1 * time.Second)

    schemaIDBytes := make([]byte, 4)
    binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID()))
    log.Println("Schema ID ", schema.ID())

    // 3) Serialize the record using the schema provided by the client,
    // making sure to include the schema id as part of the record.
    newComplexType := ComplexType{ID: 1, Name: "Gopher"}
    value, _ := json.Marshal(newComplexType)
    log.Println("value ", value)

    native, _, _ := schema.Codec().NativeFromTextual(value)
    log.Println("native ", native)

    valueBytes, _ := schema.Codec().BinaryFromNative(nil, native)
    log.Println("valueBytes ", valueBytes)

    var recordValue []byte
    recordValue = append(recordValue, byte(0))
    recordValue = append(recordValue, schemaIDBytes...)
    recordValue = append(recordValue, valueBytes...)
    key, _ := uuid.NewUUID()
    log.Println("recordValue ", recordValue)

    err = p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic: &topic, Partition: kafka.PartitionAny},
        Key: []byte(key.String()), Value: recordValue}, nil)

    if err != nil {
        fmt.Fprintf(os.Stderr, "Error while producing the message: %s\n", err)
    }

    k := p.Flush(200)
    log.Print("Pushed ", k)

}
[Screenshot 2020-09-10 at 12:12:59 AM]
Schema not found registering now 
Register successful 

Schema ID  100022
value  [123 34 105 100 34 58 49 44 34 110 97 109 101 34 58 34 71 111 112 104 101 114 34 125]
native  map[id:1 name:Gopher]
valueBytes  [2 12 71 111 112 104 101 114]
recordValue  [0 0 1 134 182 2 12 71 111 112 104 101 114]
Pushed 1
Schema ID  1
value  [123 34 105 100 34 58 49 44 34 110 97 109 101 34 58 34 71 111 112 104 101 114 34 125]
native  map[id:1 name:Gopher]
valueBytes  [2 12 71 111 112 104 101 114]
recordValue  [0 0 0 0 1 2 12 71 111 112 104 101 114]
Pushed 1

Notes:

shivakumarss commented 4 years ago

Edit:

It turned out to be a bug in the Confluent Cloud UI. I have validated the data in the consumer, and it is fine.
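The consumer-side validation mentioned above can even be sketched without a registry client, since the sample schema is just {id: int, name: string}: Avro encodes int/long as zigzag varints (exactly what binary.Varint reads) and string as a varint length followed by UTF-8 bytes. decodeSample below is a hypothetical helper hard-coded to this one schema; a real consumer would instead look up the schema by the ID in the header and use the codec's NativeFromBinary.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// decodeSample hand-decodes one record of the sample schema
// {id: int, name: string}. Hypothetical helper for illustration only.
func decodeSample(msg []byte) (int64, string) {
	// Skip the 5-byte Confluent header (magic byte + big-endian schema ID).
	body := msg[5:]

	id, n := binary.Varint(body) // field "id": zigzag varint
	body = body[n:]

	strLen, n := binary.Varint(body) // field "name": varint length prefix
	body = body[n:]

	return id, string(body[:strLen]) // followed by strLen UTF-8 bytes
}

func main() {
	recordValue := []byte{0, 0, 1, 134, 182, 2, 12, 71, 111, 112, 104, 101, 114}
	id, name := decodeSample(recordValue)
	fmt.Println(id, name) // 1 Gopher
}
```

Decoding the logged bytes this way recovers {id: 1, name: "Gopher"}, confirming the produced record itself was correct.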

References:
https://groups.google.com/g/confluent-platform/c/3pSU1TsNg3A/m/7wNWu27UAAAJ
https://stackoverflow.com/questions/63826163/confluent-cloud-java-avro-serialization-junk-characters-in-value
https://github.com/confluentinc/examples/issues/780#issuecomment-690233778

Closing this ticket.