apache / pulsar-client-go

Apache Pulsar Go Client Library
https://pulsar.apache.org/
Apache License 2.0

[BUG] upgrade linkedin/goavro to v2.11.1 #786

Open ghost opened 2 years ago

ghost commented 2 years ago

Example code:

package main

import (
    "github.com/apache/pulsar-client-go/pulsar"
    "github.com/pkg/errors"
    "github.com/rs/zerolog/log"
)

type UserPulsarMessage struct {
    FirstName string  `json:"firstName"`
    LastName  string  `json:"lastName"`
    Gender    *string `json:"gender"`
}

var schema = `{
    "name": "UserPulsarMessage",
    "type": "record",
    "fields": [
        {
            "name": "firstName",
            "type": "string"
        },
        {
            "name": "gender",
            "type": ["null", "string"]
        },
        {
            "name": "lastName",
            "type": "string"
        }
    ]
}`

func main() {
    log.Debug().Msg("Start")
    male := "male"
    err := checkAvro("Dmytrii", "Havryliuk", &male)
    if err != nil {
        // Msg, not Msgf: the error text is not a format string.
        log.Error().Err(errors.WithStack(err)).Msg(err.Error())
    }

    log.Debug().Msg("New case")
    err = checkAvro("", "Portland", nil)
    if err != nil {
        log.Error().Err(errors.WithStack(err)).Msg(err.Error())
    }

    log.Debug().Msg("Done")
}

func checkAvro(fName, lName string, gender *string) error {
    avroSchema := pulsar.NewAvroSchema(schema, nil)
    msg := &UserPulsarMessage{
        FirstName: fName,
        LastName:  lName,
        Gender:    gender,
    }

    payload, err := avroSchema.Encode(msg)
    if err != nil {
        return err
    }

    log.Debug().Msgf("encoded payload: '%s'", string(payload))

    var msgResult *UserPulsarMessage
    err = avroSchema.Decode(payload, &msgResult)
    if err != nil {
        return err
    }

    log.Debug().Msgf("FirstName: '%s', LastName: '%s', Gender: '%s' Successful",
        msgResult.FirstName, msgResult.LastName, msgResult.Gender)
    return nil
}

Output:

{"level":"debug","time":"2022-06-07T19:49:37+03:00","message":"Start"}
ERRO[0000] convert native Go form to binary Avro data error:cannot decode textual record "UserPulsarMessage": cannot decode textual union: expected: '{'; actual: '"' for key: "gender" 
{"level":"error","error":"cannot decode textual record \"UserPulsarMessage\": cannot decode textual union: expected: '{'; actual: '\"' for key: \"gender\"","time":"2022-06-07T19:49:37+03:00","message":"cannot decode textual record \"UserPulsarMessage\": cannot decode textual union: expected: '{'; actual: '\"' for key: \"gender\""}
{"level":"debug","time":"2022-06-07T19:49:37+03:00","message":"New case"}
{"level":"debug","time":"2022-06-07T19:49:37+03:00","message":"encoded payload: '\u0000\u0000\u0010Portland'"}
{"level":"debug","time":"2022-06-07T19:49:37+03:00","message":"FirstName: '', LastName: 'Portland', Gender: '%!s(*string=<nil>)' Successful"}
{"level":"debug","time":"2022-06-07T19:49:37+03:00","message":"Done"}
Exiting.
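The two cases behave differently because of the JSON each struct value turns into. The following is a minimal sketch that assumes `AvroSchema.Encode` first marshals the value with `encoding/json` and then hands the text to goavro's textual decoder (the `convert native Go form to binary Avro data` log line points that way). It prints the JSON the original struct produces for both test cases. In the first case `gender` is a bare JSON string, which matches the error's `actual: '"'`. In the second it is `null`, which goavro can accept as the union's null branch. `toJSON` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Same struct as in the report: Gender is a plain *string.
type UserPulsarMessage struct {
	FirstName string  `json:"firstName"`
	LastName  string  `json:"lastName"`
	Gender    *string `json:"gender"`
}

// toJSON returns the textual form that (by assumption) is passed to the
// Avro codec before binary encoding.
func toJSON(m UserPulsarMessage) string {
	b, err := json.Marshal(m)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	male := "male"
	// Case 1: "gender":"male", a bare string where a union object is expected.
	fmt.Println(toJSON(UserPulsarMessage{FirstName: "Dmytrii", LastName: "Havryliuk", Gender: &male}))
	// Case 2: "gender":null, which matches the "null" branch of the union.
	fmt.Println(toJSON(UserPulsarMessage{FirstName: "", LastName: "Portland", Gender: nil}))
}
```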
zzzming commented 2 years ago

@gavrilyuc2 This is not a bug. Upgrading to linkedin/goavro v2.11 would not fix it (I tried).

In the Avro schema, the gender type is a union. Unions in goavro are well documented at https://github.com/linkedin/goavro#translating-from-avro-to-go-data. Here is the exact description of how the library represents a union:

Because of encoding rules for Avro unions, when an union's value is null, a simple Go nil is returned. However when an union's value is non-nil, a Go map[string]interface{} with a single key is returned for the union. The map's single key is the Avro type name and its value is the datum's value.
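On the reading side, the README rule means a decoded `["null", "string"]` value is either `nil` or a one-key map such as `{"string": "male"}`. The sketch below shows how application code could turn that shape back into a `*string`. `stringFromUnion` is a hypothetical helper, not part of goavro, and the inputs are literal values built to match the README description rather than output captured from the library.

```go
package main

import "fmt"

// stringFromUnion converts a ["null", "string"] union value, in the shape the
// goavro README describes (nil, or a single-key map keyed by the Avro type
// name), into a *string. Hypothetical helper for illustration.
func stringFromUnion(v interface{}) (*string, error) {
	if v == nil {
		return nil, nil // untyped nil: the null branch
	}
	m, ok := v.(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("unexpected union value: %#v", v)
	}
	if m == nil {
		return nil, nil // typed nil map, e.g. an unset struct field
	}
	s, ok := m["string"].(string)
	if !ok || len(m) != 1 {
		return nil, fmt.Errorf("union branch is not \"string\": %#v", v)
	}
	return &s, nil
}

func main() {
	g, _ := stringFromUnion(map[string]interface{}{"string": "male"})
	fmt.Println(*g) // male
	g, _ = stringFromUnion(nil)
	fmt.Println(g == nil) // true
}
```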

This means you need to declare the data structure as follows:

type UserPulsarMessage struct {
    FirstName string                 `json:"firstName"`
    LastName  string                 `json:"lastName"`
    Gender    map[string]interface{} `json:"gender"`
}

According to the README, Gender has to be a map. Initialize it as Gender: map[string]interface{}{"string": "male"} for a non-null value, or Gender: nil for null.
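With the map-typed field, the JSON for a non-null gender becomes an object such as `{"string":"male"}`, which starts with the `'{'` that the goavro error said it expected. This sketch builds the field from an optional string and prints the resulting JSON. It assumes, as above, that the Pulsar Avro schema goes through `encoding/json`. `genderUnion` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Struct declared as suggested above: the union field is a map.
type UserPulsarMessage struct {
	FirstName string                 `json:"firstName"`
	LastName  string                 `json:"lastName"`
	Gender    map[string]interface{} `json:"gender"`
}

// genderUnion builds the value for a ["null", "string"] union field:
// nil for null, otherwise a single-key map keyed by the Avro type name.
// Hypothetical helper for illustration.
func genderUnion(g *string) map[string]interface{} {
	if g == nil {
		return nil
	}
	return map[string]interface{}{"string": *g}
}

func main() {
	male := "male"
	for _, g := range []*string{&male, nil} {
		msg := UserPulsarMessage{FirstName: "Dmytrii", LastName: "Havryliuk", Gender: genderUnion(g)}
		b, err := json.Marshal(msg)
		if err != nil {
			panic(err)
		}
		// First iteration: ..."gender":{"string":"male"}; second: ..."gender":null
		fmt.Println(string(b))
	}
}
```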

I hope this helps.

ghost commented 2 years ago

What about this package?

I think it would be the right approach for simple union types. I have a problem using the map[string]interface{} type together with other clients (Python and Go producers and consumers): producing messages works, but consuming messages fails with an error like the one I posted above.

But if I use this package I don't see this error.
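One way to keep a typed field instead of `map[string]interface{}` is a wrapper type with custom JSON methods. The following is a sketch, not a tested fix for the consuming error. It assumes the Pulsar Avro schema converts through `encoding/json` in both directions and that goavro's textual form of a non-null string union is `{"string": "..."}`. `NullableString` is a hypothetical name.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// NullableString is a hypothetical wrapper for a ["null", "string"] Avro union.
// It marshals to the union JSON shape ({"string": "..."} or null) and accepts
// the same shape when unmarshaling, so application code keeps a *string.
type NullableString struct {
	Value *string
}

func (n NullableString) MarshalJSON() ([]byte, error) {
	if n.Value == nil {
		return []byte("null"), nil
	}
	return json.Marshal(map[string]string{"string": *n.Value})
}

func (n *NullableString) UnmarshalJSON(data []byte) error {
	if bytes.Equal(bytes.TrimSpace(data), []byte("null")) {
		n.Value = nil
		return nil
	}
	var m map[string]string
	if err := json.Unmarshal(data, &m); err != nil {
		return err
	}
	s, ok := m["string"]
	if !ok || len(m) != 1 {
		return fmt.Errorf("unexpected union JSON: %s", data)
	}
	n.Value = &s
	return nil
}

type UserPulsarMessage struct {
	FirstName string         `json:"firstName"`
	LastName  string         `json:"lastName"`
	Gender    NullableString `json:"gender"`
}

func main() {
	male := "male"
	b, err := json.Marshal(UserPulsarMessage{FirstName: "Dmytrii", LastName: "Havryliuk", Gender: NullableString{Value: &male}})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))

	// Round trip: the union object is read back into the typed field.
	var back UserPulsarMessage
	if err := json.Unmarshal(b, &back); err != nil {
		panic(err)
	}
	fmt.Println(*back.Gender.Value)
}
```

The wrapper keeps the union encoding in one place, so producers and consumers written in Go can share the struct without touching maps directly.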