open-telemetry / opentelemetry-python

OpenTelemetry Python API and SDK
https://opentelemetry.io
Apache License 2.0

Support multi-instrument observable callbacks and ability to unregister callbacks #2454

Open aabmass opened 2 years ago

aabmass commented 2 years ago

See spec issues https://github.com/open-telemetry/opentelemetry-specification/issues/2280 and https://github.com/open-telemetry/opentelemetry-specification/issues/2232, and PR https://github.com/open-telemetry/opentelemetry-specification/pull/2317, which adds this to the spec.

Our API for creating observable instruments currently looks like this:

def cpu_time_callback() -> Iterable[Measurement]:
    with open("/proc/stat") as procstat:
        procstat.readline()  # skip the first line
        for line in procstat:
            if not line.startswith("cpu"): break
            cpu, *states = line.split()
            yield Measurement(int(states[0]) // 100, {"cpu": cpu, "state": "user"})
            yield Measurement(int(states[1]) // 100, {"cpu": cpu, "state": "nice"})

meter.create_observable_counter("system.cpu.time", cpu_time_callback)

To support multi-instrument callbacks, we have a few possibilities:

Approach 1

https://github.com/open-telemetry/opentelemetry-specification/pull/2317#discussion_r801936529 suggested this as a possible way to implement multi-instrument callbacks given our current API:

cpuTime = meter.create_observable_counter("system.cpu.time")
procsRunning = meter.create_observable_up_down_counter("system.procs_running")
procsBlocked = meter.create_observable_up_down_counter("system.procs_blocked")

def proc_stat_observer() -> Iterable[Measurement]:
    with open("/proc/stat") as procstat:
        procstat.readline()  # skip the first line
        for line in procstat:
            if line.startswith("cpu"):
                cpu, *states = line.split()
                yield cpuTime.observe(int(states[0]) // 100, {"cpu": cpu, "state": "user"})
                yield cpuTime.observe(int(states[1]) // 100, {"cpu": cpu, "state": "nice"})
            if line.startswith("procs_running"):
                _, value = line.split()
                yield procsRunning.observe(int(value))  # split() returns strings
            if line.startswith("procs_blocked"):
                _, value = line.split()
                yield procsBlocked.observe(int(value))

callback = meter.register_callback(proc_stat_observer, [cpuTime, procsRunning, procsBlocked])

def stop():
    callback.unregister()
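To make the moving parts of Approach 1 concrete, here is a minimal stdlib-only sketch. It is not the OpenTelemetry SDK: `Meter`, `Registration`, `ObservableInstrument`, `Measurement` and `collect()` below are hypothetical stand-ins, and dropping measurements for undeclared instruments is an assumption about how an SDK might behave.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, List, Optional, Tuple


@dataclass(frozen=True)
class ObservableInstrument:
    """Hypothetical instrument; observe() only builds a Measurement."""
    name: str

    def observe(self, value: int, attributes: Optional[Dict[str, str]] = None) -> "Measurement":
        # Nothing is recorded here; the callback must yield the result.
        return Measurement(self, value, tuple(sorted((attributes or {}).items())))


@dataclass(frozen=True)
class Measurement:
    instrument: ObservableInstrument
    value: int
    attributes: Tuple[Tuple[str, str], ...] = ()


class Registration:
    """Handle returned by register_callback(); unregister() is idempotent."""

    def __init__(self, meter: "Meter", callback: Callable[[], Iterable[Measurement]],
                 instruments: Iterable[ObservableInstrument]) -> None:
        self._meter = meter
        self.callback = callback
        self.instruments = frozenset(instruments)

    def unregister(self) -> None:
        if self in self._meter._registrations:
            self._meter._registrations.remove(self)


class Meter:
    def __init__(self) -> None:
        self._registrations: List[Registration] = []

    def create_observable_counter(self, name: str) -> ObservableInstrument:
        return ObservableInstrument(name)

    def register_callback(self, callback, instruments) -> Registration:
        registration = Registration(self, callback, instruments)
        self._registrations.append(registration)
        return registration

    def collect(self) -> Dict[str, List[Measurement]]:
        """Run every registered callback once and group results by instrument name."""
        collected: Dict[str, List[Measurement]] = {}
        for registration in list(self._registrations):
            for measurement in registration.callback():
                # Assumption: measurements for undeclared instruments are dropped.
                if measurement.instrument in registration.instruments:
                    collected.setdefault(measurement.instrument.name, []).append(measurement)
        return collected


meter = Meter()
cpu_time = meter.create_observable_counter("system.cpu.time")
procs_running = meter.create_observable_counter("system.procs_running")


def observer() -> Iterable[Measurement]:
    yield cpu_time.observe(12, {"cpu": "cpu0", "state": "user"})
    yield procs_running.observe(3)


registration = meter.register_callback(observer, [cpu_time, procs_running])
first = meter.collect()       # both instruments report
registration.unregister()
second = meter.collect()      # nothing is registered any more
```

Because `observe()` only returns a value, calling it outside a callback has no effect in this sketch.
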

The actual API changes to implement this are

Pros:

Cons:

Approach 2

We could restructure the observable callback to accept a parameter that is used to make observations:

cpuTime = meter.create_observable_counter("system.cpu.time")
procsRunning = meter.create_observable_up_down_counter("system.procs_running")
procsBlocked = meter.create_observable_up_down_counter("system.procs_blocked")

def proc_stat_observer(observe: Callable[[Asynchronous, Measurement], None]) -> None:
    with open("/proc/stat") as procstat:
        procstat.readline()  # skip the first line
        for line in procstat:
            if line.startswith("cpu"):
                cpu, *states = line.split()
                observe(cpuTime, Measurement(int(states[0]) // 100, {"cpu": cpu, "state": "user"}))
                observe(cpuTime, Measurement(int(states[1]) // 100, {"cpu": cpu, "state": "nice"}))
            if line.startswith("procs_running"):
                _, value = line.split()
                observe(procsRunning, Measurement(int(value)))  # split() returns strings
            if line.startswith("procs_blocked"):
                _, value = line.split()
                observe(procsBlocked, Measurement(int(value)))

callback = meter.register_callback(proc_stat_observer, [cpuTime, procsRunning, procsBlocked])

def stop():
    callback.unregister()

Alternatively, the observe function could be an object. This is somewhat similar to this example in the spec.
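A rough sketch of what the SDK side of Approach 2 might look like, using only the standard library. The `run_callback` helper and the use of instrument names as plain strings are assumptions made for illustration; the point is that the SDK owns the `observe` function, so it can reject unregistered instruments and calls made after the callback has returned.

```python
from typing import Callable, Dict, Iterable, List

# Hypothetical shape of the function handed to the callback: (instrument name, value).
Observe = Callable[[str, int], None]


def run_callback(callback: Callable[[Observe], None],
                 registered: Iterable[str]) -> Dict[str, List[int]]:
    """Invoke one callback and return the observations it made, keyed by instrument."""
    allowed = set(registered)
    collected: Dict[str, List[int]] = {}
    active = True

    def observe(instrument: str, value: int) -> None:
        if not active:
            raise RuntimeError("observe() called after the callback returned")
        if instrument not in allowed:
            raise ValueError(f"{instrument!r} was not registered with this callback")
        collected.setdefault(instrument, []).append(value)

    callback(observe)
    active = False  # any leaked reference to observe() is now inert
    return collected


def proc_stat_observer(observe: Observe) -> None:
    observe("system.procs_running", 3)
    observe("system.procs_blocked", 0)


result = run_callback(proc_stat_observer, ["system.procs_running", "system.procs_blocked"])
```
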

The actual API changes to implement this are

Pros:

Cons:

Approach 3

We could restructure the observable callback to simply call observe() on async instruments, similar to what is suggested in https://github.com/open-telemetry/opentelemetry-specification/issues/2280

asyncCounter := meter.NewAsyncCounter(...)
asyncGauge := meter.NewAsyncGauge(...)
cbfunc := func(ctx context.Context) {
    expensiveResult := expensiveCall()
    asyncCounter.Observe(ctx, expensiveResult.Count)
    asyncGauge.Observe(ctx, expensiveResult.Gauge)
}
meter.NewCallback(cbfunc, asyncCounter, asyncGauge)

Pros:

Cons:


Any other ideas for handling this?

aabmass commented 2 years ago

In addition to any of the above, we could keep the callback parameter optional when creating an observable instrument, so that the main use case stays streamlined.

ocelotl commented 2 years ago

For others who read this issue and are as confused as I was (:sweat_smile:): this is not simply creating a callback and then passing the same callback to several instruments. The callback needs to be defined after the instruments are created, because the callback code calls a method of the instruments.

ocelotl commented 2 years ago

What is very weird is having asynchronous instruments with a public observe method that can pretty much be called synchronously by the user. Shouldn't this case be handled by first creating synchronous instruments and then using these synchronous instruments in the callback function and then creating one or more asynchronous instruments that use this callback?

Please correct me if this is wrong, I may be missing something here :shrug:

aabmass commented 2 years ago

@ocelotl The goal is to have a single callback which, on a single invocation, reports observations for multiple instruments. An example use case (same as above) is scraping /proc/stat, which contains a bunch of stats like CPU time, number of processes since boot, number of processes waiting on IO, etc. Each of those metrics needs to be reported against a different instrument, and the goal is to do that while only reading the file once.
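As an illustration of the "read once, report to several instruments" idea, here is a small sketch that parses a made-up /proc/stat sample in a single pass. `SAMPLE_PROC_STAT` and `parse_proc_stat` are hypothetical, the numbers are invented, and the `// 100` conversion simply mirrors the example above (it assumes 100 clock ticks per second).

```python
from typing import Dict, List, Tuple

# Invented sample in the layout of /proc/stat; real files have more lines and fields.
SAMPLE_PROC_STAT = """\
cpu  300 40 500 9000 0 0 0 0 0 0
cpu0 100 20 250 4500 0 0 0 0 0 0
cpu1 200 20 250 4500 0 0 0 0 0 0
ctxt 67890
processes 4321
procs_running 2
procs_blocked 1
"""

Point = Tuple[int, Dict[str, str]]  # (value, attributes)


def parse_proc_stat(text: str) -> Dict[str, List[Point]]:
    """Walk the text once and group values by the instrument they belong to."""
    metrics: Dict[str, List[Point]] = {
        "system.cpu.time": [],
        "system.procs_running": [],
        "system.procs_blocked": [],
    }
    for line in text.splitlines()[1:]:  # skip the aggregate "cpu" line
        if not line.strip():
            continue
        key, *fields = line.split()
        if key.startswith("cpu"):
            metrics["system.cpu.time"].append((int(fields[0]) // 100, {"cpu": key, "state": "user"}))
            metrics["system.cpu.time"].append((int(fields[1]) // 100, {"cpu": key, "state": "nice"}))
        elif key == "procs_running":
            metrics["system.procs_running"].append((int(fields[0]), {}))
        elif key == "procs_blocked":
            metrics["system.procs_blocked"].append((int(fields[0]), {}))
    return metrics


metrics = parse_proc_stat(SAMPLE_PROC_STAT)
```

Each key of the returned dict corresponds to one instrument, which is exactly the grouping a multi-instrument callback has to hand back to the SDK.
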

> Shouldn't this case be handled by first creating synchronous instruments and then using these synchronous instruments in the callback function

Synchronous instruments expect deltas, but we are trying to "observe" the cumulative/absolute values read from something external (like procfs in this case). If you wanted to do it this way, you'd have to convert the cumulative value to a delta before reporting it with a synchronous instrument, which is not ideal.
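For reference, the conversion being described would look roughly like the sketch below. `CumulativeToDelta` is a hypothetical helper, not part of any OpenTelemetry package; treating the first reading as a delta from zero and treating a decrease as a counter reset are both assumptions.

```python
from typing import Dict, Hashable


class CumulativeToDelta:
    """Remembers the last cumulative value per key and returns the increase."""

    def __init__(self) -> None:
        self._last: Dict[Hashable, int] = {}

    def delta(self, key: Hashable, cumulative: int) -> int:
        previous = self._last.get(key, 0)  # assumption: first reading counts from zero
        self._last[key] = cumulative
        if cumulative < previous:
            # Assumption: the source counter was reset, so report the new value as-is.
            return cumulative
        return cumulative - previous


converter = CumulativeToDelta()
deltas = [
    converter.delta("cpu0", 100),
    converter.delta("cpu0", 130),
    converter.delta("cpu0", 130),
    converter.delta("cpu0", 10),  # counter went backwards: treated as a reset
]
```

Each delta would then be passed to a synchronous counter's `add()`; the extra state and the reset handling are the overhead that observing the cumulative value directly avoids.
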

ocelotl commented 2 years ago

> Synchronous instruments are expecting deltas, but we are trying to "observe" the cumulative/absolute values read from something external (like procfs in this case). If you wanted to do it this way, you'd have to convert the cumulative to a delta to report it with a synchronous instrument which is not ideal

Yeah, good point. Is it a requirement that the hypothetical new asynchronous instruments' observe method is not called outside a callback? This is the point I am having more trouble with.

This "forced" association between temporality and synchronicity (delta with synchronous, cumulative with asynchronous) most definitely has a good reason to exist (compatibility with previous metrics systems, maybe?), but cases like this one make me wonder whether it is the correct design decision (probably nothing we can fix in otel python, though). I am starting to think that these two concepts should be independent. I think @lzchen raised this same question in a comment before.

I guess we would need to find a solution to stop the users from calling observe outside a callback. Maybe documentation would be enough? :shrug:

aabmass commented 2 years ago

> Yeah, good point. Is it a requirement that the hypothetical new asynchronous instruments' observe method is not called outside a callback? This is the point I am having more trouble with.

Josh's PR says "The implementation SHOULD reject observations that are made outside of a callback registered for the instrument."

> I guess we would need to find a solution to stop the users from calling observe outside a callback. Maybe documentation would be enough? 🤷

In Approach 1, the instrument's observe() method just outputs a Measurement associated with that instrument. The callback has to yield/return that measurement to feed it to the SDK, which avoids the problem altogether. It's also avoided with Approach 2. With Approach 3, you would have to protect against it.
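One way the protection for Approach 3 could be sketched, using only the standard library's `contextvars`: the SDK marks which instruments are "active" while it runs a registered callback, and `observe()` rejects anything else. `AsyncInstrument` and `run_registered_callback` are hypothetical names, and raising an exception (rather than logging and dropping) is an assumption.

```python
import contextvars
from typing import Callable, Iterable, List

# Instruments that may currently be observed; empty outside of any callback.
_active_instruments: contextvars.ContextVar = contextvars.ContextVar(
    "active_instruments", default=frozenset()
)


class AsyncInstrument:
    """Hypothetical async instrument with a public observe() method."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.observations: List[int] = []

    def observe(self, value: int) -> None:
        if self not in _active_instruments.get():
            raise RuntimeError(
                f"{self.name}: observe() called outside a callback registered for it"
            )
        self.observations.append(value)


def run_registered_callback(callback: Callable[[], None],
                            instruments: Iterable[AsyncInstrument]) -> None:
    """Run a callback with its registered instruments marked as observable."""
    token = _active_instruments.set(frozenset(instruments))
    try:
        callback()
    finally:
        _active_instruments.reset(token)  # restore the previous state even on error


gauge = AsyncInstrument("system.procs_running")
run_registered_callback(lambda: gauge.observe(2), [gauge])

try:
    gauge.observe(5)  # not inside a registered callback
    rejected = False
except RuntimeError:
    rejected = True
```
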

> This "forced" association between temporality and synchronicity

Keep in mind the main use case for async instruments is observing some pre-aggregated metrics. As an instrumentation API, it is kind of a shim to pull things from outside your code into OTel. Do you have any examples of when you'd want to observe deltas with an async instrument?

nileshborse92 commented 1 year ago

How can we call the callback function in Python? If we add it at the time of metric creation, we get a message that the meter is already created, but nothing shows up in the console exporter.

Could you please give a suggestion here? Thanks

lzchen commented 1 year ago

@ocelotl Is the ability to unregister callbacks the only thing missing to complete this issue?

@nileshborse92 I am not able to fully understand your question. Could you paste a code snippet and an explanation of your problem with actual vs expected output?

ocelotl commented 1 year ago

@lzchen I think so

nileshborse92 commented 1 year ago

Hi,

Please find the code snippet below:

from opentelemetry import metrics
from opentelemetry.metrics import get_meter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader, ConsoleMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (OTLPMetricExporter)
from opentelemetry.metrics import CallbackOptions, Observation

exporter = OTLPMetricExporter('http://localhost:4317')
reader = PeriodicExportingMetricReader(ConsoleMetricExporter())
reader = PeriodicExportingMetricReader(exporter)

metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))
meter_new = get_meter("smart-labeller_new")

def wscallback(result):
    # Note: in the real world these would be retrieved from the operating system
    result.Observe(8, ("pid", 0), ("bitness", 64))
    result.Observe(20, ("pid", 4), ("bitness", 64))
    result.Observe(126032, ("pid", 880), ("bitness", 32))

meter_new.create_observable_up_down_counter(name="process.workingset", description="process working set", callback=[wscallback], unit="kB")

Problem: I am not able to run this async counter code, which is suggested in the OTel documentation. I get "AttributeError: CallbackOptions has no attribute Observe".

Output: I need to export this metric value to the console output.

Please suggest a feasible solution.

Thanks.

lzchen commented 1 year ago

Please create a separate issue for this. @nileshborse92