containerd / ttrpc-rust

Rust implementation of ttrpc (GRPC for low-memory environments)
Apache License 2.0

async: Close listener fd after server shutdown #200

Closed abel-von closed 1 year ago

abel-von commented 1 year ago

The listener fd is not closed after the server shuts down, so a process that starts and shuts down servers again and again is left with a residual socket fd each time.

Here I changed the example code to reproduce it:

// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//

#[macro_use]
extern crate log;

use std::sync::Arc;

#[cfg(unix)]
use async_trait::async_trait;
use log::LevelFilter;
#[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind};
use tokio::time::sleep;

#[cfg(unix)]
use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc, types};
#[cfg(unix)]
use ttrpc::asynchronous::Server;
use ttrpc::error::{Error, Result};
use ttrpc::proto::{Code, Status};

mod protocols;
mod utils;

struct HealthService;

#[cfg(unix)]
#[async_trait]
impl health_ttrpc::Health for HealthService {
    async fn check(
        &self,
        _ctx: &::ttrpc::r#async::TtrpcContext,
        _req: health::CheckRequest,
    ) -> Result<health::HealthCheckResponse> {
        let mut status = Status::new();

        status.set_code(Code::NOT_FOUND);
        status.set_message("Just for fun".to_string());

        sleep(std::time::Duration::from_secs(10)).await;

        Err(Error::RpcStatus(status))
    }

    async fn version(
        &self,
        ctx: &::ttrpc::r#async::TtrpcContext,
        req: health::CheckRequest,
    ) -> Result<health::VersionCheckResponse> {
        info!("version {:?}", req);
        info!("ctx {:?}", ctx);
        let mut rep = health::VersionCheckResponse::new();
        rep.agent_version = "mock.0.1".to_string();
        rep.grpc_version = "0.0.1".to_string();
        let mut status = Status::new();
        status.set_code(Code::NOT_FOUND);
        Ok(rep)
    }
}

struct AgentService;

#[cfg(unix)]
#[async_trait]
impl agent_ttrpc::AgentService for AgentService {
    async fn list_interfaces(
        &self,
        _ctx: &::ttrpc::r#async::TtrpcContext,
        _req: agent::ListInterfacesRequest,
    ) -> ::ttrpc::Result<agent::Interfaces> {
        let mut rp = Vec::new();

        let mut i = types::Interface::new();
        i.name = "first".to_string();
        rp.push(i);
        let mut i = types::Interface::new();
        i.name = "second".to_string();
        rp.push(i);

        let mut i = agent::Interfaces::new();
        i.Interfaces = rp;

        Ok(i)
    }
}

#[cfg(windows)]
fn main() {
    println!("This example only works on Unix-like OSes");
}

#[cfg(unix)]
#[tokio::main(flavor = "current_thread")]
async fn main() {
    simple_logging::log_to_stderr(LevelFilter::Trace);

    let h = Box::new(HealthService {}) as Box<dyn health_ttrpc::Health + Send + Sync>;
    let h = Arc::new(h);
    let hservice = health_ttrpc::create_health(h);

    let a = Box::new(AgentService {}) as Box<dyn agent_ttrpc::AgentService + Send + Sync>;
    let a = Arc::new(a);
    let aservice = agent_ttrpc::create_agent_service(a);

    utils::remove_if_sock_exist(utils::SOCK_ADDR).unwrap();

    let mut server = Server::new()
        .bind(utils::SOCK_ADDR)
        .unwrap()
        .register_service(hservice)
        .register_service(aservice);

    let mut hangup = signal(SignalKind::hangup()).unwrap();
    let mut interrupt = signal(SignalKind::interrupt()).unwrap();
    server.start().await.unwrap();
    let mut times = 0;
    loop {
        tokio::select! {
            _ = hangup.recv() => {
                // test shutdown -> start a new server on a fresh socket
                println!("shutdown");
                server.shutdown().await.unwrap();

                let h = Box::new(HealthService {}) as Box<dyn health_ttrpc::Health + Send + Sync>;
                let h = Arc::new(h);
                let hservice = health_ttrpc::create_health(h);

                let a = Box::new(AgentService {}) as Box<dyn agent_ttrpc::AgentService + Send + Sync>;
                let a = Arc::new(a);
                let aservice = agent_ttrpc::create_agent_service(a);
                let address = format!("{}-{}", utils::SOCK_ADDR, times);
                times += 1;
                utils::remove_if_sock_exist(&address).unwrap();
                server = Server::new()
                    .bind(&address)
                    .unwrap()
                    .register_service(hservice)
                    .register_service(aservice);
                println!("start listen");
                server.start().await.unwrap();

                // // hold some time for the new test connection.
                // sleep(std::time::Duration::from_secs(100)).await;
            }
            _ = interrupt.recv() => {
                // test graceful shutdown
                println!("graceful shutdown");
                server.shutdown().await.unwrap();
                // leave the loop so the process exits after a graceful shutdown
                break;
            }
        }
    }
}

If I send SIGHUP to this process (kill -SIGHUP), we can see that the fd count keeps increasing: [image]
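For anyone who wants to watch the count without the screenshot, here is a minimal sketch. It assumes Linux, where each entry under /proc/<pid>/fd corresponds to one open descriptor of that process. The helper name `count_open_fds` is made up for illustration and is not part of ttrpc.

```rust
use std::fs;
use std::io;

/// Count the entries under /proc/<pid>/fd (Linux only).
/// Hypothetical helper for illustration, not a ttrpc API.
fn count_open_fds(pid: u32) -> io::Result<usize> {
    let dir = format!("/proc/{}/fd", pid);
    // Each directory entry is one open file descriptor of the process.
    // When a process inspects itself, the descriptor used to read this
    // directory is included in the count.
    Ok(fs::read_dir(dir)?.count())
}

fn main() -> io::Result<()> {
    // Take the pid from the first argument, or fall back to this process.
    let pid: u32 = std::env::args()
        .nth(1)
        .and_then(|s| s.parse().ok())
        .unwrap_or_else(std::process::id);
    println!("pid {} has {} open fds", pid, count_open_fds(pid)?);
    Ok(())
}
```

Running it with the server's pid before and after each kill -SIGHUP should show the number growing while the leak is present.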

This PR fixes the issue by closing the listener fd. That is fine for callers that use server.bind() with a socket address. If the listener is opened outside the server and its lifecycle is managed outside, closing it here may cause a double-close error. However, the sync code already closes the listener, so I think we can assume that the server takes ownership of the listener fd once it is added to the server.
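The ownership assumption can be sketched roughly as follows. `ToyServer`, `with_listener`, and `fd_is_open` are hypothetical names used only for illustration; this is not the ttrpc `Server` and not the code in this PR. The sketch assumes Linux, because it checks /proc/self/fd to see whether a descriptor is still open. The caller gives up ownership with `into_raw_fd()`, so only the server closes the fd and no double close can happen.

```rust
use std::os::unix::io::{FromRawFd, IntoRawFd, RawFd};
use std::os::unix::net::UnixListener;

/// Hypothetical stand-in for a server that is handed a raw listener fd.
struct ToyServer {
    listener_fd: Option<RawFd>,
}

impl ToyServer {
    /// From this point on the server is assumed to own `fd`.
    fn with_listener(fd: RawFd) -> Self {
        ToyServer { listener_fd: Some(fd) }
    }

    /// Close the listener fd exactly once; later calls are no-ops.
    fn shutdown(&mut self) {
        if let Some(fd) = self.listener_fd.take() {
            // SAFETY: the fd was handed over with `into_raw_fd()`, so no
            // other owner exists that would close it a second time.
            drop(unsafe { UnixListener::from_raw_fd(fd) });
        }
    }
}

/// Linux-only check: is `fd` still listed for this process?
fn fd_is_open(fd: RawFd) -> bool {
    std::fs::symlink_metadata(format!("/proc/self/fd/{}", fd)).is_ok()
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join(format!("toy-server-{}.sock", std::process::id()));
    let _ = std::fs::remove_file(&path);

    // `into_raw_fd()` releases ownership without closing the socket.
    let fd = UnixListener::bind(&path)?.into_raw_fd();
    let mut server = ToyServer::with_listener(fd);
    assert!(fd_is_open(fd));

    server.shutdown();
    assert!(!fd_is_open(fd)); // the server closed the fd it owned

    server.shutdown(); // second call does nothing, so no double close
    std::fs::remove_file(&path)?;
    Ok(())
}
```

A caller that keeps its own `UnixListener` alive and passes only `as_raw_fd()` would still close the fd when that listener is dropped, which is the double-close case described above.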

Tim-Zhang commented 1 year ago

@abel-von Good catch. This bug is caused by https://github.com/containerd/ttrpc-rust/blob/b13d3fd5423275766e9fd1d929371bd223bedf0f/src/asynchronous/server.rs#L216 and I forgot to close the fd. Would you mind backporting this to 0.5.0, 0.6.0 and 0.7.0? If you are too busy to do it, never mind, I will do it. Thanks a million.

abel-von commented 1 year ago

@Tim-Zhang The CI seems to have hit a weird error. Could you help solve it, please? I can cherry-pick this to the other branches. Shall I submit the PRs too?

codecov[bot] commented 1 year ago

Codecov Report

Patch coverage has no change and project coverage change: -0.05 :warning:

Comparison is base (e94bb9f) 24.47% compared to head (cc6b187) 24.42%.

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##           master     #200      +/-   ##
==========================================
- Coverage   24.47%   24.42%   -0.05%
==========================================
  Files          17       17
  Lines        2521     2526       +5
==========================================
  Hits          617      617
- Misses       1904     1909       +5
```

| [Impacted Files](https://app.codecov.io/gh/containerd/ttrpc-rust/pull/200?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=containerd) | Coverage Δ | |
|---|---|---|
| [src/asynchronous/server.rs](https://app.codecov.io/gh/containerd/ttrpc-rust/pull/200?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=containerd#diff-c3JjL2FzeW5jaHJvbm91cy9zZXJ2ZXIucnM=) | `0.00% <0.00%> (ø)` | |


Tim-Zhang commented 1 year ago

> @Tim-Zhang The CI seems to have some weird error, could you help solving it please? I can cherry-pick this to other branches, shall I submit the PR too?

It works now.