hyperium / tonic

A native gRPC client & server implementation with async/await support.
https://docs.rs/tonic
MIT License
9.31k stars 951 forks

The connection is not reconnected after disconnection #1254

Open whiskeycola opened 1 year ago

whiskeycola commented 1 year ago

Bug Report

The client connection does not reliably recover on its own after it breaks. (It sometimes reconnects, but most often it hangs after the disconnect.)

Version 0.8.3

│   └── tonic v0.8.3
│   └── tonic-build v0.8.4
├── tonic v0.8.3 (*)
│   └── tonic v0.8.3 (*)
│   └── tonic-build v0.8.4 (*)

Platform

Linux Home-PC 5.15.0-58-generic #64-Ubuntu SMP Thu Jan 5 11:43:13 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux

Crates

Description

If the connection between the client and the server is lost, the client can no longer restore it, and the program that uses the client stops working. This happens, for example, when the client sleeps for a long time between requests or is physically disconnected from the server. After that, the gRPC service cannot be used: every call fails with a timeout error or a broken-connection error.

[2023-01-28T15:11:55Z DEBUG tower::buffer::worker] service.ready=true message=processing request
[2023-01-28T15:11:55Z DEBUG h2::codec::framed_write] send frame=Headers { stream_id: StreamId(19), flags: (0x4: END_HEADERS) }
[2023-01-28T15:11:55Z DEBUG h2::codec::framed_write] send frame=Data { stream_id: StreamId(19) }
[2023-01-28T15:11:55Z DEBUG h2::codec::framed_write] send frame=Data { stream_id: StreamId(19), flags: (0x1: END_STREAM) }
[2023-01-28T15:12:01Z DEBUG hyper::proto::h2::server] stream error: connection error: broken pipe
[2023-01-28T15:12:01Z DEBUG h2::codec::framed_write] send frame=Reset { stream_id: StreamId(19), error_code: CANCEL }

or this

[2023-01-28T15:23:35Z DEBUG client] send: interval(40)
[2023-01-28T15:23:35Z DEBUG h2::codec::framed_write] send frame=Reset { stream_id: StreamId(25), error_code: CANCEL }
[2023-01-28T15:23:35Z DEBUG tower::buffer::worker] service.ready=true message=processing request
[2023-01-28T15:23:35Z DEBUG h2::codec::framed_write] send frame=Headers { stream_id: StreamId(27), flags: (0x4: END_HEADERS) }
[2023-01-28T15:23:35Z DEBUG h2::codec::framed_write] send frame=Data { stream_id: StreamId(27) }
[2023-01-28T15:23:35Z DEBUG h2::codec::framed_write] send frame=Data { stream_id: StreamId(27), flags: (0x1: END_STREAM) }
[2023-01-28T15:23:36Z ERROR client] status: Cancelled, message: "Timeout expired", details: [], metadata: MetadataMap { headers: {} }

I ran into this problem in a production program. To investigate, I built a simple client and server from the tonic examples, and I get the same errors. I run the server on a remote machine so that I can physically break the connection between the client and the server.

Server

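use std::{error::Error, net::SocketAddr, str::FromStr};

use log::debug;
use tonic::{metadata::MetadataValue, transport::Server, Request, Response, Status};

// `test_grpc` is the module generated by tonic-build from the .proto file (its declaration is not shown here).
use test_grpc::{RequestSay, ResponseSay};

struct Service {}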
#[tonic::async_trait]
impl test_grpc::say_server::Say for Service {
    async fn hello(&self, request: Request<RequestSay>) -> Result<Response<ResponseSay>, Status> {
        let r = request.into_inner().text;
        debug!("in request: {}", r);
        Ok(Response::new(ResponseSay {
            text: format!("hello {r}"),
        }))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
    env_logger::Builder::new()
        .filter_level(log::LevelFilter::from_str("debug").unwrap())
        .init();
    let s = Service {};
    let key = "secret token";
    let svc = test_grpc::say_server::SayServer::with_interceptor(
        s,
        move |req: Request<()>| -> Result<Request<()>, Status> {
            let token: MetadataValue<_> = key.parse().unwrap();
            match req.metadata().get("authorization") {
                Some(t) if token == t => Ok(req),
                _ => Err(Status::unauthenticated("No valid auth token")),
            }
        },
    );
    let addr = "0.0.0.0:8804".parse::<SocketAddr>().unwrap();
    Server::builder()
        .add_service(svc)
        .serve(addr)
        .await
        .unwrap();
    Ok(())
}

Client


use std::time::Duration;

use log::{debug, error};
use tokio::time;

async fn tester_client(sleep: Duration, uri: &str, key: &str) {
    let uri = uri.parse().unwrap();
    debug!("create connect");
    let chan = tonic::transport::Channel::builder(uri)
        .timeout(Duration::from_secs(20))
        .connect_timeout(Duration::from_secs(20))
        //.http2_keep_alive_interval(Duration::from_secs(5))
        //.keep_alive_while_idle(true)
        .connect_lazy();

    let key = key.parse::<tonic::metadata::MetadataValue<_>>().unwrap();
    let mut key = Some(key);
    let mut service = test_grpc::say_client::SayClient::with_interceptor(
        chan,
        move |mut req: tonic::Request<()>| {
            if let Some(secret) = &mut key {
                req.metadata_mut().insert("authorization", secret.clone());
            }
            Ok(req)
        },
    );
    loop {
        let send_text = format!("interval({})", sleep.as_secs_f32() / 60.0);
        debug!("send: {send_text}");
        let res = match service
            .hello(tonic::Request::new(test_grpc::RequestSay {
                text: send_text.clone(),
            }))
            .await
        {
            Ok(r) => r,
            Err(e) => {
                error!("{e:#}");
                continue;
            }
        };
        debug!("recv: {}", res.into_inner().text);
        time::sleep(sleep).await;
        println!();
    }
}

I have tried several settings. For example, with .http2_keep_alive_interval(Duration::from_secs(5)) the connection no longer breaks during idle time. But if the connection is physically broken, it is not restored (tonic sometimes reconnects by itself, but most often the connection just hangs). Is there some other setting I need to specify so that a new connection is established when the old one breaks?

amrhassan commented 1 year ago

I've managed to get around the hanging connection issue by setting the net.ipv4.tcp_retries2 Linux parameter to a lower value. This makes undeliverable packets fail the established stream earlier.
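To see why a lower value fails the stream earlier, here is a rough arithmetic sketch. It assumes the Linux defaults of a 200 ms minimum and a 120 s maximum retransmission timeout, with the timeout doubling after each unacknowledged retransmission. The real timeout depends on the measured round-trip time, so treat the result as a lower bound. The function name and constants are illustrative, not part of any library.

```rust
/// Assumed Linux defaults: minimum RTO of 200 ms, maximum RTO of 120 s.
const RTO_MIN_MS: u64 = 200;
const RTO_MAX_MS: u64 = 120_000;

/// Hypothetical lower bound (in milliseconds) on how long the kernel keeps
/// retransmitting on an established connection before giving up, for a given
/// `net.ipv4.tcp_retries2` value. It sums the initial timeout plus `retries`
/// doubled timeouts, each capped at the maximum RTO.
fn hypothetical_timeout_ms(retries: u32) -> u64 {
    (0..=retries)
        // Cap the shift so the multiplication cannot overflow for large inputs.
        .map(|i| RTO_MIN_MS.saturating_mul(1u64 << i.min(32)).min(RTO_MAX_MS))
        .sum()
}
```

Under these assumptions, the default value of 15 gives about 924.6 seconds (the figure quoted in the kernel documentation), a value of 8 gives about 102.2 seconds, and a value of 5 gives about 12.6 seconds.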

clarkmcc commented 1 month ago

+1 for this. In the grpc-go library, NewClient does not perform any I/O: the connection is established lazily and reconnections are handled automatically. Having something like this implemented within the library would be extremely convenient; otherwise I have to re-implement that behavior everywhere I use a Rust gRPC client.
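For illustration, the kind of behavior that would have to be re-implemented can be sketched with the standard library alone. This is not tonic's or grpc-go's API: `Reconnecting`, `FakeConn`, and `demo` are hypothetical names, and the sketch treats every error as a reason to drop the connection, whereas a real client would likely separate transport errors from application errors.

```rust
/// Hypothetical wrapper: holds a connection factory and, once connected,
/// the current connection.
struct Reconnecting<C, F> {
    connect: F,
    conn: Option<C>,
}

impl<C, F> Reconnecting<C, F>
where
    F: FnMut() -> Result<C, String>,
{
    /// Performs no I/O; the connection is created on first use.
    fn new(connect: F) -> Self {
        Self { connect, conn: None }
    }

    /// Runs `op` on the current connection, connecting first if needed.
    /// On any error the connection is dropped, so the next call reconnects.
    fn call<T>(&mut self, op: impl FnOnce(&mut C) -> Result<T, String>) -> Result<T, String> {
        if self.conn.is_none() {
            self.conn = Some((self.connect)()?);
        }
        let conn = self.conn.as_mut().expect("connection was just established");
        let result = op(conn);
        if result.is_err() {
            self.conn = None;
        }
        result
    }
}

/// Stand-in for a real channel; `id` shows which connection served a call.
struct FakeConn {
    id: u32,
}

/// Simulates a successful call, a broken link, and a call after reconnecting.
fn demo() -> Vec<Result<u32, String>> {
    let mut next_id = 0;
    let mut client = Reconnecting::new(move || {
        next_id += 1;
        Ok(FakeConn { id: next_id })
    });
    let mut log = Vec::new();
    // The first call connects lazily and is served by connection 1.
    log.push(client.call(|c| Ok(c.id)));
    // A simulated broken link: the call fails and the connection is dropped.
    log.push(client.call(|_c| Err("broken pipe".to_string())));
    // The next call creates connection 2 instead of reusing the dead one.
    log.push(client.call(|c| Ok(c.id)));
    log
}
```

Calling `demo()` from a `main` function returns the three results in order: a success on the first connection, the simulated error, and a success on a freshly created second connection.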