little-dude / netlink

netlink libraries for rust
Other
328 stars 89 forks source link

Connection awaiting to cleanup all used resources #292

Open jimmyvanhest opened 2 years ago

jimmyvanhest commented 2 years ago

When playing around with this library I noticed some behavior I was not expecting.

When the handle and messages returned by rtnetlink::new_connection are dropped, I would expect that an await on the connection returned by that same call would resolve.

But what actually happens is that the await on the connection blocks till the next time it receives IO from netlink and then completes because only then it notices that there is no more interest in it.

I also noticed that the order in which handle and messages are dropped/closed doesn't mater.

see below for a minimum example:

use futures::{stream::StreamExt, TryStreamExt, channel::mpsc::UnboundedReceiver};
use log::info;
use rtnetlink::{new_connection, packet::*, sys::{AsyncSocket, SocketAddr}, Handle};
use std::io::Result;

async fn get_inital(handle: Handle) {
    // get all address info
    let mut addresses = handle.address().get().execute();
    while let Ok(Some(address)) = addresses.try_next().await {
        info!("received address message: {:?}", address);
    }
}

async fn handle_messages(mut messages: UnboundedReceiver<(NetlinkMessage<RtnlMessage>, SocketAddr)>) {

    // do something with messages till we are not interested anymore
    info!("begin waiting on message receiver");
    while let Some(message) = tokio::select! {
        _ = tokio::time::sleep(tokio::time::Duration::from_secs(5)) => None,
        message = messages.next() => message
    } {
        info!("received message: {:?}", message);
    }
    info!("no activity for 5 seconds shuting down");

}

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    // Create a rtnetlink connection.
    let (mut conn, handle, messages) = new_connection()?;

    // add membership for address events.
    let groups = vec![RTNLGRP_IPV4_IFADDR, RTNLGRP_IPV6_IFADDR];
    for group in groups {
        conn.socket_mut().socket_mut().add_membership(group)?;
    }

    // spawn the rtnetlink connection to drive its io and stash the the join handle to await later.
    let driver = tokio::spawn(conn);

    // handle was moved and thus dropped
    get_inital(handle).await;

    // messages was moved and thus dropped
    handle_messages(messages).await;

    // handle and messages were dropped so driver should complete.
    info!("begin waiting on rtnetlink driver");
    driver.await?;
    info!("done waiting on rtnetlink driver");

    Ok(())
}

is it possible that this behavior can be changed so its used resources are released back to the system as soon as possible?