containerd / ttrpc-rust

Rust implementation of ttrpc (GRPC for low-memory environments)
Apache License 2.0

The ttrpc client lifetime is so long for Main thread #225

Closed · jokemanfire closed this issue 6 months ago

jokemanfire commented 6 months ago

Description of problem

My client.rs example code looks like this:

// Based on ttrpc-rust's example/client.rs; the protocols/utils modules,
// the imports and the default_ctx() helper are the same as in that
// example and are omitted here.
fn main() {
    simple_logging::log_to_stderr(LevelFilter::Trace);

    for i in 0..10 {
        let c = Client::connect(utils::SOCK_ADDR).unwrap();
        let hc = health_ttrpc::HealthClient::new(c.clone());
        let ac = agent_ttrpc::AgentServiceClient::new(c);

        let thc = hc.clone();
        let tac = ac.clone();

        let now = std::time::Instant::now();
        let t = thread::spawn(move || {
            let req = health::CheckRequest::new();
            println!(
                "OS Thread {:?} - health.check() started: {:?}",
                std::thread::current().id(),
                now.elapsed(),
            );

            let rsp = thc.check(default_ctx(), &req);
            match rsp.as_ref() {
                Err(Error::RpcStatus(s)) => {
                    assert_eq!(Code::NOT_FOUND, s.code());
                    assert_eq!("Just for fun".to_string(), s.message())
                }
                Err(e) => {
                    panic!("not expecting an error from the example server: {:?}", e)
                }
                Ok(x) => {
                    panic!("not expecting a OK response from the example server: {:?}", x)
                }
            }
            println!(
                "OS Thread {:?} - health.check() -> {:?} ended: {:?}",
                std::thread::current().id(),
                rsp,
                now.elapsed(),
            );
        });

        let t2 = thread::spawn(move || {
            println!(
                "OS Thread {:?} - agent.list_interfaces() started: {:?}",
                std::thread::current().id(),
                now.elapsed(),
            );

            let show = match tac.list_interfaces(default_ctx(), &agent::ListInterfacesRequest::new()) {
                Err(e) => {
                    panic!("not expecting an error from the example server: {:?}", e)
                }
                Ok(s) => {
                    assert_eq!("first".to_string(), s.Interfaces[0].name);
                    assert_eq!("second".to_string(), s.Interfaces[1].name);
                    format!("{s:?}")
                }
            };

            println!(
                "OS Thread {:?} - agent.list_interfaces() -> {} ended: {:?}",
                std::thread::current().id(),
                show,
                now.elapsed(),
            );
        });

        println!(
            "Main OS Thread - agent.online_cpu_mem() started: {:?}",
            now.elapsed()
        );
        let show = match ac.online_cpu_mem(default_ctx(), &agent::OnlineCPUMemRequest::new()) {
            Err(Error::RpcStatus(s)) => {
                assert_eq!(Code::NOT_FOUND, s.code());
                assert_eq!(
                    "/grpc.AgentService/OnlineCPUMem is not supported".to_string(),
                    s.message()
                );
                format!("{s:?}")
            }
            Err(e) => {
                panic!("not expecting an error from the example server: {:?}", e)
            }
            Ok(s) => {
                panic!("not expecting a OK response from the example server: {:?}", s)
            }
        };
        println!(
            "Main OS Thread - agent.online_cpu_mem() -> {} ended: {:?}",
            show,
            now.elapsed()
        );

        println!("\nsleep 2 seconds ...\n");
        thread::sleep(std::time::Duration::from_secs(2));

        let version = hc.version(default_ctx(), &health::CheckRequest::new());
        assert_eq!("mock.0.1", version.as_ref().unwrap().agent_version.as_str());
        assert_eq!("0.0.1", version.as_ref().unwrap().grpc_version.as_str());
        println!(
            "Main OS Thread - health.version() started: {:?}",
            now.elapsed()
        );
        println!(
            "Main OS Thread - health.version() -> {:?} ended: {:?}",
            version,
            now.elapsed()
        );
        t.join().unwrap();
        t2.join().unwrap();

    }
    // Keep the process alive so /proc/<pid>/fd can be inspected.
    loop {}

}

This is the phenomenon: the number of open fds grows without bound.


Expected result

Either of the following:

1. Provide an interface so that I can proactively release the fd.
2. Release the fd automatically once the client's lifetime ends.

Actual result

1. The fds are never released in the main thread, so the fd count keeps increasing.

If this is confirmed to be a real problem, I will submit a PR.

lifupan commented 6 months ago

Hi @jokemanfire

Why do you think the ttrpc client will live forever? Once the t and t2 threads are joined, hc, thc, ac and tac will be destroyed, right?

jokemanfire commented 6 months ago

Hi @jokemanfire

Why do you think the ttrpc client will live forever? Once the t and t2 threads are joined, hc, thc, ac and tac will be destroyed, right?

Hello. When I write code like this loop (creating a client each iteration) in the main thread (I need to create the ttrpc client in my project, and all of my project's logic runs in the main thread), I found that the number of entries in /proc/<pid>/fd grows without bound, until the process's file descriptor limit is reached. The fds are not released in the main thread, even though I use the ttrpc client inside a wrapper function. Is this a usage error caused by my insufficient understanding? Thank you.

lifupan commented 6 months ago

let c = Client::connect(utils::SOCK_ADDR).unwrap();

The key is the client you create with "let c = Client::connect(utils::SOCK_ADDR).unwrap();". Once you are finished with the client, you should make sure that "c" and every cloned entity are destroyed.
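
For illustration, a minimal sketch of the usage lifupan describes, reusing Client, the generated HealthClient, utils and default_ctx() from the repository's example/client.rs (the helper name one_round is made up): the client and every clone go out of scope, and are therefore dropped, before the caller moves on.

fn one_round() {
    // Every handle lives only inside this function.
    let c = Client::connect(utils::SOCK_ADDR).unwrap();
    let hc = health_ttrpc::HealthClient::new(c.clone());

    let _ = hc.check(default_ctx(), &health::CheckRequest::new());

    // Dropping the last strong handle is what should close the
    // underlying connection and release its fd.
    drop(hc);
    drop(c);
}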

jokemanfire commented 6 months ago

let c = Client::connect(utils::SOCK_ADDR).unwrap();

The key is the client you create with "let c = Client::connect(utils::SOCK_ADDR).unwrap();". Once you are finished with the client, you should make sure that "c" and every cloned entity are destroyed.

Thank you, but I am still confused. In the example code, please check whether my understanding is wrong:

1. With `for i in 0..10`, by the time the next iteration starts, the lifetimes of the values created in the current iteration have already ended.

2. After

    let c = Client::connect(utils::SOCK_ADDR).unwrap();
    let hc = health_ttrpc::HealthClient::new(c.clone());
    let ac = agent_ttrpc::AgentServiceClient::new(c.clone());

    let thc = hc.clone();
    let tac = ac.clone();

   the lifetimes of thc and tac only end once the threads are joined.

3. The lifetimes of hc and ac should end after the iteration finishes. Is there some destroy method for the hc and ac variables? Thank you.

jokemanfire commented 6 months ago

Could the problem be that the client's inner `_connection: Arc<...>` never drops to a strong count of zero, because the thread created in new_client holds it forever?
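
A self-contained sketch (plain std only, not the ttrpc internals; Connection here is a stand-in type) of the situation described above: if a spawned thread keeps a strong Arc clone and never exits, the strong count can never reach zero, so the Drop impl that would close the fd never runs.

use std::sync::Arc;
use std::thread;
use std::time::Duration;

struct Connection; // stand-in for the client's inner connection

impl Drop for Connection {
    fn drop(&mut self) {
        // In the real client this is where the socket fd would be closed.
        println!("connection dropped");
    }
}

fn main() {
    let conn = Arc::new(Connection);

    // The receiver thread keeps a strong clone and loops forever,
    // so its clone is never dropped.
    let held = conn.clone();
    thread::spawn(move || loop {
        let _still_alive = Arc::strong_count(&held);
        thread::sleep(Duration::from_millis(100));
    });

    drop(conn); // the main thread's handle is gone ...
    // ... but the strong count is still 1 because of the thread, so
    // "connection dropped" is never printed; in the real client the
    // fd stays open in the same way.
    thread::sleep(Duration::from_secs(1));
}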

lifupan commented 6 months ago

Since hc and ac's lifetimes have ended, the connection should be closed automatically.

Could the problem be that the client's inner `_connection: Arc<...>` never drops to a strong count of zero, because the thread created in new_client holds it forever?

That could be the case, but in your example, after the loop all of the clients and their clones have been dropped, so the count should drop to zero, shouldn't it? Anyway, could you check whether any thread created by the client exits after all of the clients have been dropped?

jokemanfire commented 6 months ago

Thanks, now I understand the problem clearly. First, the connection held by the client's receiver thread has to be a Weak reference, so that the thread does not increase the Arc's strong count. Second, the libc::poll call needs a timeout parameter, otherwise the thread waits forever. I will push a PR and then you can check it. Below is my test code. I changed two points to resolve this problem: 1. hold the connection as a weak Arc in the thread; 2. add a timeout to libc::poll. (A rough sketch of these two changes follows after the test code.)

// std/log imports needed by this test; Client and utils come from the
// ttrpc-rust example as above.
use log::LevelFilter;
use std::fs::read_dir;
use std::path::PathBuf;
use std::process;

// Count the entries in /proc/<pid>/fd, i.e. the number of open fds.
fn get_describe_numbers(pid: &str) -> i64 {
    let proc_dir = format!("/proc/{}/fd", pid);
    let mut cnt = 0;
    match read_dir(PathBuf::from(proc_dir)) {
        Ok(entries) => {
            for entry in entries {
                match entry {
                    Ok(_) => {
                        cnt += 1;
                    }
                    Err(e) => {
                        eprintln!("Error reading directory entry: {}", e);
                    }
                }
            }
        }
        Err(e) => {
            eprintln!("Error reading directory: {}", e);
        }
    }
    cnt
}
fn main() {
    simple_logging::log_to_stderr(LevelFilter::Trace);
    let pid = process::id().to_string();
    let pre_fds = get_describe_numbers(pid.as_str());
    for _ in 0..1000 {
        let now_fds = get_describe_numbers(pid.as_str());
        if now_fds != pre_fds {
            println!("fd is not released: pre_fd {pre_fds:?}  now_fd {now_fds:?}");
        }
        println!("-------------now fd number is {now_fds:?}---------------------------------------");
        // The client is dropped at the end of every iteration; without the
        // fix its fd was still leaked.
        let _c = Client::connect(utils::SOCK_ADDR).unwrap();
    }
    let now_fds = get_describe_numbers(pid.as_str());
    if now_fds != pre_fds {
        println!("fd is not released: pre_fd {pre_fds:?}  now_fd {now_fds:?}");
    }

    println!("over");
}
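
A rough sketch of the two changes described above (illustrative only, not the actual diff in #226; Connection and spawn_receiver are made-up names, and the libc crate is assumed): the receiver thread holds only a Weak reference and upgrades it on each pass, and libc::poll is given a finite timeout so the loop re-checks the Weak instead of blocking forever.

use std::os::unix::io::RawFd;
use std::sync::{Arc, Weak};
use std::thread;

struct Connection {
    fd: RawFd,
}

impl Drop for Connection {
    fn drop(&mut self) {
        // Closing the socket here is what finally releases the fd.
        unsafe { libc::close(self.fd) };
    }
}

fn spawn_receiver(conn: &Arc<Connection>) {
    // Keep only a Weak reference in the thread so the thread alone
    // cannot keep the connection alive.
    let weak: Weak<Connection> = Arc::downgrade(conn);
    thread::spawn(move || loop {
        // Once every strong reference (the client and all its clones)
        // has been dropped, upgrade() fails and the thread exits.
        let conn = match weak.upgrade() {
            Some(c) => c,
            None => break,
        };

        let mut pfd = libc::pollfd {
            fd: conn.fd,
            events: libc::POLLIN,
            revents: 0,
        };
        // Finite timeout (100 ms) instead of -1: without it the thread
        // could sit in poll() forever and never re-check the Weak.
        let n = unsafe { libc::poll(&mut pfd, 1, 100) };
        if n > 0 {
            // ... read and dispatch response frames here ...
        }
        // The temporary strong reference is dropped at the end of each
        // pass, so between polls the thread holds no strong count.
    });
}
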
jokemanfire commented 6 months ago

#226

wllenyj commented 6 months ago

#226

Fixed