hyperium / tonic

A native gRPC client & server implementation with async/await support.
https://docs.rs/tonic
MIT License
9.77k stars 997 forks source link

Support hyper 1.0 #1307

Open howardjohn opened 1 year ago

howardjohn commented 1 year ago

Hyper currently has a 1.0-rc3 build out and 1.0.0 is coming later on (https://hyper.rs/contrib/roadmap/). Given tonic depends on hyper, it would be desirable to have tonic support hyper 1.0. I didn't see any issue tracking this so I thought I'd start one.

davidpdrsn commented 1 year ago

If you need hyper 1.0 support you should be able to use https://crates.io/crates/tower-hyper-http-body-compat

howardjohn commented 1 year ago

Are there any examples with tonic? I've spent about 4 hours on trying to wrangle this and struggling a bit.

My current calling code:

        let svc = tls::grpc_connector(address, root_cert)?;
        let svc = tower_hyper_http_body_compat::Hyper1HttpServiceAsTowerService03HttpService::new(svc);
        let client = GrpcServiceClient::with_interceptor(svc, auth);

Fails with

40 |         let client = IstioCertificateServiceClient::with_interceptor(svc, auth);
   |                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `hyper::service::Service<hyper::Request<HttpBody04ToHttpBody1<http_body::combinators::box_body::UnsyncBoxBody<bytes::Bytes, tonic::Status>>>>` is not implemented for `boring::TlsGrpcChannel`
   |
   = help: the trait `hyper::service::Service<hyper::Request<UnsyncBoxBody<bytes::Bytes, tonic::Status>>>` is implemented for `boring::TlsGrpcChannel`

While my Service (hyper 1.0 service) is:

type BoxBody1 = http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, tonic::Status>;

impl hyper::service::Service<hyper::http::Request<BoxBody1>> for TlsGrpcChannel {
    type Response  = hyper::http::Response<hyper::body::Incoming>;
    type Error = hyper_util::client::legacy::Error;
    type Future = ResponseFuture;

    fn call(&mut self, mut req: Request<BoxBody1>) -> Self::Future {
        let uri = Uri::builder()
            .scheme(self.uri.scheme().unwrap().to_owned())
            .authority(self.uri.authority().unwrap().to_owned())
            .path_and_query(req.uri().path_and_query().unwrap().to_owned())
            .build()
            .unwrap();
        *req.uri_mut() = uri;
        self.client.request(req)
    }
}

Struggling with bridging the body type I think

howardjohn commented 1 year ago

I did get it working for gRPC services, just not for gRPC clients. Here is the full code if it helps:

https://github.com/howardjohn/ztunnel/blob/e7ca3033da71574d668f699fba824685dc7115c8/src/identity/caclient.rs#L46-L50 https://github.com/howardjohn/ztunnel/blob/e7ca3033da71574d668f699fba824685dc7115c8/src/tls/boring.rs#L401-L416

davidpdrsn commented 1 year ago

I'm not sure. Haven't actually done it myself.

I tried getting a small example working but kept running into connection issues:

use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

use http::{Request, Response};
use hyper::{
    body::Incoming,
    client::conn::{self, http2::SendRequest},
};
use proto::GreetRequest;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tower::{Service, ServiceExt};
use tower_hyper_http_body_compat::{HttpBody04ToHttpBody1, HttpBody1ToHttpBody04};

use crate::proto::{
    greeter_server::{Greeter, GreeterServer},
    GreetReply,
};

pub mod proto {
    tonic::include_proto!("greeter");
}

#[tokio::main]
async fn main() {
    // run a tonic server
    let listener = TcpListener::bind("0.0.0.0:3030").await.unwrap();
    println!("server listening on {}", listener.local_addr().unwrap());
    tokio::spawn(async move {
        let greeter = MyGreeter::default();
        tonic::transport::Server::builder()
            .add_service(GreeterServer::new(greeter))
            .serve_with_incoming(TcpListenerStream::new(listener))
            .await
            .unwrap();
    });

    let mut connector = hyper_util::client::connect::HttpConnector::new();
    let target_stream = connector
        .ready()
        .await
        .unwrap()
        .call(http::Uri::from_static("http://0.0.0.0:3030"))
        .await
        .unwrap();

    let (request_sender, connection) = conn::http2::handshake(TokioExec, target_stream)
        .await
        .unwrap();

    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("Error in connection: {}", e);
        }
    });

    let svc = SendRequestService {
        sender: request_sender,
    };

    let mut client = proto::greeter_client::GreeterClient::new(svc);
    let res = client
        .greet(GreetRequest {
            name: "Alice".to_owned(),
        })
        .await
        .unwrap();
    dbg!(&res);
}

#[derive(Default)]
pub struct MyGreeter {}

#[tonic::async_trait]
impl Greeter for MyGreeter {
    async fn greet(
        &self,
        req: tonic::Request<GreetRequest>,
    ) -> Result<tonic::Response<GreetReply>, tonic::Status> {
        let GreetRequest { name } = req.into_inner();
        Ok(tonic::Response::new(GreetReply {
            greeting: format!("Hi {name}"),
        }))
    }
}

#[derive(Copy, Clone)]
struct TokioExec;

impl<Fut> hyper::rt::Executor<Fut> for TokioExec
where
    Fut: Future + Send + 'static,
    Fut::Output: Send,
{
    fn execute(&self, fut: Fut) {
        tokio::spawn(fut);
    }
}

struct SendRequestService<B> {
    sender: SendRequest<B>,
}

impl<B> tower::Service<Request<B>> for SendRequestService<HttpBody04ToHttpBody1<B>>
where
    B: http_body::Body + Send + 'static,
    HttpBody04ToHttpBody1<B>: hyper::body::Body,
{
    type Response = Response<HttpBody1ToHttpBody04<Incoming>>;
    type Error = hyper::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.sender.poll_ready(cx)
    }

    fn call(&mut self, req: Request<B>) -> Self::Future {
        let req = req.map(HttpBody04ToHttpBody1::new);

        let future = self.sender.send_request(req);
        Box::pin(async move {
            let res = future.await?;
            Ok(res.map(HttpBody1ToHttpBody04::new))
        })
    }
}

This fails with

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Status { code: Unknown, message: "http2 error: user error: request URI missing scheme and authority", source: Some(hyper::Error(Http2, Error { kind: User(MissingUriSchemeAndAuthority) })) }', examples/tonic-example/src/main.rs:73:10
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Not sure why.

howardjohn commented 1 year ago

Nice! That got things working for me. The one thing I did have to do was stop using interceptor, which requires body to implement Default, and Incoming doesn't. I am not sure why it needs that. Will try to workaround that and can update with my full working code later for others to reference. I appreciate the help!

PhiDung-hub commented 10 months ago

Is there any timeline for when will hyper 1.0 be supported?

LucioFranco commented 10 months ago

I believe we will need to refactor the server to support hyper 1.0's new API. If someone is up for taking on this week feel free to post here and tag me in the PR. I currently don't have time on my plate to get this done.

LucioFranco commented 10 months ago

We will probably follow what axum does since we use their router pretty heavily.

rrichardson commented 5 months ago

We will probably follow what axum does since we use their router pretty heavily.

Axum is currently using Hyper 1.1 . It looks like this was the PR with the shift https://github.com/tokio-rs/axum/pull/1882

tommyvn commented 3 months ago

for anyone else landing here when looking for this error that @davidpdrsn encountered:

This fails with

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Status { code: Unknown, message: "http2 error: user error: request URI missing scheme and authority", source: Some(hyper::Error(Http2, Error { kind: User(MissingUriSchemeAndAuthority) })) }', examples/tonic-example/src/main.rs:73:10
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

it's likely because your request has a uri that doesn't contain a scheme. http2's rfc says:

All HTTP/2 requests MUST include exactly one valid value for the ":method", ":scheme", and ":path" pseudo-header fields, unless they are CONNECT requests

in my case i was re-using an http1 incoming request when calling an http2 endpoint, but this request had / as the uri so the scheme couldn't be extracted into my http2 request. maybe that code snippet referenced on this issue has a similar http1 request reuse bug that I myself had.

ajwerner commented 2 months ago

Is this now closed by https://github.com/hyperium/tonic/pull/1670?

fenollp commented 2 months ago

I believe this can be closed, yes: https://github.com/hyperium/tonic/blob/255a885d316392a0135fa810ecfd5ae87135a329/tonic/Cargo.toml#L82 https://github.com/hyperium/tonic/releases/tag/v0.12.0