getsentry / sentry-rust

Official Sentry SDK for Rust
https://sentry.io/
Apache License 2.0

Question: is there a way to set up an automatic performance monitoring transaction for each incoming request for Tonic? #636

Open spencerbart opened 7 months ago

spencerbart commented 7 months ago

Is there a way to do something like this

let layer = tower::ServiceBuilder::new()
    .layer(sentry_tower::NewSentryLayer::<Request>::new_from_top())
    .layer(sentry_tower::SentryHttpLayer::with_transaction());

but for Tonic?

The description for sentry_tower::SentryHttpLayer::with_transaction() is "Creates a new Layer which starts a new performance monitoring transaction for each incoming request." It would be nice to have this for Tonic directly instead of going down the tracing route.

Swatinem commented 7 months ago

I think it should be fairly straightforward to copy the existing code for SentryHttpLayer over and adapt it to work with tonic. Apart from starting a transaction, the HttpLayer also adds the incoming request details to the scope, and uses the trace ID from the incoming headers to continue the trace when it starts the transaction.
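
To make the "continue the trace from incoming headers" part concrete, here is a minimal, std-only sketch of reading a `sentry-trace` header value, which has the shape `<trace_id>-<span_id>` with an optional `-<sampled>` suffix. The `SentryTrace` type and the `parse_sentry_trace` and `is_hex` helpers are hypothetical names made up for illustration; they are not part of the sentry crate, which does this parsing internally.

```rust
/// Parsed form of a `sentry-trace` header value.
/// Hypothetical helper type for illustration; not part of the sentry crate.
#[derive(Debug, PartialEq)]
struct SentryTrace<'a> {
    trace_id: &'a str,
    span_id: &'a str,
    sampled: Option<bool>,
}

/// Returns true if `s` consists of exactly `len` ASCII hex digits.
fn is_hex(s: &str, len: usize) -> bool {
    s.len() == len && s.bytes().all(|b| b.is_ascii_hexdigit())
}

/// Splits `<trace_id>-<span_id>[-<sampled>]` into its parts.
/// Returns `None` if the trace ID or span ID is malformed.
fn parse_sentry_trace(value: &str) -> Option<SentryTrace<'_>> {
    let mut parts = value.trim().splitn(3, '-');
    let trace_id = parts.next()?;
    let span_id = parts.next()?;
    if !is_hex(trace_id, 32) || !is_hex(span_id, 16) {
        return None;
    }
    // The sampling flag is optional; anything other than "0" or "1"
    // is treated as "no decision made upstream".
    let sampled = match parts.next() {
        Some("1") => Some(true),
        Some("0") => Some(false),
        _ => None,
    };
    Some(SentryTrace { trace_id, span_id, sampled })
}

fn main() {
    let header = "771a43a4192642f0b136d5159a501700-b7ad6b7169203331-1";
    match parse_sentry_trace(header) {
        Some(t) => println!(
            "trace={} span={} sampled={:?}",
            t.trace_id, t.span_id, t.sampled
        ),
        None => println!("no valid sentry-trace header"),
    }
}
```

In a real layer you would not parse this by hand; passing the header pairs to the SDK when creating the transaction context is enough. The sketch only shows what information the layer needs to pick up from the request.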

spencerbart commented 7 months ago

I am attempting to meld this https://github.com/getsentry/sentry-rust/blob/master/sentry-tower/src/http.rs and this https://github.com/hyperium/tonic/blob/master/examples/src/tower/server.rs together, but it's not working and I'm not sure why.

use std::convert::TryInto;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use http::{header, uri, StatusCode};
use hyper::{Body, Request, Response};
use sentry::protocol;
use tonic::body::BoxBody;
use tower_layer::Layer;
use tower_service::Service;

/// Tower Layer that logs Http Request Headers.
///
/// The Service created by this Layer can also optionally start a new
/// performance monitoring transaction for each incoming request,
/// continuing the trace based on incoming distributed tracing headers.
///
/// The created transaction will automatically use the request URI as its name.
/// This is sometimes not desirable in case the request URI contains unique IDs
/// or similar. In this case, users should manually override the transaction name
/// in the request handler using the [`Scope::set_transaction`](sentry::Scope::set_transaction)
/// method.
#[derive(Clone, Default)]
pub struct SentryGrpcLayer {
    start_transaction: bool,
}

impl SentryGrpcLayer {
    /// Creates a new Layer that only logs Request Headers.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a new Layer which starts a new performance monitoring transaction
    /// for each incoming request.
    pub fn with_transaction() -> Self {
        Self {
            start_transaction: true,
        }
    }
}

/// Tower Service that logs Http Request Headers.
///
/// The Service can also optionally start a new performance monitoring transaction
/// for each incoming request, continuing the trace based on incoming
/// distributed tracing headers.
#[derive(Clone)]
pub struct SentryGrpcService<S> {
    service: S,
    start_transaction: bool,
}

impl<S> Layer<S> for SentryGrpcLayer {
    type Service = SentryGrpcService<S>;

    fn layer(&self, service: S) -> Self::Service {
        Self::Service {
            service,
            start_transaction: self.start_transaction,
        }
    }
}

/// The Future returned from [`SentryGrpcService`].
#[pin_project::pin_project]
pub struct SentryGrpcFuture<F> {
    on_first_poll: Option<(
        sentry::protocol::Request,
        Option<sentry::TransactionContext>,
    )>,
    transaction: Option<(sentry::TransactionOrSpan, Option<sentry::TransactionOrSpan>)>,
    #[pin]
    future: F,
}

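// `ResBody` is a generic parameter; naming it `BoxBody` would shadow the
// imported `tonic::body::BoxBody` type inside this impl.
impl<F, ResBody, Error> Future for SentryGrpcFuture<F>
where
    F: Future<Output = Result<Response<ResBody>, Error>>,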
{
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("SentryGrpcFuture::poll");
        let slf = self.project();
        if let Some((sentry_req, trx_ctx)) = slf.on_first_poll.take() {
            sentry::configure_scope(|scope| {
                if let Some(trx_ctx) = trx_ctx {
                    let transaction: sentry::TransactionOrSpan =
                        sentry::start_transaction(trx_ctx).into();
                    transaction.set_request(sentry_req.clone());
                    let parent_span = scope.get_span();
                    scope.set_span(Some(transaction.clone()));
                    *slf.transaction = Some((transaction, parent_span));
                }

                scope.add_event_processor(move |mut event| {
                    if event.request.is_none() {
                        event.request = Some(sentry_req.clone());
                    }
                    Some(event)
                });
            });
        }
        match slf.future.poll(cx) {
            Poll::Ready(res) => {
                if let Some((transaction, parent_span)) = slf.transaction.take() {
                    if transaction.get_status().is_none() {
                        let status = match &res {
                            Ok(res) => map_status(res.status()),
                            Err(_) => protocol::SpanStatus::UnknownError,
                        };
                        transaction.set_status(status);
                    }
                    transaction.finish();
                    sentry::configure_scope(|scope| scope.set_span(parent_span));
                }
                Poll::Ready(res)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<S> Service<Request<Body>> for SentryGrpcService<S>
where
    S: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = SentryGrpcFuture<S::Future>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    fn call(&mut self, request: Request<Body>) -> Self::Future {
        println!("SentryGrpcService::call");
        let sentry_req = sentry::protocol::Request {
            method: None,
            url: get_url_from_request(&request),
            headers: request
                .headers()
                .into_iter()
                .map(|(header, value)| {
                    (
                        header.to_string(),
                        value.to_str().unwrap_or_default().into(),
                    )
                })
                .collect(),
            ..Default::default()
        };
        let trx_ctx = if self.start_transaction {
            let headers = request.headers().into_iter().flat_map(|(header, value)| {
                value.to_str().ok().map(|value| (header.as_str(), value))
            });

            let tx_name = path_from_request(&request).to_owned();
            Some(sentry::TransactionContext::continue_from_headers(
                &tx_name,
                "http.server",
                headers,
            ))
        } else {
            None
        };

        SentryGrpcFuture {
            on_first_poll: Some((sentry_req, trx_ctx)),
            transaction: None,
            future: self.service.call(request),
        }
    }
}

fn path_from_request<B>(request: &Request<B>) -> &str {
    request.uri().path()
}

fn map_status(status: StatusCode) -> protocol::SpanStatus {
    match status {
        StatusCode::UNAUTHORIZED => protocol::SpanStatus::Unauthenticated,
        StatusCode::FORBIDDEN => protocol::SpanStatus::PermissionDenied,
        StatusCode::NOT_FOUND => protocol::SpanStatus::NotFound,
        // CONFLICT (409) is a client error, so it must come before the
        // generic 4xx arm below; otherwise this arm is never reached.
        StatusCode::CONFLICT => protocol::SpanStatus::AlreadyExists,
        StatusCode::TOO_MANY_REQUESTS => protocol::SpanStatus::ResourceExhausted,
        status if status.is_client_error() => protocol::SpanStatus::InvalidArgument,
        StatusCode::NOT_IMPLEMENTED => protocol::SpanStatus::Unimplemented,
        StatusCode::SERVICE_UNAVAILABLE => protocol::SpanStatus::Unavailable,
        status if status.is_server_error() => protocol::SpanStatus::InternalError,
        status if status.is_success() => protocol::SpanStatus::Ok,
        _ => protocol::SpanStatus::UnknownError,
    }
}

fn get_url_from_request<B>(request: &Request<B>) -> Option<url::Url> {
    let uri = request.uri().clone();
    let mut uri_parts = uri.into_parts();
    uri_parts.scheme.get_or_insert(uri::Scheme::HTTP);
    if uri_parts.authority.is_none() {
        let host = request.headers().get(header::HOST)?.as_bytes();
        uri_parts.authority = Some(host.try_into().ok()?);
    }
    let uri = uri::Uri::from_parts(uri_parts).ok()?;
    uri.to_string().parse().ok()
}
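
One caveat with reusing the HTTP status mapping for gRPC: a gRPC server normally answers with HTTP 200 and reports the real outcome in the `grpc-status` header or trailer, so `map_status` above would mark nearly every transaction as `Ok`. Below is a rough, std-only sketch of mapping the numeric gRPC code instead. The `SpanStatus` enum is a local stand-in for `sentry::protocol::SpanStatus`, and `map_grpc_status` and `status_from_header` are hypothetical helper names; none of this is existing sentry or tonic API.

```rust
/// Local stand-in for `sentry::protocol::SpanStatus`, declared here only
/// so the sketch compiles without the sentry crate.
#[derive(Debug, Clone, Copy, PartialEq)]
enum SpanStatus {
    Ok,
    Cancelled,
    UnknownError,
    InvalidArgument,
    DeadlineExceeded,
    NotFound,
    AlreadyExists,
    PermissionDenied,
    ResourceExhausted,
    FailedPrecondition,
    Aborted,
    OutOfRange,
    Unimplemented,
    InternalError,
    Unavailable,
    DataLoss,
    Unauthenticated,
}

/// Maps a numeric gRPC status code (0 to 16) to a span status.
/// Codes outside the defined range fall back to `UnknownError`.
fn map_grpc_status(code: u32) -> SpanStatus {
    match code {
        0 => SpanStatus::Ok,
        1 => SpanStatus::Cancelled,
        2 => SpanStatus::UnknownError,
        3 => SpanStatus::InvalidArgument,
        4 => SpanStatus::DeadlineExceeded,
        5 => SpanStatus::NotFound,
        6 => SpanStatus::AlreadyExists,
        7 => SpanStatus::PermissionDenied,
        8 => SpanStatus::ResourceExhausted,
        9 => SpanStatus::FailedPrecondition,
        10 => SpanStatus::Aborted,
        11 => SpanStatus::OutOfRange,
        12 => SpanStatus::Unimplemented,
        13 => SpanStatus::InternalError,
        14 => SpanStatus::Unavailable,
        15 => SpanStatus::DataLoss,
        16 => SpanStatus::Unauthenticated,
        _ => SpanStatus::UnknownError,
    }
}

/// Interprets the raw `grpc-status` header value, if present.
/// Returns `None` when the header is missing or not a number, in which
/// case the status is probably carried in the trailers instead.
fn status_from_header(value: Option<&str>) -> Option<SpanStatus> {
    let code: u32 = value?.trim().parse().ok()?;
    Some(map_grpc_status(code))
}

fn main() {
    println!("{:?}", status_from_header(Some("5")));
    println!("{:?}", status_from_header(None));
}
```

A layer that only inspects response headers will see `grpc-status` for responses that fail before any message is sent; for streamed or successful responses the code usually arrives in the trailers, which this sketch does not handle.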

Swatinem commented 7 months ago

impl<S> Service<Request<Body>> for SentryGrpcService<S>

I’m not quite sure what kind of request/body type you need for gRPC, but I doubt that it is hyper::Request<hyper::Body>.

spencerbart commented 7 months ago

That came from the example given in the Tonic repo:

https://github.com/hyperium/tonic/blob/177c1f3d7407c60e5fb782a23fee935e7e3fa12d/examples/src/tower/server.rs#L89