cloudflare / pingora

A library for building fast, reliable and evolvable network services.
Apache License 2.0
21.59k stars 1.2k forks source link

WriteError occurs when proxying grpc stream #404

Closed huahuadeliaoliao closed 2 days ago

huahuadeliaoliao commented 1 week ago

code:

use async_trait::async_trait;
use pingora::prelude::{http_proxy_service, ProxyHttp, Session};
use pingora::{listeners::TlsSettings, prelude::HttpPeer, protocols::ALPN, server::Server, Error};
use url::Url;

/// Starts a pingora server exposing a single TLS listener on `0.0.0.0:443`
/// whose requests are routed by [`MultiRouteProxy`].
///
/// The certificate chain and private key are read from `fullchain.pem` and
/// `privkey.pem`; the process panics if the server or the TLS settings cannot
/// be created.
fn main() {
    env_logger::init();

    let mut pingora_server = Server::new(None).unwrap();
    pingora_server.bootstrap();

    // The router is handed straight to the proxy service; nothing else needs it.
    let mut router_service =
        http_proxy_service(&pingora_server.configuration, MultiRouteProxy::new());

    // Downstream listener advertises H2H1 via ALPN.
    let mut tls = TlsSettings::intermediate("fullchain.pem", "privkey.pem").unwrap();
    tls.set_alpn(ALPN::H2H1);
    router_service.add_tls_with_settings("0.0.0.0:443", None, tls);

    pingora_server.add_service(router_service);
    pingora_server.run_forever();
}

/// Host-based router: holds one upstream address string per backend.
///
/// Each field is passed as-is to `HttpPeer::new` once `select_upstream` has
/// picked it from the request host's prefix.
struct MultiRouteProxy {
    /// Upstream for hosts starting with `otel-col`.
    grpc_upstream: String,
    /// Upstream for hosts starting with `grafana`.
    grafana_upstream: String,
    /// Upstream for hosts starting with `prometheus`.
    prometheus_upstream: String,
    /// Upstream for hosts starting with `api`.
    api_upstream: String,
    /// Upstream for hosts starting with `www`.
    www_upstream: String,
}

impl MultiRouteProxy {
    fn new() -> Self {
        Self {
            grpc_upstream: "127.0.0.1:4317".to_string(),
            grafana_upstream: "127.0.0.1:3000".to_string(),
            prometheus_upstream: "127.0.0.1:9090".to_string(),
            api_upstream: "127.0.0.1:7000".to_string(),
            www_upstream: "127.0.0.1:4331".to_string(),
        }
    }

    fn select_upstream(&self, host: &str) -> Result<(String, ALPN), Box<Error>> {
        match host {
            h if h.starts_with("otel-col") => Ok((self.grpc_upstream.clone(), ALPN::H2)),
            h if h.starts_with("grafana") => Ok((self.grafana_upstream.clone(), ALPN::H1)),
            h if h.starts_with("prometheus") => Ok((self.prometheus_upstream.clone(), ALPN::H1)),
            h if h.starts_with("api") => Ok((self.api_upstream.clone(), ALPN::H2)),
            h if h.starts_with("www") => Ok((self.www_upstream.clone(), ALPN::H2)),
            _ => Err(Box::new(*pingora::Error::new(
                pingora::ErrorType::HTTPStatus(404),
            ))),
        }
    }

    fn select_sni(&self, host: &str) -> &str {
        match host {
            h if h.starts_with("otel-col") => "otel-col.example.com",
            h if h.starts_with("grafana") => "grafana.example.com",
            h if h.starts_with("prometheus") => "prometheus.example.com",
            h if h.starts_with("api") => "api.example.com",
            h if h.starts_with("www") => "www.example.com",
            _ => "",
        }
    }
}

#[async_trait]
impl ProxyHttp for MultiRouteProxy {
    /// No per-request state is kept.
    type CTX = ();

    fn new_ctx(&self) -> Self::CTX {}

    /// Chooses the upstream peer for the current request from its host name.
    ///
    /// The host is taken from the request URI when it carries one, otherwise
    /// from the `host` header; a missing or non-UTF-8 header yields an empty
    /// host. The host is ASCII-lowercased before routing so both sources are
    /// matched the same way.
    ///
    /// # Errors
    ///
    /// Propagates the error from `select_upstream` when no route matches the
    /// host.
    async fn upstream_peer(
        &self,
        session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>, Box<pingora::Error>> {
        let req = session.req_header();

        // Read the host straight from the parsed URI rather than serialising
        // the URI and parsing it a second time.
        let host = req
            .uri
            .host()
            .or_else(|| req.headers.get("host").and_then(|h| h.to_str().ok()))
            .unwrap_or_default()
            .to_ascii_lowercase();

        let (selected_upstream, alpn) = self.select_upstream(&host)?;
        let sni = self.select_sni(&host);

        // NOTE(review): the `false` argument is presumably pingora's TLS flag,
        // i.e. the upstream connection is plaintext (the reported log shows
        // `scheme: HTTP`) — confirm against the `HttpPeer::new` docs.
        let mut peer = HttpPeer::new(&selected_upstream, false, sni.to_string());
        peer.options.alpn = alpn;

        Ok(Box::new(peer))
    }
}

error:

[2024-09-29T06:59:57Z ERROR pingora_proxy] Fail to proxy: Upstream WriteError context: Peer: addr: 127.0.0.1:4317, scheme: HTTP,sni: otel-col.example.com, cause: context: while writing h2 request body cause: user error: inactive stream, status: 502, tries: 1, retry: false, POST https://otel-col.example.com/opentelemetry.proto.collector.metrics.v1.MetricsService/Export, Host:

huahuadeliaoliao commented 1 week ago

I only see this error in the pingora log. There does not appear to be any data loss on either my gRPC sender or receiver.

huahuadeliaoliao commented 2 days ago

I've found the cause of this problem: it has something to do with my upstream server's behavior. I think I should take a look at the OpenTelemetry documentation.