compio-rs / compio

A thread-per-core Rust runtime with IOCP/io_uring/polling.

Cannot close tcp stream from different task #288

Closed fafhrd91 closed 2 months ago

fafhrd91 commented 2 months ago

I have two tasks, one for reading and another for writing. If I try to close the stream from the write task, the close call never completes. Do I need to do anything special?

Here is an example:

```rust
use compio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use compio::net::{TcpListener, TcpStream};
use compio::BufResult;

#[compio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();

    let addr = listener.local_addr().unwrap();
    let client_fut = TcpStream::connect(&addr);
    let server_fut = listener.accept();

    let (mut client, (server, _)) = futures_util::try_join!(client_fut, server_fut).unwrap();
    compio::runtime::spawn(read(server)).detach();

    client.write_all("test").await.0.unwrap();
    let (_, buf) = client.read_to_end(Vec::with_capacity(1024)).await.unwrap();

    assert_eq!(buf, b"test");
}

async fn read(mut io: TcpStream) {
    loop {
        let buf = Vec::with_capacity(1024);
        let BufResult(_, buf) = io.read(buf).await;

        compio::runtime::spawn(write(io.clone(), buf)).detach();
    }
}

async fn write(mut io: TcpStream, buf: Vec<u8>) {
    println!("Writing {:?}", buf);
    io.write(buf).await.0.unwrap();

    // this line never completes
    io.close().await.unwrap();
}
```

Berrysoft commented 2 months ago

Yes. The cloned stream cannot be closed until the original one is dropped; the close call is just waiting for that. If you still want to use the stream, there is no need to close the cloned one.
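
To illustrate the behaviour described above, here is a minimal toy model using only the standard library. It assumes that clones of a stream share one reference-counted handle, which is an assumption for illustration and not a statement about compio's internals. `ToyStream`, `RawHandle`, and `try_close` are hypothetical names. Unlike the real close call, which waits, this model returns the stream back to the caller when other owners still exist.

```rust
use std::rc::Rc;

/// Stand-in for an OS socket handle (hypothetical; not a compio type).
struct RawHandle(u32);

/// Toy stream: every clone shares the same reference-counted handle.
#[derive(Clone)]
struct ToyStream {
    fd: Rc<RawHandle>,
}

impl ToyStream {
    fn new(id: u32) -> Self {
        ToyStream { fd: Rc::new(RawHandle(id)) }
    }

    /// Succeeds only for the last remaining owner. Otherwise the stream
    /// is handed back so the caller can retry later.
    fn try_close(self) -> Result<u32, ToyStream> {
        match Rc::try_unwrap(self.fd) {
            Ok(raw) => Ok(raw.0),
            Err(fd) => Err(ToyStream { fd }),
        }
    }
}

/// Returns whether the clone could be closed (before, after) the
/// original stream was dropped.
fn close_before_and_after_drop() -> (bool, bool) {
    let original = ToyStream::new(7);
    let clone = original.clone();

    // The original is still alive, so the clone cannot close yet.
    let attempt = clone.try_close();
    let closed_early = attempt.is_ok();

    // Drop the original, then retry with the stream that was handed back.
    drop(original);
    let closed_late = match attempt {
        Ok(_) => true,
        Err(clone) => clone.try_close().is_ok(),
    };
    (closed_early, closed_late)
}
```

In this model the first attempt fails and the second succeeds, which mirrors why the spawned write task in the example hangs for as long as the read loop keeps its own copy of the stream.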

fafhrd91 commented 2 months ago

Thanks. It would be great to see this info in the doc string.

fafhrd91 commented 2 months ago

I slightly modified the example; it still doesn't work even if I try to close the original stream.

Here is the modification:

```rust
async fn read(mut io: TcpStream) {
    let buf = Vec::with_capacity(1024);
    let BufResult(_, buf) = io.read(buf).await;

    let mut rd = io.clone();
    compio::runtime::spawn(write(io, buf)).detach();

    let buf = Vec::with_capacity(1024);
    let BufResult(_, buf) = rd.read(buf).await;
}

async fn write(mut io: TcpStream, buf: Vec<u8>) {
    println!("Writing {:?}", buf);
    io.write(buf).await.0.unwrap();

    // this line never completes
    io.close().await.unwrap();
}
```

Berrysoft commented 2 months ago

That's because rd is still reading and has not been dropped. io is waiting for it.

fafhrd91 commented 2 months ago

So it is not possible to close a TCP stream while a read is in progress?

Berrysoft commented 2 months ago

Yes. The read operation has been submitted to the kernel, and the stream can only be closed after all related operations have completed.

This is a bit inconvenient, but it is mainly for safety. The polling driver needs to ensure that the file descriptor is still valid when the operation completes.

fafhrd91 commented 2 months ago

What will happen if I drop the io.read() future? Is it safe to drop read/write operations?

Berrysoft commented 2 months ago

It is safe to drop the futures, but the operation may not be cancelled immediately; that depends on the current state of the driver and the kernel. Eventually the operation will either complete or be cancelled, and then the stream can be closed.
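
As a rough sketch of why dropping a future does not release the stream right away, the toy model below lets a driver keep one handle reference per operation it has submitted. This is an assumption made for illustration, not compio's actual implementation. `ToyDriver`, `ReadFuture`, `Fd`, and `reap_all` are hypothetical names.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Stand-in for a socket handle (hypothetical; not a compio type).
struct Fd;

/// Toy driver: keeps one handle reference per operation that the
/// "kernel" still owns.
struct ToyDriver {
    in_flight: RefCell<Vec<Rc<Fd>>>,
}

/// Toy future. In this model, dropping it only gives up interest in
/// the result; it does not touch the driver's bookkeeping.
struct ReadFuture;

impl ToyDriver {
    fn new() -> Self {
        ToyDriver { in_flight: RefCell::new(Vec::new()) }
    }

    /// Submitting a read pins the handle until the operation is reaped.
    fn submit_read(&self, fd: &Rc<Fd>) -> ReadFuture {
        self.in_flight.borrow_mut().push(Rc::clone(fd));
        ReadFuture
    }

    /// The "kernel" reports completion or cancellation, so the driver
    /// releases its references.
    fn reap_all(&self) {
        self.in_flight.borrow_mut().clear();
    }
}

/// Returns the number of handle owners after submitting a read, after
/// dropping its future, and after the driver reaps the operation.
fn owners_over_time() -> (usize, usize, usize) {
    let driver = ToyDriver::new();
    let fd = Rc::new(Fd);

    let fut = driver.submit_read(&fd);
    let after_submit = Rc::strong_count(&fd);

    drop(fut); // safe, but the operation is still in flight
    let after_drop = Rc::strong_count(&fd);

    driver.reap_all();
    let after_reap = Rc::strong_count(&fd);

    (after_submit, after_drop, after_reap)
}
```

The owner count only falls back to one after the driver has reaped the operation, which is the point at which a pending close could proceed in this model.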

fafhrd91 commented 2 months ago

OK, thanks.