aembke / redis-protocol.rs

A Rust implementation of RESP2 and RESP3
Apache License 2.0
39 stars 13 forks source link

cannot used in tokio? #36

Open Praying opened 1 week ago

Praying commented 1 week ago

I write code as this, but it got error as the follow, so I want to known how use this crate in tokio ?

use crate::com::CODE_PORT_IN_USE;
use std::net::SocketAddr;
use std::process;
use tokio::net::TcpStream;
use tracing::{error, info, instrument, warn};
use crate::com::{create_reuse_port_listener, ClusterConfig};
use futures::{SinkExt, StreamExt};
use redis_protocol::{
    codec::{resp3_encode_command, Resp3},
    resp3::types::{BytesFrame, Resp3Frame, RespVersion},
};
use tokio_util::codec::{Framed, Decoder, Encoder};

#[instrument]
pub async fn handle_front_conn(socket: TcpStream) {
    if let Err(err) = socket.set_nodelay(true) {
        warn!("cluster fail to set nodelay but skip, due to {:?}",err);
    }
    let mut client_framed = Framed::new(socket, Resp3::default());
    while let Some(result) = client_framed.next().await {
        match result {
            Ok(request) => {
                info!("cluster receive request: {:?}", request)
            },
            Err(e) => {
                eprintln!("Request decode error: {:?}", e);
                break;
            }
        }
    }
    todo!()
}

#[instrument]
pub async fn start(cc: ClusterConfig) -> anyhow::Result<()> {
    let addr = cc.listen_addr.parse::<SocketAddr>().expect("parse socket never fail");
    let listener = match create_reuse_port_listener(&addr).await {
        Ok(v) => v,
        Err(e) => {
            error!("listen {} error: {}", addr, e);
            process::exit(CODE_PORT_IN_USE);
        }
    };
    info!("Listening on {}", addr);

    loop {
        let (socket, addr) = listener.accept().await?;
        info!("Accepted connection from {}", addr);

        tokio::spawn(async move  {
            handle_front_conn(socket).await;
        });
    }
}
error[E0599]: the method `next` exists for struct `Framed<TcpStream, Resp3>`, but its trait bounds were not satisfied
  --> src\proxy2\standalone\mod.rs:21:44
   |
21 |       while let Some(result) = client_framed.next().await {
   |                                              ^^^^ method cannot be called on `Framed<TcpStream, Resp3>` due to unsatisfied trait bounds
   |
  ::: C:\Users\codeg\.cargo\registry\src\rsproxy.cn-0dccff568467c15b\tokio-util-0.6.10\src\codec\framed.rs:16:1
   |
16 | / pin_project! {
17 | |     /// A unified [`Stream`] and [`Sink`] interface to an underlying I/O object, using
18 | |     /// the `Encoder` and `Decoder` traits to encode and decode frames.
19 | |     ///
...  |
30 | |     }
31 | | }
   | |_- doesn't satisfy `_: StreamExt` or `_: Stream`
   |
   = note: the following trait bounds were not satisfied:
           `tokio_util::codec::Framed<tokio::net::TcpStream, Resp3>: Stream`
           which is required by `tokio_util::codec::Framed<tokio::net::TcpStream, Resp3>: StreamExt`
aembke commented 1 week ago

Hi @Praying. The codec interface should work with Tokio, but from this code it's not obvious what the issue is. I won't be able to reproduce this due to the references to your other crate types though.

Here's an example of the tests that use Tokio: https://github.com/aembke/redis-protocol.rs/blob/main/tests/codec/mod.rs

Praying commented 1 week ago

@aembke It seems works with

tokio-util = { version = "0.7.12", features = ["codec"] }

but not work with

tokio-util = { version = "0.6", features = ["codec"] }