confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0

Protobuf producer serialization slow #1317

Open bichselb opened 1 month ago

Description

I am using the protobuf producer outlined here. Unfortunately, its serialization is so slow that it is a bottleneck in my whole system. Specifically, it takes 1.6s to serialize 1 million messages, whereas pure protobuf serializes the same 1 million messages in 100ms.

How can I speed up serialization to avoid this roughly 16x overhead? I would like to avoid parallelization.
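
For reference, the per-message cost implied by the two timings above can be worked out as follows. This is only arithmetic on the reported numbers; the helper names `perMessage` and `slowdown` are illustrative and not part of any library.

```go
package main

import (
	"fmt"
	"time"
)

// perMessage returns the average time spent on a single message.
func perMessage(total time.Duration, nMessages int) time.Duration {
	return total / time.Duration(nMessages)
}

// slowdown returns how many times slower `slow` is than `fast`.
func slowdown(slow, fast time.Duration) float64 {
	return float64(slow) / float64(fast)
}

func main() {
	const n = 1_000_000
	withRegistry := 1600 * time.Millisecond // reported: protobuf + schema registry
	pureProtobuf := 100 * time.Millisecond  // reported: pure protobuf

	fmt.Println("with registry:", perMessage(withRegistry, n)) // 1.6µs per message
	fmt.Println("pure protobuf:", perMessage(pureProtobuf, n)) // 100ns per message
	fmt.Printf("slowdown: %.0fx\n", slowdown(withRegistry, pureProtobuf)) // 16x
}
```

In other words, the schema-registry path adds about 1.5µs of work per message on top of the raw protobuf encoding.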

How to reproduce

I put my whole experimental setup here: https://github.com/bichselb/confluent-kafka-go-performance/tree/serialization-slow

My serialization is a straightforward adaptation of the protobuf producer example, except that I enabled CacheSchemas (without it, the code is another order of magnitude slower):

func serializeWithSchemaregistry(nMessages int) {
    registry, err := schemaregistry.NewClient(schemaregistry.NewConfig("http://schema-registry:8081"))
    if err != nil {
        fmt.Printf("Error creating schema registry client...\n")
        panic(err)
    }
    defer registry.Close()

    serializer, err := protobuf.NewSerializer(
        registry,
        serde.ValueSerde,
        &protobuf.SerializerConfig{
            SerializerConfig: *serde.NewSerializerConfig(),
            CacheSchemas: true,  // big difference for performance
        },
    )
    if err != nil {
        fmt.Printf("Error creating serializer...\n")
        panic(err)
    }
    defer serializer.Close()

    start := time.Now()
    topic := "mytopic"
    for i := 0; i < nMessages; i++ {
        message := pb.MyMessage{
            A: int32(i),
            B: int32(i),
        }

        payload, err := serializer.Serialize(topic, &message)
        if err != nil {
            fmt.Printf("Error serializing message...\n")
            panic(err)
        }
        // Pseudo-use of payload to prevent compiler optimizations
        _ = copy(payload, payload)
    }
    fmt.Printf("Protobuf+schemaregistry: Serialized %d messages in %v\n", nMessages, time.Since(start))
}
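
To compare both paths on equal terms, the timing loop above could be factored into a small helper that also counts heap allocations, since allocation pressure is one plausible (but unconfirmed) source of the gap. The following is a minimal sketch using only the standard library; `measure` and `result` are hypothetical names, and the workload in `main` is a stand-in that would be replaced by `serializer.Serialize` or `proto.Marshal`.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// result holds the measurements for one serialization strategy.
type result struct {
	Total        time.Duration // wall-clock time for all messages
	PerMessage   time.Duration // average time per message
	AllocsPerMsg float64       // average heap allocations per message
}

// measure calls serialize n times and records wall time and heap allocations.
func measure(n int, serialize func(i int) []byte) result {
	var before, after runtime.MemStats
	runtime.GC() // start from a clean heap so earlier garbage does not skew timing
	runtime.ReadMemStats(&before)

	start := time.Now()
	sink := 0
	for i := 0; i < n; i++ {
		// Use the payload length so the call cannot be optimized away.
		sink += len(serialize(i))
	}
	total := time.Since(start)
	runtime.ReadMemStats(&after)
	_ = sink

	return result{
		Total:        total,
		PerMessage:   total / time.Duration(n),
		AllocsPerMsg: float64(after.Mallocs-before.Mallocs) / float64(n),
	}
}

func main() {
	// Stand-in workload (assumption): swap in the real serializer call here.
	r := measure(1_000_000, func(i int) []byte {
		return []byte{byte(i), byte(i >> 8)}
	})
	fmt.Printf("total=%v perMessage=%v allocs/msg=%.2f\n",
		r.Total, r.PerMessage, r.AllocsPerMsg)
}
```

Running the same helper once with the schema-registry serializer and once with pure protobuf would show whether the two differ mainly in allocations per message or in CPU time.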

Checklist

Please provide the following information: