streamnative / pulsar-client-go

Apache Pulsar Go Client Library
https://pulsar.apache.org/
Apache License 2.0
1 stars 2 forks source link

ISSUE-621: Ordering Key is always empty when consuming #224

Open sijie opened 3 years ago

sijie commented 3 years ago

Original Issue: apache/pulsar-client-go#621


Expected behavior

After producing a message with an ordering key, the consumer should receive that message with the same ordering key set.

Actual behavior

// Excerpt from the consumer loop of the reproduction program below (err is
// checked there). The "order: %s" field is filled from msg.OrderingKey().
msg, err := consumer.Receive(context.Background())
fmt.Printf("Received message msgId: %#v -- content: '%s', topic: '%s', event time: %d, key: %s, order: %s, props: %v\n",
                msg.ID(), string(msg.Payload()), msg.Topic(), msg.EventTime().Unix(), msg.Key(), msg.OrderingKey(), msg.Properties())

msg.OrderingKey() returns an empty string.

Steps to reproduce

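Use the following program to reproduce the issue. Run it with `produce` as the first argument to publish a message, or with any other argument to consume: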

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/apache/pulsar-client-go/pulsar"
    // "github.com/sirupsen/logrus"
)

// main is the entry point of the reproduction program.
//
// Usage:
//
//     <program> produce    publish one message with Key and OrderingKey set
//     <program> <other>    subscribe and print ten received messages
//
// All work happens in run so that deferred Close calls execute before the
// process exits; log.Fatal terminates the process without running defers.
func main() {
    // logrus.SetLevel(logrus.TraceLevel)

    // Indexing os.Args[1] without this check panics when no argument is given.
    if len(os.Args) < 2 {
        log.Fatalf("usage: %s produce|consume", os.Args[0])
    }

    if err := run(os.Args[1]); err != nil {
        log.Fatal(err)
    }
}

// run creates the Pulsar client and executes the requested mode. The mode
// "produce" publishes a message; any other value consumes. The client is
// closed before run returns, on both success and failure.
func run(mode string) error {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:               "pulsar://localhost:6650",
        OperationTimeout:  30 * time.Second,
        ConnectionTimeout: 30 * time.Second,
    })
    if err != nil {
        // Wording kept from the original fatal log message.
        return fmt.Errorf("Could not instantiate Pulsar client: %w", err)
    }
    defer client.Close()

    if mode == "produce" {
        return produce(client)
    }
    return consume(client)
}

// produce publishes a single message carrying both a Key and an OrderingKey
// to the topic "123.json".
func produce(client pulsar.Client) error {
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "123.json",
    })
    // Check the creation error before using the producer; previously it was
    // overwritten by the result of Send.
    if err != nil {
        return fmt.Errorf("creating producer: %w", err)
    }
    // Register the close right after successful creation so it always runs.
    defer producer.Close()

    _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
        Payload:     []byte("hello"),
        Key:         "123xxxxx",
        OrderingKey: "321xxxxx",
    })
    if err != nil {
        // Reported on stdout as before, but no longer followed by a
        // misleading "Published message" line.
        fmt.Println("Failed to publish message", err)
        return nil
    }

    fmt.Println("Published message")
    return nil
}

// consume subscribes to "123.json" with the shared subscription "test",
// prints and acknowledges ten messages, then unsubscribes.
func consume(client pulsar.Client) error {
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "123.json",
        SubscriptionName: "test",
        Type:             pulsar.Shared,
    })
    if err != nil {
        return err
    }
    defer consumer.Close()

    for i := 0; i < 10; i++ {
        msg, err := consumer.Receive(context.Background())
        if err != nil {
            return err
        }

        fmt.Printf("Received message msgId: %#v -- content: '%s', topic: '%s', event time: %d, key: %s, order: %s, props: %v\n",
            msg.ID(), string(msg.Payload()), msg.Topic(), msg.EventTime().Unix(), msg.Key(), msg.OrderingKey(), msg.Properties())

        consumer.Ack(msg)
    }

    return consumer.Unsubscribe()
}

System configuration

Pulsar version: 2.7.1

Other clients are able to consume the ordering key properly.