BiagioFesta / wtransport

Async-friendly WebTransport implementation in Rust
Apache License 2.0

looping over RecvStream::read causes an infinite loop without the client sending anything. #176

Open JimitSoni18 opened 1 month ago

JimitSoni18 commented 1 month ago

I have the following server code, which loops over received messages with RecvStream::read:

use wtransport::Endpoint;
use wtransport::Identity;
use wtransport::ServerConfig;

#[tokio::main]
async fn main() {
    let config = ServerConfig::builder()
        .with_bind_default(4433)
        .with_identity(
            &Identity::load_pemfiles(
                "/home/jimit/certs/full-chain.cert.pem",
                "/home/jimit/certs/private.key.pem",
            )
            .await
            .unwrap(),
        )
        .build();

    let server = Endpoint::server(config).unwrap();

    loop {
        let incoming_session = server.accept().await;
        println!("incoming session");
        match incoming_session.await {
            Ok(incoming_request) => {
                let connection_result = incoming_request.accept().await;
                match connection_result {
                    Ok(connection) => {
                        tokio::spawn(handle_connection(connection));
                    }
                    Err(error) => {}
                }
            }
            Err(error) => {
                continue;
            }
        };
    }
}

async fn handle_connection(connection: wtransport::Connection) {
    match connection.accept_uni().await {
        Ok(mut uni_stream) => {
            let mut bytes = vec![0; 256];
            loop {
                match uni_stream.read(&mut bytes).await {
                    Ok(result) => {
                        if let Some(size) = result {
                            println!("read {size} bytes");
                        }
                        let res_string = String::from_utf8(bytes.clone());
                        if let Ok(message) = res_string {
                            let message = message.trim();
                            println!("{message}");
                        } else {
                            println!("{res_string:?}");
                        }
                    }
                    Err(error) => {
                        eprintln!("error reading bytes: {error}");
                        return;
                    }
                }
                println!("read bytes: {bytes:?}");
            }
        }
        Err(error) => {}
    }
}

And the following client code:

    const wt = new WebTransport("https://127.0.0.1:4433");
    console.log({ wt });
    await wt.ready;
    wt.closed
        .then(() => {
            console.log("The receiveStream closed gracefully.");
        })
        .catch((e) => {
            console.error("The receiveStream closed abruptly.", e);
        });
    const stream = await wt.createUnidirectionalStream();
    const writer = stream.getWriter();
    const buffer1 = new TextEncoder().encode(
        "Hello from the other siiiiiiddeee."
    );
    const buffer2 = new TextEncoder().encode(
        "I must have tried a thousand tiiiiiimeeeess"
    );
    await writer.write(buffer1);
    await writer.write(buffer2);
    console.log("closing stream");
    await stream.close();
    console.log("stream closed");
    console.log("closing transport");
    wt.close(closeWith);
    console.log("transport closed");

    console.log("wrote data");

And here are the logs of both client and server (in the screenshots):

As you can see, I am only sending data once on the stream, but the server still gets stuck in the loop for some reason. The fault could be entirely mine, as I haven't used WebTransport much and know nothing about how streams work, so I appreciate any feedback. Thank you!

JimitSoni18 commented 1 month ago

I discovered something new: in the client code,

    await stream.close();

the promise never resolves or rejects for some reason, so wt.close() is never reached. The problem could therefore be that the stream is closed but the transport is not, so when RecvStream::read tries to read data from the stream/socket, it gets something like Poll::Pending(None) or similar.

I can't tell for sure, though, because I don't know the implementation details; I just deduced this from what I saw.

Edit: after testing a few more times, I noticed that this also happens with stream.close() removed. It does not happen frequently, though.

JimitSoni18 commented 1 month ago

If you are trying to reproduce this, please note that it does not occur every time. I refreshed the page several times (8-12), making a new connection each time, and it happened only once.

BiagioFesta commented 1 month ago

Maybe the JS stream is locked by the writer? https://developer.mozilla.org/en-US/docs/Web/API/WritableStream/locked

JimitSoni18 commented 1 month ago

@BiagioFesta the JS stream is locked because I am writing to the stream, but how does that cause these issues?

BiagioFesta commented 1 month ago

@JimitSoni18 can you please explain more precisely what the issue is and what behavior you expect instead?

I don't see any "stuck loop" when trying your code. If you are referring to println!("{message}"); being printed multiple times, isn't that just because you are refreshing the page multiple times, so you get one spawned task per WebTransport session?

JimitSoni18 commented 1 month ago

I have attached a video that shows how RecvStream::read returns continuously without the client sending any data. You can also see that this does not happen every time I start a connection. Here is my server code:

use std::time::Duration;

use wtransport::{Endpoint, Identity, ServerConfig};

#[tokio::main]
async fn main() {
    let config = ServerConfig::builder()
        .with_bind_default(4433)
        .with_identity(
            &Identity::load_pemfiles(
                "/home/jimit/certs/full-chain.cert.pem",
                "/home/jimit/certs/private.key.pem",
            )
            .await
            .unwrap(),
        )
        .max_idle_timeout(Some(Duration::from_secs(20)))
        .unwrap()
        .build();

    let server = Endpoint::server(config).unwrap();

    loop {
        let incoming_session = server.accept().await;
        println!("incoming session");
        match incoming_session.await {
            Ok(incoming_request) => {
                let connection_result = incoming_request.accept().await;
                println!(
                    "ok request type of connection: {}",
                    if connection_result.is_ok() {
                        "Ok"
                    } else {
                        "Err"
                    }
                );
                match connection_result {
                    Ok(connection) => {
                        tokio::spawn(handle_connection(connection));
                    }
                    Err(error) => {
                        eprintln!("rejected connection for reason: {error}");
                    }
                }
            }
            Err(error) => {
                eprintln!("=>> {error:?}\n");
                continue;
            }
        };
    }
}

async fn handle_connection(connection: wtransport::Connection) {
    match connection.accept_uni().await {
        Ok(mut uni_stream) => {
            let mut bytes = vec![0; 256];
            loop {
                match uni_stream.read(&mut bytes).await {
                    Ok(result) => {
                        if let Some(size) = result {
                            println!("read {size} bytes");
                            let res_string =
                                String::from_utf8(bytes[..size].to_vec());
                            if let Ok(message) = res_string {
                                println!("{message}");
                            } else {
                                println!("{res_string:?}");
                            }
                        } else {
                            println!("read nothing");
                        }
                    }
                    Err(error) => {
                        eprintln!("error reading bytes: {error}");
                        return;
                    }
                }
            }
        }
        Err(error) => {
            eprintln!("uni_stream returned error: {error}");
        }
    }
    println!("returning for whatever reason");
}

Expected behavior:

In the handle_connection function, I loop over uni_stream.read(buf).await and log the output. If the client sends only one message, uni_stream.read(buf).await should log the message once and then wait for another message to arrive. Instead, it continuously reads nothing and executes the println!("read nothing") statement I added. You can also see the client code in the video, which shows that the client sends only these 2 messages. Here is the client code:

    const wt = new WebTransport("https://127.0.0.1:4433");
    console.log({ wt });
    await wt.ready;
    const buffer1 = new TextEncoder().encode(
        "Hello from the other siiiiiiddeee."
    );
    const buffer2 = new TextEncoder().encode(
        "I must have tried a thousand tiiiiiimeeeess"
    );
    const stream = await wt.createUnidirectionalStream();
    const writer = stream.getWriter();
    await writer.write(buffer1);
    await writer.write(buffer2);
    await writer.ready;

Please let me know if anything is unclear.

JimitSoni18 commented 1 month ago

Forgot to attach the video: Screencast from 2024-05-05 15-15-18.webm

BiagioFesta commented 1 month ago

Thank you for the attached video; it always makes the investigation easier :)


There are a few notes:

JimitSoni18 commented 1 month ago

@BiagioFesta that makes sense! I'll use tracing for the logging issue, but I also have some more questions:

BiagioFesta commented 1 month ago

You can see at the beginning of the video that sometimes it does not loop over "read nothing" when there is no data to read, and sometimes it does, so I can't understand why...

It depends on which connection that log belongs to. Please add some context to the logs (like an incremental counter that identifies the connection). When you refresh the page, the browser could keep an old connection cached, or, for example, an old connection could time out (after a few seconds) and its stream be closed.
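The per-connection log context suggested above could be as simple as a process-wide counter that hands out one ID per accepted session, with that ID prefixed to every log line. The sketch below uses only the standard library; `next_connection_id` and `tagged` are hypothetical helper names for illustration, not wtransport APIs.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Process-wide counter; each accepted session takes the next value.
static NEXT_CONNECTION_ID: AtomicU64 = AtomicU64::new(1);

/// Hypothetical helper: returns a fresh, unique connection ID.
/// `Relaxed` ordering is enough because only uniqueness matters here,
/// not ordering relative to other memory operations.
pub fn next_connection_id() -> u64 {
    NEXT_CONNECTION_ID.fetch_add(1, Ordering::Relaxed)
}

/// Hypothetical helper: prefixes a log message with its connection ID,
/// so lines from an old (timing-out) session can be told apart from
/// lines produced by the session opened by the latest page refresh.
pub fn tagged(connection_id: u64, message: &str) -> String {
    format!("[conn {connection_id}] {message}")
}
```

In the accept loop, one would take `let id = next_connection_id();` before spawning, change `handle_connection` (hypothetically) to take `id` as an extra argument, and log with `println!("{}", tagged(id, "read nothing"))`. With `tracing`, the same ID could instead be recorded as a field on a span that wraps the spawned task.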

Also, you can see that I sometimes receive errors from the client like 268 or 256 connection reset, even though I don't close the transport from the client. Please help me understand why.

Again, that might be a previous connection. The log might appear some time later because of the browser's timeout when collecting the connection.

Also, sometimes when I do close the connection from the client side, calling transport.close() after awaiting transportStream.write(), I check the server log and find that no data has been received on the server, and the connection closes without printing the message. Does that have something to do with how JavaScript deals with streams?

I don't know. That feels strange to me, and more investigation is needed. It might depend on the browser implementation (or on a bug: see, for example, issue https://github.com/BiagioFesta/wtransport/issues/145, which looks like a browser bug).

Last but not least, if possible, why not have an API that polls the read future only when there is data to read on the open stream? That would really help with efficiency, since it would not be stuck reading None when there is nothing to read. It would be very helpful in single-threaded runtimes, for example.

I am not sure I understand this point.

The stream is polled when data is available. When read returns None, it does not mean that no data is available: it means the stream is finished and no more data will come. So any further poll will always return None immediately, as the stream is "closed".
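Given that explanation, a read loop should treat `Ok(None)` as end-of-stream and leave the loop instead of polling again. Below is a minimal, std-only sketch of that control flow. It assumes a synchronous stand-in for `RecvStream::read`: the hypothetical `ChunkSource` trait mirrors the `Result<Option<usize>, _>` shape used in the server code above, but it is not part of wtransport. The sketch also accumulates bytes until the stream finishes, since a stream carries bytes rather than messages, so read boundaries need not match the client's `write` calls.

```rust
use std::convert::Infallible;

/// Hypothetical synchronous stand-in for `RecvStream::read`:
/// `Ok(Some(n))` means `n` bytes were written into `buf`,
/// `Ok(None)` means the peer finished the stream.
pub trait ChunkSource {
    type Error;
    fn read_chunk(&mut self, buf: &mut [u8]) -> Result<Option<usize>, Self::Error>;
}

/// Reads until end-of-stream and returns every received byte.
pub fn read_until_finished<S: ChunkSource>(source: &mut S) -> Result<Vec<u8>, S::Error> {
    let mut buf = [0u8; 256];
    let mut received = Vec::new();
    loop {
        match source.read_chunk(&mut buf)? {
            // Only the first `n` bytes of `buf` belong to this read.
            Some(n) => received.extend_from_slice(&buf[..n]),
            // Finished stream: every further read would return `None`
            // immediately, so exit instead of spinning.
            None => break,
        }
    }
    Ok(received)
}

/// Test double: hands out the given chunks (split to fit `buf`),
/// then reports `None` forever, like a finished stream.
pub struct FinishedAfter {
    chunks: Vec<Vec<u8>>,
    next: usize,
}

impl FinishedAfter {
    pub fn new(chunks: Vec<Vec<u8>>) -> Self {
        Self { chunks, next: 0 }
    }
}

impl ChunkSource for FinishedAfter {
    type Error = Infallible;

    fn read_chunk(&mut self, buf: &mut [u8]) -> Result<Option<usize>, Infallible> {
        let Some(chunk) = self.chunks.get_mut(self.next) else {
            return Ok(None); // all chunks consumed: stream is finished
        };
        let n = chunk.len().min(buf.len());
        buf[..n].copy_from_slice(&chunk[..n]);
        chunk.drain(..n); // drop the bytes just handed out
        if chunk.is_empty() {
            self.next += 1;
        }
        Ok(Some(n))
    }
}
```

In the async server, the corresponding change would be to `break` (or `return`) in the `None` arm of `uni_stream.read(&mut bytes).await` instead of printing "read nothing" and looping again.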