tikv / rust-prometheus

Prometheus instrumentation library for Rust applications
Apache License 2.0
1.04k stars 182 forks source link

`push_metrics` panics inside tokio #453

Open yds12 opened 1 year ago

yds12 commented 1 year ago

Describe the bug Calling push_metrics from an async function with tokio causes this error:

thread 'main' panicked at 'Cannot drop a runtime in a context where blocking is not allowed. This happens when a runtime is dropped from within an asynchronous context.' .../.cargo/registry/src/github.com-.../tokio-1.20.1/src/runtime/blocking/shutdown.rs:51:21

To Reproduce Take the push example, annotate main with #[tokio::main], using tokio = { version = "1.20", features = ["rt", "rt-multi-thread", "macros"] } in the Cargo.toml. Running the example will produce the error above.

Expected behavior No panic or async-compatible alternative.

yds12 commented 1 year ago

It's quite simple to make it work, basically just need to not use reqwest::blocking::Client, and put some async and await here and there (see patch below). But of course that would be a breaking change and force people to use async, so I don't know what you guys would like to do, maybe put the async code behind a feature flag? If you are interested I can send a PR for this.

diff --git a/examples/example_push.rs b/examples/example_push.rs
index 22d0195..c20f94b 100644
--- a/examples/example_push.rs
+++ b/examples/example_push.rs
@@ -26,7 +26,8 @@ lazy_static! {
 }

 #[cfg(feature = "push")]
-fn main() {
+#[tokio::main]
+async fn main() {
     let args: Vec<String> = env::args().collect();
     let program = args[0].clone();

@@ -63,6 +64,7 @@ fn main() {
                 password: "pass".to_owned(),
             }),
         )
+        .await
         .unwrap();
     }

diff --git a/src/push.rs b/src/push.rs
index 525b342..a3d039a 100644
--- a/src/push.rs
+++ b/src/push.rs
@@ -6,7 +6,7 @@ use std::hash::BuildHasher;
 use std::str::{self, FromStr};
 use std::time::Duration;

-use reqwest::blocking::Client;
+use reqwest::Client;
 use reqwest::header::CONTENT_TYPE;
 use reqwest::{Method, StatusCode, Url};

@@ -52,32 +52,32 @@ pub struct BasicAuthentication {
 /// Note that all previously pushed metrics with the same job and other grouping
 /// labels will be replaced with the metrics pushed by this call. (It uses HTTP
 /// method 'PUT' to push to the Pushgateway.)
-pub fn push_metrics<S: BuildHasher>(
+pub async fn push_metrics<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
     mfs: Vec<proto::MetricFamily>,
     basic_auth: Option<BasicAuthentication>,
 ) -> Result<()> {
-    push(job, grouping, url, mfs, "PUT", basic_auth)
+    push(job, grouping, url, mfs, "PUT", basic_auth).await
 }

 /// `push_add_metrics` works like `push_metrics`, but only previously pushed
 /// metrics with the same name (and the same job and other grouping labels) will
 /// be replaced. (It uses HTTP method 'POST' to push to the Pushgateway.)
-pub fn push_add_metrics<S: BuildHasher>(
+pub async fn push_add_metrics<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
     mfs: Vec<proto::MetricFamily>,
     basic_auth: Option<BasicAuthentication>,
 ) -> Result<()> {
-    push(job, grouping, url, mfs, "POST", basic_auth)
+    push(job, grouping, url, mfs, "POST", basic_auth).await
 }

 const LABEL_NAME_JOB: &str = "job";

-fn push<S: BuildHasher>(
+async fn push<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
@@ -160,7 +160,7 @@ fn push<S: BuildHasher>(
         builder = builder.basic_auth(username, Some(password));
     }

-    let response = builder.send().map_err(|e| Error::Msg(format!("{}", e)))?;
+    let response = builder.send().await.map_err(|e| Error::Msg(format!("{}", e)))?;

     match response.status() {
         StatusCode::ACCEPTED => Ok(()),
@@ -173,7 +173,7 @@ fn push<S: BuildHasher>(
     }
 }

-fn push_from_collector<S: BuildHasher>(
+async fn push_from_collector<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
@@ -187,31 +187,31 @@ fn push_from_collector<S: BuildHasher>(
     }

     let mfs = registry.gather();
-    push(job, grouping, url, mfs, method, basic_auth)
+    push(job, grouping, url, mfs, method, basic_auth).await
 }

 /// `push_collector` push metrics collected from the provided collectors. It is
 /// a convenient way to push only a few metrics.
-pub fn push_collector<S: BuildHasher>(
+pub async fn push_collector<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
     collectors: Vec<Box<dyn Collector>>,
     basic_auth: Option<BasicAuthentication>,
 ) -> Result<()> {
-    push_from_collector(job, grouping, url, collectors, "PUT", basic_auth)
+    push_from_collector(job, grouping, url, collectors, "PUT", basic_auth).await
 }

 /// `push_add_collector` works like `push_add_metrics`, it collects from the
 /// provided collectors. It is a convenient way to push only a few metrics.
-pub fn push_add_collector<S: BuildHasher>(
+pub async fn push_add_collector<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
     collectors: Vec<Box<dyn Collector>>,
     basic_auth: Option<BasicAuthentication>,
 ) -> Result<()> {
-    push_from_collector(job, grouping, url, collectors, "POST", basic_auth)
+    push_from_collector(job, grouping, url, collectors, "POST", basic_auth).await
 }

 const DEFAULT_GROUP_LABEL_PAIR: (&str, &str) = ("instance", "unknown");
@@ -264,8 +264,8 @@ mod tests {
         assert!(!map.is_empty());
     }

-    #[test]
-    fn test_push_bad_label_name() {
+    #[tokio::test]
+    async fn test_push_bad_label_name() {
         let table = vec![
             // Error message: "pushed metric {} already contains a job label"
             (LABEL_NAME_JOB, "job label"),
@@ -280,7 +280,7 @@ mod tests {
             m.set_label(from_vec!(vec![l]));
             let mut mf = proto::MetricFamily::new();
             mf.set_metric(from_vec!(vec![m]));
-            let res = push_metrics("test", hostname_grouping_key(), "mockurl", vec![mf], None);
+            let res = push_metrics("test", hostname_grouping_key(), "mockurl", vec![mf], None).await;
             assert!(format!("{}", res.unwrap_err()).contains(case.1));
         }
     }
lucab commented 1 year ago

Thanks for the report. Have you perhaps tried the features-set described in https://github.com/tikv/rust-prometheus/issues/342#issuecomment-783738547 already?

yds12 commented 1 year ago

Hello, thanks for the reply. Just tried here, but the same error occurs.

taj-p commented 1 year ago

FWIW, the original fix no longer works. It's failing on the latest tokio (1.23.0) as far as I can tell.

maoertel commented 2 weeks ago

I wrote prometheus-push as a crate that handles the push functionality, so prometheus crates does not necessarily have to take care of this. prometheus-push works blocking, non-blocking, with this crate or "the other" prometheus_client crate. Or you can implement the traits provided there your self to use it whatever you want.

For this crate and non blocking reqwest it is as easy as that:

[dependencies]
prometheus_push = { version = "<version>", default-features = false, features = ["with_reqwest", "prometheus_crate"] }
use prometheus::labels;
use prometheus_push::prometheus_crate::PrometheusMetricsPusher;
use reqwest::Client;
use url::Url;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let push_gateway: Url = Url::parse("<address to pushgateway>")?;
    let client = Client::new();
    let metrics_pusher = PrometheusMetricsPusher::from(client, &push_gateway)?;
    metrics_pusher
        .push_all(
            "<your push jobs name>",
            &labels! { "<label_name>" => "<label_value>" },
            prometheus::gather(),
        )
        .await?;

    Ok(())
}