hyperium / tonic

A native gRPC client & server implementation with async/await support.
https://docs.rs/tonic
MIT License

Ability to send an `END_STREAM` frame in order to perform half-close on gRPC APIs. #1066

Closed jayy-lmao closed 2 years ago

jayy-lmao commented 2 years ago

Feature Request

Crates

tonic

Motivation

First, I want to say that this is a good problem to have, as tonic has made using gRPC in Rust so smooth. The library has been great to read through while trying to understand this problem :)

I'm attempting to use an API that requires a half-close signal to finalize bi-directional streaming. In the gRPC over HTTP/2 spec, this is done by setting the `END_STREAM` flag on the request's last DATA frame. I noticed that hyperium/h2 implements a way to set this, though in tonic I think it might be hard-coded to false. The place I think this occurs is the `is_end_stream` function of `EncodeBody<S>` in src/codec/encode.rs.
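To make the wire-level signal concrete, here is a minimal sketch of how a DATA frame carrying `END_STREAM` is laid out, based on the HTTP/2 frame format (9-byte header: 24-bit length, 8-bit type, 8-bit flags, 31-bit stream id). The helper `encode_data_frame` is hypothetical and written only for illustration; it is not part of h2 or tonic, which handle framing internally.

```rust
/// HTTP/2 frame type for DATA frames.
const FRAME_TYPE_DATA: u8 = 0x0;
/// END_STREAM flag bit on DATA frames.
const FLAG_END_STREAM: u8 = 0x1;

/// Encode one DATA frame: 9-byte header followed by the payload.
/// Hypothetical helper for illustration only (not h2 or tonic API).
fn encode_data_frame(stream_id: u32, payload: &[u8], end_stream: bool) -> Vec<u8> {
    let len = payload.len() as u32;
    assert!(len < (1 << 24), "payload too large for a single frame");

    let flags = if end_stream { FLAG_END_STREAM } else { 0 };

    let mut frame = Vec::with_capacity(9 + payload.len());
    // 24-bit big-endian payload length (drop the top byte of the u32).
    frame.extend_from_slice(&len.to_be_bytes()[1..]);
    frame.push(FRAME_TYPE_DATA);
    frame.push(flags);
    // 31-bit stream identifier; the reserved high bit is cleared.
    frame.extend_from_slice(&(stream_id & 0x7FFF_FFFF).to_be_bytes());
    frame.extend_from_slice(payload);
    frame
}

fn main() {
    // A half-close can be expressed as an empty DATA frame with END_STREAM set.
    let half_close = encode_data_frame(1, b"", true);
    println!("{:02x?}", half_close);
}
```

An empty DATA frame with the flag set is enough to tell the peer that the client has finished sending, while the response side of the stream stays open.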

In a previous discussion of this (https://github.com/hyperium/tonic/issues/659) it was recommended to drop the sender. Doing this does not send a stream-end instruction to the API, though, and this results in timeout errors. This is discussed in this Stack Overflow thread (https://stackoverflow.com/questions/67610502/how-do-i-perform-a-half-close-on-a-grpc-bidirectional-stream-using-tonic/73456870#73456870) and I have been able to reproduce it.

The reason this matters for the Google API I am using is that the stream-end instruction indicates that everything has been sent successfully. It triggers the API's next actions, so we must keep listening on the gRPC receiving stream for the final response.

Proposal

~~I planned to open a PR, so I investigated the code in depth. As far as I can understand, EmptyBody has its END_STREAM flag set to true, so I thought there might be a way to send an empty body through. The other issue I'm having is that the Item on the stream is just a Result<Bytes, Status>, which leaves no room for a boolean field for ending streams. I thought maybe an empty Bytes array could trigger sending an empty body / end, but that just feels like obfuscated behavior; in any case, the place I was trying to add it was pub async fn streaming<S, M1, M2, C> in tonic's src/client/grpc.rs.~~

This proposal may not sound particularly helpful, but I think I may need input from a contributor / maintainer on what way would be the most consistent with this codebase and in line with its design.

Updated: It looks like a close method could be generated for gRPC clients which would just send an http-body's EmptyBody?

    // tonic/src/client/grpc.rs > impl Grpc<T>

    /// Close connection
    pub async fn close<M2, C, D>(
        &mut self,
        path: PathAndQuery,
        codec: C,
    ) -> Result<Response<M2>, Status>
    where
        T: GrpcService<BoxBody>,
        T::ResponseBody: Body + Send + 'static,
        <T::ResponseBody as Body>::Error: Into<crate::Error>,
        C: Codec<Encode = http_body::Empty<D>, Decode = M2>,
        M2: Send + Sync + 'static,
        D: Send + Sync + 'static,
    {
        let empty_body = Empty::new();
        let request = Request::new(empty_body).map(|m| stream::once(future::ready(m)));

        self.client_streaming(request, path, codec).await
    }

(It looks like I could be very wrong here and might be in for a world of hurt with `the trait Message is not implemented for http_body::Empty<_>` in future)

I'm still figuring out where it would sit in tonic-build/src/lib.rs, though, in order to be available for generated clients:

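                /// Closes the connection
                ///
                // Draft only: `async` is required for the `.await` calls below, and the
                // return type would need to be a `Result<_, tonic::Status>` (rather than
                // `Self`) for `?` to compile. `codec` is not yet defined in this scope.
                pub async fn close(mut self) -> Self {
                    self.inner.ready().await.map_err(|e| {
                        tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into()))
                    })?;
                    let path = http::uri::PathAndQuery::from_static(#path);
                    self.inner.close(path, codec).await
                }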

If I attempt to put this in fn generate I can't determine what codec to pass into self.inner.close(...) as most get their codec from the rpc method they are implementing.

Thank you

[Update: I made the mistake of looking at the release used by another library. Looking at master, I see that much of the logic around how is_end_stream is determined has changed, though it still does not seem possible to end the stream through a command]

LucioFranco commented 2 years ago

I believe this may be more related to hyper and its handling of dropping the handles and returning specific h2 things.

jayy-lmao commented 2 years ago

Hi Lucio!

Sorry about the issue churn. I discovered that bumping the tonic version and correctly dropping the sender did indeed lead to an `END_STREAM` being sent.
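The "drop the sender, keep reading" pattern can be sketched with the standard library alone. This is only an analogy: `std::sync::mpsc` channels stand in for the request and response streams, and the spawned thread stands in for the remote API. In a real tonic client the request stream is usually backed by an async channel, and it is the end of that stream (after the sender is dropped) that results in `END_STREAM`. The function name `run_half_close_demo` is made up for this example.

```rust
use std::sync::mpsc;
use std::thread;

/// Sends `messages`, half-closes by dropping the sender, then collects
/// every response. Hypothetical demo; std channels stand in for gRPC streams.
fn run_half_close_demo(messages: Vec<&'static str>) -> Vec<String> {
    let (req_tx, req_rx) = mpsc::channel::<&'static str>();
    let (resp_tx, resp_rx) = mpsc::channel::<String>();

    // Stand-in "server": it only produces its final response after the
    // request channel has closed (the analogue of observing END_STREAM).
    let server = thread::spawn(move || {
        let mut seen = 0;
        // This loop ends only once every request sender has been dropped.
        for _msg in req_rx {
            seen += 1;
        }
        resp_tx
            .send(format!("received {} messages", seen))
            .unwrap();
    });

    for m in messages {
        req_tx.send(m).unwrap();
    }

    // Half-close: stop sending, but keep the receiving side alive.
    drop(req_tx);

    // Keep listening until the server side closes its end too.
    let responses: Vec<String> = resp_rx.iter().collect();
    server.join().unwrap();
    responses
}

fn main() {
    for line in run_half_close_demo(vec!["a", "b", "c"]) {
        println!("{}", line);
    }
}
```

If `drop(req_tx)` were removed, the server loop would never finish and the client would wait forever, which mirrors the timeout described earlier in the thread when no stream-end signal reaches the API.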

A great relief and the library using tonic is going great for us now.

Thank you for the streaming examples in the examples folder, they were extremely useful in triaging this.