salvo-rs / salvo

A powerful web framework built with a simplified design.
https://salvo.rs
Apache License 2.0
3.38k stars 205 forks

A method to bypass the issue of having pointers in a handling function that cannot be Send. #299

Closed soloist-v closed 1 year ago

soloist-v commented 1 year ago
use std::convert::Infallible;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use http_body_util::Full;
use hyper::{Request, Response};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use tokio::net::TcpListener;

#[repr(transparent)]
struct AsyncFuture<F>(F);

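// NOTE: this blanket impl is unchecked. It is only sound if moving the wrapped
// future's state to another thread is actually harmless, which the compiler can
// no longer verify.
unsafe impl<F: Future> Send for AsyncFuture<F> {}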

impl<F: Future> Future for AsyncFuture<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        unsafe {
            Pin::new_unchecked(&mut self.get_unchecked_mut().0).poll(cx)
        }
    }
}

fn hello(_: Request<hyper::body::Incoming>) -> AsyncFuture<Pin<Box<dyn Future<Output=Result<Response<Full<Bytes>>, Infallible>>>>> {
    let f = async move {
        let a = 10;
        let p = &a as *const i32;
        tokio::task::yield_now().await;
        println!("{:p}", p);
        Ok::<Response<Full<Bytes>>, Infallible>(Response::new(Full::new(Bytes::from("Hello World!"))))
    };
    AsyncFuture(Box::pin(f))
}

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    pretty_env_logger::init();
    let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
    let listener = TcpListener::bind(addr).await?;
    println!("Listening on http://{}", addr);
    loop {
        let (stream, _) = listener.accept().await?;
        tokio::task::spawn(async move {
            if let Err(err) = http1::Builder::new()
                .serve_connection(stream, service_fn(hello))
                .await
            {
                println!("Error serving connection: {:?}", err);
            }
        });
    }
}
chrislearn commented 1 year ago

I have tested it and it works. Make sure you are using the latest stable Rust.

soloist-v commented 1 year ago
use salvo::prelude::*;

#[handler]
async fn hello(res: &mut Response) {
    let a = 10;
    let p = &a as *const i32;
    tokio::task::yield_now().await;
    res.render(Text::Plain("Hello World"));
    println!("{:p}", p);
}

#[tokio::main]
async fn main() {
    let acceptor = TcpListener::new("127.0.0.1:5800").bind().await;
    let router = route();
    println!("{:?}", router);
    Server::new(acceptor).serve(router).await;
}

fn route() -> Router {
    Router::new()
        .push(Router::with_path("a").get(hello))
}
error: future cannot be sent between threads safely
  --> src\main.rs:3:1
   |
3  | #[handler]
   | ^^^^^^^^^^ future created by async block is not `Send`
   |
   = help: within `[async block@src\main.rs:3:1: 3:11]`, the trait `Send` is not implemented for `*const i32`
note: future is not `Send` as this value is used across an await
  --> src\main.rs:7:30
   |
6  |     let p = &a as *const i32;
   |         - has type `*const i32` which is not `Send`
7  |     tokio::task::yield_now().await;
   |                              ^^^^^ await occurs here, with `p` maybe used later
...
10 | }
   | - `p` is later dropped here
   = note: required for the cast from `Pin<Box<[async block@src\main.rs:3:1: 3:11]>>` to `Pin<Box<(dyn Future<Output = ()> + Send + 'async_trait)>>`
   = note: this error originates in the attribute macro `handler` (in Nightly builds, run with -Z macro-backtrace for more info)

Compiling this Salvo handler produces the error output above, whereas the equivalent Actix-web handler compiles successfully. The code in my first comment is a workaround that can be used as a reference.

chrislearn commented 1 year ago

Where is your actix-web code? Can you paste it here?

chrislearn commented 1 year ago

The error says it clearly: let p = &a as *const i32; keeps a raw pointer alive across the await, and that makes the future not Send.
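
When the pointer does not actually need to survive the await, one way to satisfy the Send bound is to keep it in a scope that ends before the await and to re-derive it afterwards. The following is a minimal sketch using only the standard library. The names require_send and block_on are hypothetical helpers standing in for the Send bound added by #[handler] and for the tokio runtime, and std::future::ready stands in for tokio::task::yield_now.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical helper: accepts only futures that are `Send`.
fn require_send<F: Future + Send>(fut: F) -> F {
    fut
}

/// Handler body in which no raw pointer is alive across the `.await`.
async fn hello_scoped() -> i32 {
    let a = 10;
    {
        let p = &a as *const i32;
        println!("before await: {:p}", p);
    } // `p` goes out of scope here, before the await
    std::future::ready(()).await; // stand-in for tokio::task::yield_now().await
    // Re-derive the pointer after the await instead of carrying it across.
    let p = &a as *const i32;
    // SAFETY: `p` was just created from a live reference to `a`.
    unsafe { *p }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor (assumption for this sketch): busy-polls until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Calling block_on(require_send(hello_scoped())) returns 10. If the first pointer is instead declared outside the inner block and used after the await, require_send rejects the future, which mirrors the error above.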

soloist-v commented 1 year ago

I found that web frameworks like Hyper may have the same issue. This problem makes it difficult to use opencv::core::Mat in handlers. Mat is internally a pointer and implements Send, but it does not implement Sync. This means that &Mat is not Send, because a shared reference is only Send when its target is Sync. If a handler awaits an asynchronous method that takes &Mat, it fails to compile: the compiler reports that *const void cannot be sent between threads.
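
The Send/Sync relationship described here can be checked with the standard library alone. The sketch below uses a hypothetical FakeMat as a stand-in for opencv::core::Mat. It is not the real type: a PhantomData<Cell<()>> marker is assumed purely to make it Send but not Sync. The assert_send helper is also hypothetical.

```rust
use std::cell::Cell;
use std::marker::PhantomData;
use std::thread;

/// Hypothetical stand-in for `opencv::core::Mat`: it may be moved to another
/// thread (`Send`) but must not be shared between threads (not `Sync`).
struct FakeMat {
    pixels: Vec<u8>,
    // `Cell<()>` is `Send` but not `Sync`, so this marker removes `Sync` only.
    _not_sync: PhantomData<Cell<()>>,
}

impl FakeMat {
    fn new(len: usize) -> Self {
        FakeMat { pixels: vec![0; len], _not_sync: PhantomData }
    }

    fn len(&self) -> usize {
        self.pixels.len()
    }
}

/// Compile-time check: only accepts types that are `Send`.
fn assert_send<T: Send>() {}

/// Moving the value itself into another thread is allowed because `FakeMat: Send`.
fn len_on_other_thread(mat: FakeMat) -> usize {
    thread::spawn(move || mat.len()).join().unwrap()
}

fn checks() {
    assert_send::<FakeMat>();
    // `&T: Send` requires `T: Sync`, so the next line is rejected if uncommented:
    // assert_send::<&FakeMat>();
}
```

The commented-out line is the same situation as a &Mat that is kept alive across an await in a handler whose future must be Send.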

If you implement a wrapper for Mat and make the wrapper Sync (so that a reference to it is Send), there are still some small issues when using it. For example:

let mat = Mat::default();
let mat_ref: &MatWrapper = to_sync(&mat);
model.predict(mat_ref);

This version compiles. However, if you write model.predict(to_sync(&mat)) directly, you get the compilation error mentioned earlier, because that expression captures a reference to mat. Surprisingly, when I use model.predict(to_sync(&mat)) in an Actix-web handler, it compiles successfully. I wanted to find the reason, so I started testing with pointers.

Through my tests I found that Actix-web accepts this code; in other words, Actix-web does not require handler futures to be Send. The same test fails to compile with Hyper and Salvo. The reason seems simple: tokio::spawn requires the spawned future to be Send. I believe supporting non-Send handlers is important because it simplifies writing handler code.

chrislearn commented 1 year ago

But the question is how the Send restriction could be removed without violating Rust's safety guarantees. If you really need this, you can use a Mutex or a similar mechanism to make your data thread-safe so that it can be shared between threads directly.
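
For a type that is Send but not Sync, which is how Mat is described in this thread, the Mutex suggestion works because Mutex<T> is Sync whenever T is Send. It does not help with a value that is not Send at all, such as a raw pointer. A minimal sketch, in which FakeMat is a hypothetical stand-in for Mat and assert_sync is a hypothetical helper:

```rust
use std::cell::Cell;
use std::marker::PhantomData;
use std::sync::Mutex;
use std::thread;

/// Hypothetical stand-in for a `Send` but not `Sync` type such as `Mat`.
struct FakeMat {
    pixels: Vec<u8>,
    // `Cell<()>` is `Send` but not `Sync`, so this marker removes `Sync` only.
    _not_sync: PhantomData<Cell<()>>,
}

/// Compile-time check: only accepts types that are `Sync`.
fn assert_sync<T: Sync>() {}

/// Fills the buffer from two scoped threads that share `&Mutex<FakeMat>`.
fn fill_from_two_threads() -> Vec<u8> {
    // The Mutex wrapper is `Sync` even though `FakeMat` itself is not.
    assert_sync::<Mutex<FakeMat>>();

    let mat = Mutex::new(FakeMat { pixels: vec![0; 4], _not_sync: PhantomData });
    thread::scope(|s| {
        for half in 0..2usize {
            // `&Mutex<FakeMat>` is `Send` because `Mutex<FakeMat>` is `Sync`.
            let mat = &mat;
            s.spawn(move || {
                let mut guard = mat.lock().unwrap();
                let start = half * 2;
                // Each thread writes its own half of the buffer.
                for px in &mut guard.pixels[start..start + 2] {
                    *px = half as u8 + 1;
                }
            });
        }
    });
    mat.into_inner().unwrap().pixels
}
```

Here fill_from_two_threads() returns [1, 1, 2, 2]. The cost is that every access goes through the lock.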

soloist-v commented 1 year ago

Regarding Send and Mutex: Mutex<T> is only Send when T is Send (unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}), so a Mutex does not help with a value that is not Send. The reason Mat cannot be Sync is that when a Mat is created through an ROI, the source Mat and the new Mat can both read and modify the overlapping memory.

I have come up with a safe approach that does not violate Rust's safety principles (maybe actix-web uses something similar, namely spawn_local):

async fn hello(_: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
    let a = 10;
    let p = &a as *const i32;
    tokio::task::yield_now().await;
    println!("{:p}", p);
    Ok(Response::new(Full::new(Bytes::from("Hello World!"))))
}

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    pretty_env_logger::init();
    let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
    let listener = TcpListener::bind(addr).await?;
    println!("Listening on http://{}", addr);
    loop {
        let (stream, _) = listener.accept().await?;
        tokio::spawn(async move {
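            // NOTE: spawn_local must be called from inside a tokio::task::LocalSet;
            // called from a plain tokio::spawn task like this, it panics at runtime.
            tokio::task::spawn_local(async move {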
                if let Err(err) = http1::Builder::new()
                    .serve_connection(stream, service_fn(hello))
                    .await
                {
                    println!("Error serving connection: {:?}", err);
                }
            });
        });
    }
}
chrislearn commented 1 year ago

I don't consider this a bug. Handler is required to be Send, and your code does not meet that requirement, so the compilation error is expected.

soloist-v commented 1 year ago

I posted this question on Reddit and received some responses. https://www.reddit.com/r/rust/comments/14cbe1u/comment/jojxveg/?context=3

soloist-v commented 1 year ago

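https://www.reddit.com/r/rust/comments/12hdeh3/axum_vs_actixweb_performance/ I found some useful information in this thread. I think you could apply it to Salvo; it might improve Salvo's performance and solve the Send problem at the same time. Here is a simple code model: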


fn main() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let listener = rt.block_on(async {
        let l = tokio::net::TcpListener::bind("localhost:8080")
            .await
            .unwrap();
        Arc::new(l)
    });
    let cpu_num = 4;
    for _ in 0..cpu_num {
        let listener = listener.clone();
        std::thread::spawn(move || {
            tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap()
                .block_on(async move {
                    while let Ok((stream, _)) = listener.accept().await {
                        let stream = tokio::net::TcpStream::from_std(stream.into_std().unwrap()).unwrap();
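                        // NOTE: spawn_local panics unless it runs inside a
                        // tokio::task::LocalSet, which this model does not set up.
                        tokio::task::spawn_local(async move {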
                            if let Err(err) = http1::Builder::new()
                                .serve_connection(stream, service_fn(hello))
                                .await
                            {
                                println!("Error serving connection: {:?}", err);
                            }
                        });
                    }
                })
        });
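    }
    // NOTE: main returns right after spawning the workers, which ends the process;
    // a real server would keep the JoinHandles and join them here.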
}
chrislearn commented 1 year ago

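I don't quite see the point of this code. Is the idea to create several threads by hand and run a tokio runtime inside each one, so that tokio never steals tasks across threads and the Send requirement can be dropped?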

soloist-v commented 1 year ago

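The Future that spawn_local accepts is not required to be Send; see the function's documentation for details. As for the stream conversion, it is probably there to move the stream from the runtime that created it to the current runtime; otherwise the conversion would be pointless.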

chrislearn commented 1 year ago

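I feel this kind of setup is mostly used in benchmarks; in a real project it may not actually improve performance. Also, Salvo's code even enables the future_not_send lint as a warning: #![warn(clippy::future_not_send)].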

soloist-v commented 1 year ago

If your code uses something that is not Sync, this is very inconvenient; opencv Mat is one example.

chrislearn commented 1 year ago

Let me think about it. Perhaps a parameter or a feature could be provided to handle this rather special case.

chrislearn commented 1 year ago

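It seems we cannot simply wrap things in tokio::task::spawn_local; if we do, code that accesses references such as &mut Request fails to compile.

Changing the Handler signature directly does not seem advisable either. To keep things simple, async_trait is used everywhere, and it adds the Send bound automatically. Although async_trait supports removing that bound with #[async_trait(?Send)], I tried it and it led to even more errors in the code, which are hard to resolve.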
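
To make the Send bound concrete: the error output earlier in the thread shows the handler future being cast to Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>. The sketch below writes a simplified handler trait by hand without that bound, which is roughly the shape a ?Send trait has. LocalHandler, LocalFuture, Hello and block_on are hypothetical names for illustration only; this is not Salvo's Handler trait or the exact expansion of async_trait.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Boxed future without a `Send` bound. Adding `+ Send` here would make the
/// `Rc`-based handler below fail to compile.
type LocalFuture<'a> = Pin<Box<dyn Future<Output = String> + 'a>>;

/// Hypothetical, simplified handler trait (not Salvo's real `Handler`).
trait LocalHandler {
    fn handle<'a>(&'a self, name: &'a str) -> LocalFuture<'a>;
}

struct Hello;

impl LocalHandler for Hello {
    fn handle<'a>(&'a self, name: &'a str) -> LocalFuture<'a> {
        Box::pin(async move {
            let greeting = Rc::new(format!("Hello {name}")); // `Rc` is not `Send`
            std::future::ready(()).await; // the `Rc` is alive across this await
            greeting.to_string()
        })
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy single-threaded executor (assumption for this sketch): busy-polls the future.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

A future of this type can only be driven on the thread that created it, so everything that calls the handler has to give up its own Send bound as well. That is consistent with the cascade of errors described above.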

soloist-v commented 1 year ago

Can I see the code for this part?

chrislearn commented 1 year ago

All of the code is in the repository.

soloist-v commented 1 year ago

Which file exactly, /salvo/crates/core/src/server.rs? It should be possible to replace tokio::task::spawn with tokio::task::spawn_local without trouble.

chrislearn commented 1 year ago

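Probably. It is not used in many places anyway, so you can just search and replace. Most likely it will not work, though. Besides, even if it does work, this approach may not be acceptable. What I could accept more readily is some way to make your non-Send value Send.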

soloist-v commented 1 year ago

"Some way to make your non-Send value Send": that would introduce unsafe code and undefined behavior in unknown places.

chrislearn commented 1 year ago

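soloist-v wrote: "Some way to make your non-Send value Send": that would introduce unsafe code and undefined behavior in unknown places.

That is definitely unacceptable, which is why I say this problem is hard to solve. Most libraries today have similar restrictions. If it cannot be solved elegantly, it will stay unsolved for now.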

soloist-v commented 1 year ago

I would really like to find a chance to test whether this approach actually improves performance.

zy010101 commented 1 year ago

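If async is run within a single thread, then Send is indeed unnecessary, because everything stays on the same thread. actix-web does take this approach.

If that solves the problem, Salvo could offer a choice of async mode, letting you pick either multi-threaded async or single-threaded async.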
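
A rough standard-library sketch of the single-threaded model discussed here: each worker thread creates and drives its own future, so the future never crosses a thread boundary and does not need to be Send. The block_on function is a toy busy-polling executor assumed for illustration, and handle and run_workers are hypothetical names; this is not how tokio or actix-web are implemented.

```rust
use std::future::Future;
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy single-threaded executor: busy-polls the future on the calling thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// A future that is not `Send`: an `Rc` is alive across the await.
async fn handle(worker: usize) -> String {
    let label = Rc::new(format!("worker {worker}"));
    std::future::ready(()).await;
    format!("{label} done")
}

/// Spawns `workers` threads; each one creates and runs its own future locally.
fn run_workers(workers: usize) -> Vec<String> {
    let handles: Vec<_> = (0..workers)
        // Only the index and the resulting String cross the thread boundary.
        .map(|i| thread::spawn(move || block_on(handle(i))))
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

With run_workers(2) the result is ["worker 0 done", "worker 1 done"]. The trade-off is that work cannot migrate between threads, so one busy worker cannot be helped by an idle one.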

zy010101 commented 1 year ago

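soloist-v wrote: If your code uses something that is not Sync, this is very inconvenient; opencv Mat is one example.

In that case, I suggest turning the part that uses OpenCV into a gRPC service and having Rust call that OpenCV-backed service over gRPC.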

soloist-v commented 1 year ago

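I don't think gRPC is needed. gRPC would carry a significant performance cost: a Mat is usually a large block of memory, so the communication cost is high, and there is no benefit. Mat is only unsafe when the ROI constructor is used; that is why Mat is not Sync, which in turn means references to it are not Send, and this is easy to work around. All I care about is whether Salvo can work the way actix-web does. If there is no performance loss, that would be a better approach, one that fully respects safety.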