open-telemetry / opentelemetry-go

OpenTelemetry Go API and SDK
https://opentelemetry.io/docs/languages/go
Apache License 2.0
5.18k stars 1.04k forks source link

AsyncCounter and AsyncUpDownCounter expected callback value changed in v0.32.0 #3278

Closed vmihailenco closed 1 year ago

vmihailenco commented 1 year ago

Before 0.32, the value in a callback function was expected to be an absolute value, but since 0.32 it must be a delta or metrics are incorrect:

// Create the asynchronous counter; the error returned by Counter is discarded here.
hitsCounter, _ := meter.AsyncInt64().Counter("some.prefix.cache_hits")

// before 0.32
hitsCounter.Observe(ctx, hitsValue) // abs value

// 0.32
hitsCounter.Observe(ctx, hitsValue-prevHitsValue) // delta value

Is that intentional? It is not documented anywhere and not mentioned in the changelog. Also, runtimemetrics still uses the old way to report observable values.

MrAlias commented 1 year ago

The new SDK follows the OpenTelemetry specification in defining the default temporality to be cumulative for all instrument kinds. Setting a TemporalitySelector using WithTemporalitySelector that defines delta temporality for these instrument types should result in the exact value being reported.

I'm still looking through the specification and other implementations to see if this is the expected behavior.

vmihailenco commented 1 year ago

It is not about TemporalitySelector - both temporalities are broken if you report/observe absolute values. And both temporalities work if you report/observe delta values.

For example, the following app reports 3 6 9 12 etc, but it worked previously:

package main

import (
    "context"
    "os"
    "os/signal"
    "syscall"
    "time"

    "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
    "go.opentelemetry.io/otel/metric/global"
    "go.opentelemetry.io/otel/metric/instrument"
    "go.opentelemetry.io/otel/sdk/metric"
)

// meter is obtained from the global MeterProvider during package
// initialization, i.e. before main calls global.SetMeterProvider.
// NOTE(review): presumably the global provider forwards to the SDK provider
// once it is installed — confirm against the global package documentation.
var meter = global.MeterProvider().Meter("app_or_package_name")

// main creates a stdout metric exporter, attaches it to a periodic reader
// configured with a one-second interval, installs the resulting provider as
// the global MeterProvider, starts upDownCounterObserver in a goroutine, and
// then blocks until SIGINT, SIGQUIT, or SIGTERM is received. It panics if the
// exporter cannot be created.
func main() {
    exporter, err := stdoutmetric.New()
    if err != nil {
        panic(err)
    }

    periodic := metric.NewPeriodicReader(exporter, metric.WithInterval(time.Second))
    global.SetMeterProvider(metric.NewMeterProvider(metric.WithReader(periodic)))

    go upDownCounterObserver(context.Background())

    // Park the main goroutine until one of the termination signals arrives.
    sigs := make(chan os.Signal, 3)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
    <-sigs
}

// upDownCounterObserver creates the asynchronous up-down counter
// "some.prefix.up_down_counter_async" on the package-level meter and
// registers a callback that observes the constant value 3 on every
// invocation. It panics if creating the instrument or registering the
// callback fails.
func upDownCounterObserver(ctx context.Context) {
    const name = "some.prefix.up_down_counter_async"

    udc, err := meter.AsyncInt64().UpDownCounter(name)
    if err != nil {
        panic(err)
    }

    // The callback always reports the same value.
    observe := func(cbCtx context.Context) {
        udc.Observe(cbCtx, 3)
    }

    err = meter.RegisterCallback([]instrument.Asynchronous{udc}, observe)
    if err != nil {
        panic(err)
    }
}
MrAlias commented 1 year ago

Setting the temporality selector like mentioned does indeed produce your requested behavior:

package main

import (
    "context"
    "os"
    "os/signal"
    "syscall"
    "time"

    "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
    "go.opentelemetry.io/otel/metric/global"
    "go.opentelemetry.io/otel/metric/instrument"
    "go.opentelemetry.io/otel/sdk/metric"
    "go.opentelemetry.io/otel/sdk/metric/metricdata"
    "go.opentelemetry.io/otel/sdk/metric/view"
)

// meter is obtained from the global MeterProvider during package
// initialization, i.e. before main calls global.SetMeterProvider.
// NOTE(review): presumably the global provider forwards to the SDK provider
// once it is installed — confirm against the global package documentation.
var meter = global.MeterProvider().Meter("app_or_package_name")

// delta is a temporality selector that returns delta temporality for every
// instrument kind; the kind argument is ignored.
func delta(view.InstrumentKind) metricdata.Temporality {
    return metricdata.DeltaTemporality
}

// main creates a stdout metric exporter, attaches it to a periodic reader
// configured with a one-second interval and the delta temporality selector,
// installs the resulting provider as the global MeterProvider, starts
// upDownCounterObserver in a goroutine, and then blocks until SIGINT,
// SIGQUIT, or SIGTERM is received. It panics if the exporter cannot be
// created.
func main() {
    exporter, err := stdoutmetric.New()
    if err != nil {
        panic(err)
    }

    periodic := metric.NewPeriodicReader(exporter,
        metric.WithInterval(time.Second),
        metric.WithTemporalitySelector(delta),
    )
    global.SetMeterProvider(metric.NewMeterProvider(metric.WithReader(periodic)))

    go upDownCounterObserver(context.Background())

    // Park the main goroutine until one of the termination signals arrives.
    sigs := make(chan os.Signal, 3)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
    <-sigs
}

// upDownCounterObserver creates the asynchronous up-down counter
// "some.prefix.up_down_counter_async" on the package-level meter and
// registers a callback that observes the constant value 3 on every
// invocation. It panics if creating the instrument or registering the
// callback fails.
func upDownCounterObserver(ctx context.Context) {
    const name = "some.prefix.up_down_counter_async"

    udc, err := meter.AsyncInt64().UpDownCounter(name)
    if err != nil {
        panic(err)
    }

    // The callback always reports the same value.
    observe := func(cbCtx context.Context) {
        udc.Observe(cbCtx, 3)
    }

    err = meter.RegisterCallback([]instrument.Asynchronous{udc}, observe)
    if err != nil {
        panic(err)
    }
}
go 1.18

require (
    go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.32.3
    go.opentelemetry.io/otel/metric v0.32.3
    go.opentelemetry.io/otel/sdk/metric v0.32.3
)

require (
    github.com/davecgh/go-spew v1.1.1 // indirect
    github.com/go-logr/logr v1.2.3 // indirect
    github.com/go-logr/stdr v1.2.2 // indirect
    go.opentelemetry.io/otel v1.11.0 // indirect
    go.opentelemetry.io/otel/sdk v1.11.0 // indirect
    go.opentelemetry.io/otel/trace v1.11.0 // indirect
    golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect
)
$ go run .
{
    "Resource": [
        {
            "Key": "service.name",
            "Value": {
                "Type": "STRING",
                "Value": "unknown_service:prometheus"
            }
        },
        {
            "Key": "telemetry.sdk.language",
            "Value": {
                "Type": "STRING",
                "Value": "go"
            }
        },
        {
            "Key": "telemetry.sdk.name",
            "Value": {
                "Type": "STRING",
                "Value": "opentelemetry"
            }
        },
        {
            "Key": "telemetry.sdk.version",
            "Value": {
                "Type": "STRING",
                "Value": "1.11.0"
            }
        }
    ],
    "ScopeMetrics": [
        {
            "Scope": {
                "Name": "app_or_package_name",
                "Version": "",
                "SchemaURL": ""
            },
            "Metrics": [
                {
                    "Name": "some.prefix.up_down_counter_async",
                    "Description": "",
                    "Unit": "",
                    "Data": {
                        "DataPoints": [
                            {
                                "Attributes": [],
                                "StartTime": "2022-10-15T08:14:58.786370919-07:00",
                                "Time": "2022-10-15T08:14:59.786460097-07:00",
                                "Value": 3
                            }
                        ],
                        "Temporality": "DeltaTemporality",
                        "IsMonotonic": false
                    }
                }
            ]
        }
    ]
}
{
    "Resource": [
        {
            "Key": "service.name",
            "Value": {
                "Type": "STRING",
                "Value": "unknown_service:prometheus"
            }
        },
        {
            "Key": "telemetry.sdk.language",
            "Value": {
                "Type": "STRING",
                "Value": "go"
            }
        },
        {
            "Key": "telemetry.sdk.name",
            "Value": {
                "Type": "STRING",
                "Value": "opentelemetry"
            }
        },
        {
            "Key": "telemetry.sdk.version",
            "Value": {
                "Type": "STRING",
                "Value": "1.11.0"
            }
        }
    ],
    "ScopeMetrics": [
        {
            "Scope": {
                "Name": "app_or_package_name",
                "Version": "",
                "SchemaURL": ""
            },
            "Metrics": [
                {
                    "Name": "some.prefix.up_down_counter_async",
                    "Description": "",
                    "Unit": "",
                    "Data": {
                        "DataPoints": [
                            {
                                "Attributes": [],
                                "StartTime": "2022-10-15T08:14:59.786460097-07:00",
                                "Time": "2022-10-15T08:15:00.786644182-07:00",
                                "Value": 3
                            }
                        ],
                        "Temporality": "DeltaTemporality",
                        "IsMonotonic": false
                    }
                }
            ]
        }
    ]
}
^C
vmihailenco commented 1 year ago

Setting the temporality selector like mentioned does indeed produce your requested behavior:

Well, if you mean that is how metricdata.CumulativeTemporality should behave, then yes. But you've selected metricdata.DeltaTemporality and it is not working correctly, because the counter is not changing at all. In other words, using DeltaTemporality does not help.

And it is not like I am requesting a new behavior. That is how it worked until v0.32.0. And right now async counters are broken.

jmacd commented 1 year ago

In https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#asynchronous-counter-creation,

Note: Unlike Counter.Add() which takes the increment/delta value, the callback function reports the absolute value of the counter. To determine the reported rate the counter is changing, the difference between successive measurements is used.

The v0.31 and prior SDK were correct. @MrAlias this has the appearance of a bug -- where the asynchronous form of aggregator should start from scratch (i.e., 0) on each collection round so that the cumulative output matches the instrumented value (if filtering, the sum of instrumented values) and the delta output is the difference as described in the API spec. Looking for your input on how the SDK specification could be improved to help clarify this, thanks.

MrAlias commented 1 year ago

In https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#asynchronous-counter-creation,

Note: Unlike Counter.Add() which takes the increment/delta value, the callback function reports the absolute value of the counter. To determine the reported rate the counter is changing, the difference between successive measurements is used.

The v0.31 and prior SDK were correct. @MrAlias this has the appearance of a bug -- where the asynchronous form of aggregator should start from scratch (i.e., 0) on each collection round so that the cumulative output matches the instrumented value (if filtering, the sum of instrumented values) and the delta output is the difference as described in the API spec. Looking for your input on how the SDK specification could be improved to help clarify this, thanks.

Thanks! This is the thing I was looking for.

If we wanted to make this clearer in the specification, I was initially looking for this in the Asynchronous Counter operations section. Having this defined there would be helpful, but I don't think it's critical.

vmihailenco commented 1 year ago

@MrAlias this issue was only fixed for cumulative temporality. Delta is still broken.

got: 3 3 3 3 3
expected: 3 0 0 0 0 0
package main

import (
    "context"
    "os"
    "os/signal"
    "syscall"
    "time"

    "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
    "go.opentelemetry.io/otel/metric/global"
    "go.opentelemetry.io/otel/metric/instrument"
    "go.opentelemetry.io/otel/sdk/metric"
    "go.opentelemetry.io/otel/sdk/metric/metricdata"
    "go.opentelemetry.io/otel/sdk/metric/view"
)

// meter is obtained from the global MeterProvider during package
// initialization, i.e. before main calls global.SetMeterProvider.
// NOTE(review): presumably the global provider forwards to the SDK provider
// once it is installed — confirm against the global package documentation.
var meter = global.MeterProvider().Meter("app_or_package_name")

// main creates a stdout metric exporter, attaches it to a periodic reader
// configured with a one-second interval and statelessTemporalitySelector,
// installs the resulting provider as the global MeterProvider, starts
// upDownCounterObserver in a goroutine, and then blocks until SIGINT,
// SIGQUIT, or SIGTERM is received. It panics if the exporter cannot be
// created.
func main() {
    exporter, err := stdoutmetric.New()
    if err != nil {
        panic(err)
    }

    periodic := metric.NewPeriodicReader(exporter,
        metric.WithInterval(time.Second),
        metric.WithTemporalitySelector(statelessTemporalitySelector),
    )
    global.SetMeterProvider(metric.NewMeterProvider(metric.WithReader(periodic)))

    go upDownCounterObserver(context.Background())

    // Park the main goroutine until one of the termination signals arrives.
    sigs := make(chan os.Signal, 3)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
    <-sigs
}

// upDownCounterObserver creates the asynchronous up-down counter
// "some.prefix.up_down_counter_async" on the package-level meter and
// registers a callback that observes the constant value 3 on every
// invocation. It panics if creating the instrument or registering the
// callback fails.
func upDownCounterObserver(ctx context.Context) {
    const name = "some.prefix.up_down_counter_async"

    udc, err := meter.AsyncInt64().UpDownCounter(name)
    if err != nil {
        panic(err)
    }

    // The callback always reports the same value.
    observe := func(cbCtx context.Context) {
        udc.Observe(cbCtx, 3)
    }

    err = meter.RegisterCallback([]instrument.Asynchronous{udc}, observe)
    if err != nil {
        panic(err)
    }
}

// statelessTemporalitySelector returns delta temporality for every
// instrument kind; the kind argument is not consulted.
func statelessTemporalitySelector(kind view.InstrumentKind) metricdata.Temporality {
    return metricdata.DeltaTemporality
}
jmacd commented 1 year ago

@vmihailenco This code snippet:

// statelessTemporalitySelector returns delta temporality for every
// instrument kind; the kind argument is not consulted.
func statelessTemporalitySelector(kind view.InstrumentKind) metricdata.Temporality {
    return metricdata.DeltaTemporality
}

confuses me because the implementation proposed for "stateless" in the Lightstep metrics SDK (and the otel specification) is that "stateless" means delta for synchronous instruments while it means cumulative for asynchronous instruments. Otherwise, using delta temporality for an asynchronous instrument requires state in the SDK. (Note: I personally argued that supporting delta temporality for asynchronous counters is unnecessary complexity, but several vendors (New Relic and DynaTrace) convinced us otherwise.)

That said, I think your expectation is correct: configured for delta temporality, an asynchronous counter that reports a constant value should appear as a sequence like [3 0 0 0 0] as you suggest.

MrAlias commented 1 year ago

@MrAlias this issue was only fixed for cumulative temporality. Delta is still broken.

got: 3 3 3 3 3
expected: 3 0 0 0 0 0
package main

import (
  "context"
  "os"
  "os/signal"
  "syscall"
  "time"

  "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
  "go.opentelemetry.io/otel/metric/global"
  "go.opentelemetry.io/otel/metric/instrument"
  "go.opentelemetry.io/otel/sdk/metric"
  "go.opentelemetry.io/otel/sdk/metric/metricdata"
  "go.opentelemetry.io/otel/sdk/metric/view"
)

// meter is obtained from the global MeterProvider during package
// initialization, i.e. before main calls global.SetMeterProvider.
// NOTE(review): presumably the global provider forwards to the SDK provider
// once it is installed — confirm against the global package documentation.
var meter = global.MeterProvider().Meter("app_or_package_name")

// main creates a stdout metric exporter, attaches it to a periodic reader
// configured with a one-second interval and statelessTemporalitySelector,
// installs the resulting provider as the global MeterProvider, starts
// upDownCounterObserver in a goroutine, and then blocks until SIGINT,
// SIGQUIT, or SIGTERM is received. It panics if the exporter cannot be
// created.
func main() {
  exporter, err := stdoutmetric.New()
  if err != nil {
    panic(err)
  }

  periodic := metric.NewPeriodicReader(exporter,
    metric.WithInterval(time.Second),
    metric.WithTemporalitySelector(statelessTemporalitySelector),
  )
  global.SetMeterProvider(metric.NewMeterProvider(metric.WithReader(periodic)))

  go upDownCounterObserver(context.Background())

  // Park the main goroutine until one of the termination signals arrives.
  sigs := make(chan os.Signal, 3)
  signal.Notify(sigs, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
  <-sigs
}

// upDownCounterObserver creates the asynchronous up-down counter
// "some.prefix.up_down_counter_async" on the package-level meter and
// registers a callback that observes the constant value 3 on every
// invocation. It panics if creating the instrument or registering the
// callback fails.
func upDownCounterObserver(ctx context.Context) {
  const name = "some.prefix.up_down_counter_async"

  udc, err := meter.AsyncInt64().UpDownCounter(name)
  if err != nil {
    panic(err)
  }

  // The callback always reports the same value.
  observe := func(cbCtx context.Context) {
    udc.Observe(cbCtx, 3)
  }

  err = meter.RegisterCallback([]instrument.Asynchronous{udc}, observe)
  if err != nil {
    panic(err)
  }
}

// statelessTemporalitySelector returns delta temporality for every
// instrument kind; the kind argument is not consulted.
func statelessTemporalitySelector(kind view.InstrumentKind) metricdata.Temporality {
  return metricdata.DeltaTemporality
}

Your expectation does not match my understanding of the specification. Each cycle your callback is reporting 3. If you expect to observe 3 0 0 0 0 0 you would need to observe 3 and then 0 repeatedly. That would comply with the quoted specification above:

Unlike Counter.Add() which takes the increment/delta value, the callback function reports the absolute value of the counter.

jmacd commented 1 year ago

Unlike Counter.Add() which takes the increment/delta value, the callback function reports the absolute value of the counter.

The word "absolute" is potentially misleading--it's been debated in the past and we left that word in place, but other words we considered are "current" and "cumulative" and "total". Another way to describe this is that the callback reports the value that a cumulative-temporality reader will see. For asynchronous counters, a delta temporality reader should see the difference between successive "absolute" values.

MrAlias commented 1 year ago

Unlike Counter.Add() which takes the increment/delta value, the callback function reports the absolute value of the counter.

The word "absolute" is potentially misleading--it's been debated in the past and we left that word in place, but other words we considered are "current" and "cumulative" and "total". Another way to describe this is that the callback reports the value that a cumulative-temporality reader will see. For asynchronous counters, a delta temporality reader should see the difference between successive "absolute" values.

Ah, gotcha! I think I see now.

I had understood it as the user will tell you the temporality of the value that is provided. However, it is that the instrumentation will always record a cumulative value and if a user overrides the temporality to be delta the exported data needs to be converted to delta.

I'll look into a PR to address this.

Also, I'm going to take another look at the other implementation. I think I either misinterpreted their code or they are doing what is implemented by #3350.

jmacd commented 1 year ago

Confusingly, 1000 PRs prior I removed the Subtract support from the prototype metrics SDK,

https://github.com/open-telemetry/opentelemetry-go/pull/2350

I did that because I thought the specification would not require cumulative-to-delta translation; however the specification reversed itself on that topic. In my newer implementation, there is a statefulAsyncInstrument type that performs cumulative-to-delta on asynchronous measurements. (I truly wish we didn't have this requirement!)

MrAlias commented 1 year ago

instrumentation will always record a cumulative value and if a user overrides the temporality to be delta the exported data needs to be converted to delta.

@jmacd Is this also the way Async*UpDownCounters are intended to interpret their input?

jmacd commented 1 year ago

Yes, that's correct. @MrAlias I'd like your help pinpointing where the specification can improve--although now that we're on the topic, when faced with questions in this territory @reyang responded with supplementary guidelines. Here's a section on the asynchronous counter, delta temporality case: https://opentelemetry.io/docs/reference/specification/metrics/supplementary-guidelines/#asynchronous-example-delta-temporality

MrAlias commented 1 year ago

I'd like your help pinpointing where the specification can improve

I think having a normative statement defining the required behavior in the specification would be helpful.

For example, in the SDK definition of an asynchronous counter, having a section on the Observe functionality with something like:

The passed value MUST be interpreted as a cumulative sum.

If the instrument aggregation is determined to be Drop, all values MUST be dropped.

If the instrument aggregation is determined to be Sum, all values MUST be aggregated as sums.

All filters from views that match the instrument and have an attribute filter defined MUST be applied to the passed attributes. Each unique set of attributes produced by applying a filter MUST have the passed value aggregated for it. If no filters apply, the passed value MUST be aggregated for the passed attributes themselves.

Aggregated values for instruments with cumulative temporality MUST produce cumulative sum values. This means, based on the passed value being interpreted as a cumulative value, passed values need to be reported directly for all the applicable attribute sets.

Aggregated values for instruments with delta temporality MUST produce delta sum values. This means, based on the passed value being interpreted as a cumulative value, the previously reported values need to be subtracted from the currently aggregated values for all the applicable attribute sets.

The top-level definition should also mention that the asynchronous counter types have a restriction on the aggregations they support. Possibly:

The SDK MUST NOT configure an aggregation other than Sum or Drop for an asynchronous {,UpDown}Counter.

reyang commented 1 year ago

There is a note in https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#asynchronous-counter

"Note: Unlike Counter.Add() which takes the increment/delta value, the callback function reports the absolute value of the counter. To determine the reported rate the counter is changing, the difference between successive measurements is used."

I feel "the passed value MUST be interpreted as a cumulative sum" is asking too much, since one could decide to interpret it as Gauge (which is not cumulative) as well.

vmihailenco commented 1 year ago

is that "stateless" means delta for synchronous instruments while it means cumulative for asynchronous instruments

For Uptrace/ClickHouse I need delta temp whenever possible, but I guess I should just avoid using stateless name. Here is what we plan to use:

// temporalitySelector returns delta temporality for synchronous counters,
// asynchronous counters, and synchronous histograms, and cumulative
// temporality for every other instrument kind.
func temporalitySelector(kind view.InstrumentKind) metricdata.Temporality {
    if kind == view.SyncCounter || kind == view.AsyncCounter || kind == view.SyncHistogram {
        return metricdata.DeltaTemporality
    }
    return metricdata.CumulativeTemporality
}

In Python, they have preferred_temporality=delta/cumulative which I think does exactly that and I like the name/API. Perhaps Go could have the same?

MrAlias commented 1 year ago

@MrAlias this issue was only fixed for cumulative temporality. Delta is still broken.

got: 3 3 3 3 3
expected: 3 0 0 0 0 0
package main

import (
  "context"
  "os"
  "os/signal"
  "syscall"
  "time"

  "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
  "go.opentelemetry.io/otel/metric/global"
  "go.opentelemetry.io/otel/metric/instrument"
  "go.opentelemetry.io/otel/sdk/metric"
  "go.opentelemetry.io/otel/sdk/metric/metricdata"
  "go.opentelemetry.io/otel/sdk/metric/view"
)

// meter is obtained from the global MeterProvider during package
// initialization, i.e. before main calls global.SetMeterProvider.
// NOTE(review): presumably the global provider forwards to the SDK provider
// once it is installed — confirm against the global package documentation.
var meter = global.MeterProvider().Meter("app_or_package_name")

// main creates a stdout metric exporter, attaches it to a periodic reader
// configured with a one-second interval and statelessTemporalitySelector,
// installs the resulting provider as the global MeterProvider, starts
// upDownCounterObserver in a goroutine, and then blocks until SIGINT,
// SIGQUIT, or SIGTERM is received. It panics if the exporter cannot be
// created.
func main() {
  exporter, err := stdoutmetric.New()
  if err != nil {
    panic(err)
  }

  periodic := metric.NewPeriodicReader(exporter,
    metric.WithInterval(time.Second),
    metric.WithTemporalitySelector(statelessTemporalitySelector),
  )
  global.SetMeterProvider(metric.NewMeterProvider(metric.WithReader(periodic)))

  go upDownCounterObserver(context.Background())

  // Park the main goroutine until one of the termination signals arrives.
  sigs := make(chan os.Signal, 3)
  signal.Notify(sigs, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
  <-sigs
}

// upDownCounterObserver creates the asynchronous up-down counter
// "some.prefix.up_down_counter_async" on the package-level meter and
// registers a callback that observes the constant value 3 on every
// invocation. It panics if creating the instrument or registering the
// callback fails.
func upDownCounterObserver(ctx context.Context) {
  const name = "some.prefix.up_down_counter_async"

  udc, err := meter.AsyncInt64().UpDownCounter(name)
  if err != nil {
    panic(err)
  }

  // The callback always reports the same value.
  observe := func(cbCtx context.Context) {
    udc.Observe(cbCtx, 3)
  }

  err = meter.RegisterCallback([]instrument.Asynchronous{udc}, observe)
  if err != nil {
    panic(err)
  }
}

// statelessTemporalitySelector returns delta temporality for every
// instrument kind; the kind argument is not consulted.
func statelessTemporalitySelector(kind view.InstrumentKind) metricdata.Temporality {
  return metricdata.DeltaTemporality
}

I have verified #3398 to produce the expected output