apache / pulsar-client-go

Apache Pulsar Go Client Library
https://pulsar.apache.org/
Apache License 2.0
659 stars 336 forks source link

pulsar_client_consumer_processing_time_seconds metric records incorrect values #1154

Open ojcm opened 10 months ago

ojcm commented 10 months ago

Expected behavior

The pulsar_client_consumer_processing_time_seconds metric should contain observations of the time it takes a consumer to process messages.

Actual behavior

The observations are wrong. They have values of approximately 8.5e+09 (~269 years).

Steps to reproduce

Create a consumer, ack some messages, check the metrics. e.g.

func main() {
    http.Handle("/metrics", promhttp.Handler())
    go http.ListenAndServe(":9090", nil)

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:            URL,
        Authentication: pulsar.NewAuthenticationToken(TOKEN),
    })
    if err != nil {
        panic(err)
    }
    defer client.Close()

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            TOPIC,
        Name:             "test-consumer",
        SubscriptionName: "test-subscription",
    })
    if err != nil {
        panic(err)
    }

    msg, err := consumer.Receive(context.Background())
    if err != nil {
        panic(err)
    }

    err = consumer.Ack(msg)
    if err != nil {
        panic(err)
    }

    time.Sleep(time.Hour)
}

produces

# HELP pulsar_client_consumer_processing_time_seconds Time it takes for application to process messages
# TYPE pulsar_client_consumer_processing_time_seconds histogram
pulsar_client_consumer_processing_time_seconds_bucket{client="go",pulsar_namespace="HIDDEN",pulsar_tenant="HIDDEN",le="0.0005"} 0
pulsar_client_consumer_processing_time_seconds_bucket{client="go",pulsar_namespace="HIDDEN",pulsar_tenant="HIDDEN",le="0.001"} 0
pulsar_client_consumer_processing_time_seconds_bucket{client="go",pulsar_namespace="HIDDEN",pulsar_tenant="HIDDEN",le="0.005"} 0
pulsar_client_consumer_processing_time_seconds_bucket{client="go",pulsar_namespace="HIDDEN",pulsar_tenant="HIDDEN",le="0.01"} 0
pulsar_client_consumer_processing_time_seconds_bucket{client="go",pulsar_namespace="HIDDEN",pulsar_tenant="HIDDEN",le="0.025"} 0
pulsar_client_consumer_processing_time_seconds_bucket{client="go",pulsar_namespace="HIDDEN",pulsar_tenant="HIDDEN",le="0.05"} 0
pulsar_client_consumer_processing_time_seconds_bucket{client="go",pulsar_namespace="HIDDEN",pulsar_tenant="HIDDEN",le="0.1"} 0
pulsar_client_consumer_processing_time_seconds_bucket{client="go",pulsar_namespace="HIDDEN",pulsar_tenant="HIDDEN",le="0.25"} 0
pulsar_client_consumer_processing_time_seconds_bucket{client="go",pulsar_namespace="HIDDEN",pulsar_tenant="HIDDEN",le="0.5"} 0
pulsar_client_consumer_processing_time_seconds_bucket{client="go",pulsar_namespace="HIDDEN",pulsar_tenant="HIDDEN",le="1"} 0
pulsar_client_consumer_processing_time_seconds_bucket{client="go",pulsar_namespace="HIDDEN",pulsar_tenant="HIDDEN",le="2.5"} 0
pulsar_client_consumer_processing_time_seconds_bucket{client="go",pulsar_namespace="HIDDEN",pulsar_tenant="HIDDEN",le="5"} 0
pulsar_client_consumer_processing_time_seconds_bucket{client="go",pulsar_namespace="HIDDEN",pulsar_tenant="HIDDEN",le="10"} 0
pulsar_client_consumer_processing_time_seconds_bucket{client="go",pulsar_namespace="HIDDEN",pulsar_tenant="HIDDEN",le="+Inf"} 1
pulsar_client_consumer_processing_time_seconds_sum{client="go",pulsar_namespace="HIDDEN",pulsar_tenant="HIDDEN"} 8.500242974069234e+09
pulsar_client_consumer_processing_time_seconds_count{client="go",pulsar_namespace="HIDDEN",pulsar_tenant="HIDDEN"} 

System configuration

Pulsar version: 2.10 Pulsar client version:v0.11.1

treuherz commented 10 months ago

It seems like the code around tracking message IDs has evolved in such a way to completely break this metric. In both places this metric is observed (1, 2) it happens immediately after initialisation of a trackingMessageID without setting the receivedTime field. This has two obvious problems.

Firstly, the observation is relying on a field that isn't set, so the operation to get time since the receivedtime will always produce nonsense values. Secondly, because the struct owning that field was only just created, even if it had been set the value would be useless.

I would be happy to raise a PR to fix this, but the intention of the existing code doesn't make any sense to me. Some guidance from maintainers on how this is supposed to work would be really appreciated.