open-telemetry / opentelemetry-rust

The Rust OpenTelemetry implementation
https://opentelemetry.io
Apache License 2.0

[Feature]: `Meter` `register_callback` or an alternative should accept an async function #1376

Open ewoolsey opened 9 months ago

ewoolsey commented 9 months ago

Either create a `register_async_callback`, or change the signature of the existing `register_callback`. Personally, I prefer the first option.

Here is a relevant discussion: https://github.com/open-telemetry/opentelemetry-rust/discussions/1362
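
For concreteness, something like this is what I have in mind. It is purely hypothetical, not opentelemetry-rust's actual API; it hands the callback an owned observer handle and a boxed future so the returned future can be `'static`:

    use std::{future::Future, pin::Pin, sync::Arc};

    // Hypothetical stand-in for opentelemetry's Observer trait; a real
    // implementation would reuse the existing type.
    pub trait Observer: Send + Sync {}

    pub type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

    // Sketch only: an SDK implementation would store this callback and
    // await the returned future during each collection cycle.
    pub fn register_async_callback<F>(callback: F)
    where
        F: Fn(Arc<dyn Observer>) -> BoxFuture + Send + Sync + 'static,
    {
        let _ = callback;
    }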

ewoolsey commented 9 months ago

I found a workaround, though it was much harder to get right than I originally expected. Attempting to `block_on` within the closure resulted in deadlocks, and `spawn_blocking` did not help either. I find a solution like the one below acceptable, though not ideal.

Personally, I would really love a proper solution where `register_callback` accepts an `async fn`:

    // Channels bridging the sync callback to an async task on the main runtime.
    let (ping_sender, ping_receiver) = tokio::sync::mpsc::unbounded_channel::<()>();
    let (metrics_sender, metrics_receiver) = tokio::sync::mpsc::unbounded_channel::<Metrics>();
    // The RwLock (parking_lot-style: `write()` returns the guard directly)
    // gives the `Fn` closure interior mutability over the receiver.
    let metrics_receiver = RwLock::new(metrics_receiver);
    tokio::spawn(metrics_handler(ping_receiver, metrics_sender));

    meter.register_callback(
        &[balance.as_any(), value.as_any(), total_value.as_any()],
        move |observer| {
            // `blocking_recv` panics on a runtime thread, so hop to a scoped
            // OS thread where blocking is allowed: ping the async task, then
            // block until it sends fresh metrics back.
            let metrics = thread::scope(|s| {
                s.spawn(|| {
                    ping_sender.send(()).unwrap();
                    metrics_receiver.write().blocking_recv().unwrap()
                })
                .join()
                .unwrap()
            });
            observer.observe_f64(&total_value, metrics.total_value, &[]);
            for (asset, (bal, val)) in metrics.balances {
                let asset_str = serde_json::to_string(&asset).unwrap();
                observer.observe_f64(&balance, bal, &[KeyValue::new("asset", asset_str.clone())]);
                observer.observe_f64(&value, val, &[KeyValue::new("asset", asset_str)]);
            }
        },
    )?;

The handler itself then runs on the original tokio runtime:

    // Runs on the main runtime; each ping triggers one async collection pass.
    async fn metrics_handler(
        mut ping_receiver: UnboundedReceiver<()>,
        metrics_sender: UnboundedSender<Metrics>,
    ) {
        loop {
            // Exit once the callback side has dropped its sender.
            let Some(()) = ping_receiver.recv().await else {
                break;
            };
            match collect_metrics().await {
                Ok(metrics) => {
                    if let Err(e) = metrics_sender.send(metrics) {
                        error!("Error in metrics handler - {e}")
                    }
                }
                Err(e) => error!("Error in metrics handler - {e}"),
            }
        }
    }

jtescher commented 9 months ago

The biggest question here is whether metrics integrations should always require an async runtime, or whether registering an async callback without one configured could be caught at compile time (trickier if libraries register callbacks this way).

ewoolsey commented 8 months ago

So I've run into an annoying problem with my solution above, caused by channels closing irregularly, and I would really love a proper fix here. Would a PR for a `register_async_callback` be welcome?

jtescher commented 8 months ago

@ewoolsey a solution here would certainly be welcome. I think the question above is still open: whether to require users to have an async runtime, or whether it would need other concessions, like only working for periodic readers. I haven't had time to dig into those details, but I think it would be worth it for you to look at if you have time 👍

sbernauer commented 8 months ago

I had to fiddle around with `tokio::runtime::Builder` instead of `#[tokio::main]`, as I needed to ensure at least 3 tokio workers are present. Without that, my application was deadlocking on systems with 2 or fewer cores (as it has on Kubernetes).
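
For reference, this is roughly the setup (a minimal sketch; the floor of 3 workers comes from my deadlock observation above, the rest is illustrative):

    // Replace #[tokio::main] with an explicit Builder so the runtime is
    // guaranteed at least 3 workers, even on 1-2 core machines.
    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let workers = std::thread::available_parallelism()?.get().max(3);
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(workers)
            .enable_all()
            .build()?;
        runtime.block_on(run())
    }

    // Stand-in for the real application entry point.
    async fn run() -> Result<(), Box<dyn std::error::Error>> {
        Ok(())
    }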

So a solution would be really appreciated, and even pulling in your patched branch from the PR might be preferable to what I'm currently doing :D

GabrielBianconi commented 6 months ago

I'd like to see a solution for this as well.

For now, I was able to get around it by launching the meter provider in its own dedicated Tokio runtime on a new thread. Keeping the SDK on a separate Tokio runtime seems to avoid interfering with the tasks on the primary runtime.
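
A minimal sketch of that pattern, assuming a current-thread runtime is enough for the SDK's background work (the actual meter provider construction is elided):

    use std::thread;

    // Spawn a dedicated thread that owns its own Tokio runtime; the meter
    // provider and its background tasks live entirely on this runtime.
    fn spawn_metrics_runtime() -> thread::JoinHandle<()> {
        thread::spawn(|| {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("failed to build metrics runtime");
            rt.block_on(async {
                // ... construct the MeterProvider / exporter here ...
                // Park forever so the runtime (and the provider) stay alive.
                std::future::pending::<()>().await;
            });
        })
    }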