snapview / tokio-tungstenite

Future-based Tungstenite for Tokio. Lightweight stream-based WebSocket implementation
MIT License
1.88k stars 236 forks source link

SplitStream<S> doesn't satisfy TryStreamExt #273

Closed badger-io closed 1 year ago

badger-io commented 1 year ago

Hey, I'm modifying this repo https://github.com/Darksonn/telnet-chat to use websockets (just for fun) and I've run into an issue with types/traits.

I've modified the client_loop function to call accept_async to start the handshake. I then split the stream and pass the parts to two separate functions:

async fn client_loop(mut data: ClientData) -> Result<(), io::Error> {
    // Do the websocket handshake on top of the raw TCPStream
    let ws_stream = tokio_tungstenite::accept_async(data.tcp)
        .await
        .expect("Error during the websocket handshake occurred");
    println!("WebSocket connection established");

    // let (read, write) = data.tcp.split();
    let (mut write, read) = ws_stream.split();

    // communication between tcp_read and tcp_write
    let (send, recv) = unbounded_channel();

    // If either of these throws an error then it exists immediately
    let ((), ()) = try_join! {
        tcp_read(data.id, read, data.handle, send),
        tcp_write(write, data.recv, recv),
    }?;

    // TODO: maybe need to close the ws here?
    // TODO: send error message?
    // let _ = data.tcp.shutdown().await;

    Ok(())
}

So far so good. The problem comes when trying to consume the stream in the read function. I've tried different methods from the example code but always get a trait error related to try.

Using a while loop with ?

async fn tcp_read<S: StreamExt>(
    id: ClientId,
    mut read: SplitStream<S>,
    mut handle: ServerHandle,
    to_tcp_write: UnboundedSender<InternalMsg>,
) -> Result<(), io::Error> {
    while let Some(msg_item) = read.next().await {
        match msg_item? {
            Some(msg) => {
                match msg {
                    Ok(Message::Ping(msg)) => println!("Received ping message; msg={:#?}", msg),
                    Ok(Message::Pong(msg)) => println!("Received pong message; msg={:#?}", msg),
                    Ok(Message::Binary(bins)) => println!("Received Binbary message, content={}", std::str::from_utf8(&bins).unwrap_or("unknown")),
                    Ok(Message::Frame(frame)) => println!("Received Frame message, content={:?}", frame),
                }
            },
            None => (),
        }
    }

    // disconnected

    Ok(())
}

Gives error

error[E0277]: the `?` operator can only be applied to values that implement `Try`
   --> src/client.rs:176:15
    |
176 |         match msg_item? {
    |               ^^^^^^^^^ the `?` operator cannot be applied to type `<S as Stream>::Item`
    |
    = help: the trait `Try` is not implemented for `<S as Stream>::Item`
help: consider further restricting the associated type
    |
174 | ) -> Result<(), io::Error> where <S as Stream>::Item: Try {
    |                            ++++++++++++++++++++++++++++++

Using try_filter

async fn tcp_read<S: StreamExt>(
    id: ClientId,
    mut read: SplitStream<S>,
    mut handle: ServerHandle,
    to_tcp_write: UnboundedSender<InternalMsg>,
) -> Result<(), io::Error> {

    read.try_filter(|msg| future::ready(msg.is_text() || msg.is_binary()))
        .forward(to_tcp_write)
        .await
        .expect("Failed to forward messages");

    // disconnected

    Ok(())
}

I get the following error

error[E0599]: the method `try_filter` exists for struct `SplitStream<S>`, but its trait bounds were not satisfied
   --> src/client.rs:223:10
    |
223 |     read.try_filter(|msg| future::ready(msg.is_text() || msg.is_binary()))
    |          ^^^^^^^^^^ method cannot be called on `SplitStream<S>` due to unsatisfied trait bounds
    |
   ::: /Users/matt/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.28/src/stream/stream/split.rs:14:1
    |
14  | pub struct SplitStream<S>(BiLock<S>);
    | -------------------------
    | |
    | doesn't satisfy `SplitStream<S>: TryStreamExt`
    | doesn't satisfy `SplitStream<S>: TryStream`
    |
    = note: the following trait bounds were not satisfied:
            `SplitStream<S>: TryStream`
            which is required by `SplitStream<S>: TryStreamExt`
            `&SplitStream<S>: TryStream`
            which is required by `&SplitStream<S>: TryStreamExt`
            `&mut SplitStream<S>: TryStream`
            which is required by `&mut SplitStream<S>: TryStreamExt`

What am I missing? I get the same errors if I try to consume the stream directly in the client_loop function.

For reference: Cargo.toml

[package]
name = "websocket_poc"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.27.0", features = ["full"] }
tokio-util = { version = "0.7.7", features = ["codec"] }
tokio-tungstenite = "0.18.0"
futures = "0.3.28"
bytes = "1.4.0"
daniel-abramov commented 1 year ago

That's correct. Streams are similar to iterators. Iterator trait in Rust returns Option<Self::Item>, not the Result<Self::Item>. Same with the Stream. The type when reading from the stream would be Option<Result<...>>, not the Result<Option<..>>!

Your tcp_read function is written in a generic way and does not say anything about the type of the stream except that it implements the StreamExt. So you either need to pass the actual type to the function, or add additional requirements to the function by stating the expectations of the stream to implement the traits that you intend to use, i.e. in your case you need to add a requirement to the function definition that you want the stream to implement TryStreamExt.