cloudflare / pingora

A library for building fast, reliable and evolvable network services.
Apache License 2.0
20.28k stars 1.1k forks source link

`Timeout` is not `Sync`, hurting interoperability #311

Open palant opened 5 days ago

palant commented 5 days ago

Describe the bug

I was experimenting with integrating Pingora with other software, which required me to produce a Sync closure that would read the request body. This failed because Session.read_request_body indirectly uses pingora_timeout::Timeout which isn’t Sync for no good reason.

Pingora info

Pingora version: rev 38a9d55 Rust version: cargo 1.81.0-nightly (a1f47ec3f 2024-06-15) Operating system version: Fedora 40

Steps to reproduce

Try to compile the following code which should produce a Sync stream instance to read request body:

fn body_stream(session: &mut Session) -> impl futures_core::stream::Stream<Item = Result<bytes::Bytes>> + Sync + '_ {
    async_stream::try_stream! {
        while let Some(bytes) = session.read_request_body().await? {
            yield bytes;
        }
    }
}

For complete code see https://gist.github.com/palant/16c1e8541e14a9894aa76cec7b6ca146

Expected results

This code should compile correctly.

Observed results

Compiling fails:

error[E0277]: `(dyn Future<Output = ()> + Send + 'static)` cannot be shared between threads safely
    --> src/main.rs:13:46
     |
13   |     fn body_stream(session: &mut Session) -> impl Stream<Item = Result<bytes::Bytes>> + Sync {
     |                                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `(dyn Future<Output = ()> + Send + 'static)` cannot be shared between threads safely
     |
     = help: the trait `Sync` is not implemented for `(dyn Future<Output = ()> + Send + 'static)`, which is required by `AsyncStream<Result<bytes::Bytes, Box<pingora::Error>>, {async block@async-stream-0.3.5/src/lib.rs:229:9: 229:63}>: Sync`
     = note: required for `Unique<(dyn Future<Output = ()> + Send + 'static)>` to implement `Sync`
note: required because it appears within the type `Box<(dyn Future<Output = ()> + Send + 'static)>`
    --> toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:236:12
     |
236  | pub struct Box<
     |            ^^^
note: required because it appears within the type `Pin<Box<(dyn Future<Output = ()> + Send + 'static)>>`
    --> toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/pin.rs:1090:12
     |
1090 | pub struct Pin<Ptr> {
     |            ^^^
note: required because it appears within the type `Option<Pin<Box<(dyn Future<Output = ()> + Send + 'static)>>>`
    --> toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/option.rs:574:10
     |
574  | pub enum Option<T> {
     |          ^^^^^^
note: required because it appears within the type `pingora::prelude::Timeout<impl Future<Output = Result<Option<BufRef>, Box<pingora::Error>>>, FastTimeout>`
    --> pingora-timeout/src/lib.rs:99:16
     |
99   |     pub struct Timeout<T, F> {
     |                ^^^^^^^
note: required because it's used within this `async` fn body
    --> pingora-core/src/protocols/http/v1/server.rs:363:61
     |
363  |       async fn read_body(&mut self) -> Result<Option<BufRef>> {
     |  _____________________________________________________________^
364  | |         match self.read_timeout {
365  | |             Some(t) => match timeout(t, self.do_read_body()).await {
366  | |                 Ok(res) => res,
...    |
370  | |         }
371  | |     }
     | |_____^
note: required because it's used within this `async` fn body
    --> pingora-core/src/protocols/http/v1/server.rs:343:70
     |
343  |       pub async fn read_body_bytes(&mut self) -> Result<Option<Bytes>> {
     |  ______________________________________________________________________^
344  | |         let read = self.read_body().await?;
345  | |         Ok(read.map(|b| {
346  | |             let bytes = Bytes::copy_from_slice(self.get_body(&b));
...    |
352  | |         }))
353  | |     }
     | |_____^
note: required because it's used within this `async` fn body
    --> pingora-core/src/protocols/http/server.rs:103:72
     |
103  |       pub async fn read_request_body(&mut self) -> Result<Option<Bytes>> {
     |  ________________________________________________________________________^
104  | |         match self {
105  | |             Self::H1(s) => s.read_body_bytes().await,
106  | |             Self::H2(s) => s.read_body_bytes().await,
107  | |         }
108  | |     }
     | |_____^
note: required because it's used within this `async` block
    --> src/main.rs:14:9
     |
14   | /         try_stream! {
15   | |             while let Some(bytes) = session.read_request_body().await? {
16   | |                 yield bytes;
17   | |             }
18   | |         }
     | |_________^
note: required because it appears within the type `AsyncStream<Result<Bytes, Box<Error>>, ...>`
    --> async-stream-0.3.5/src/async_stream.rs:12:16
     |
12   |     pub struct AsyncStream<T, U> {
     |                ^^^^^^^^^^^
     = note: the full name for the type has been written to 'target/debug/deps/testing-5a32aca7e1d0069a.long-type-9241063879672993117.txt'
     = note: consider using `--verbose` to print the full type name to the console
     = note: this error originates in the macro `$crate::__private::try_stream_inner` which comes from the expansion of the macro `try_stream` (in Nightly builds, run with -Z macro-backtrace for more info)

Additional context

As I’ve established, the issue is the BoxFuture type returned by ToTimeout::timeout. It only ensures that the Future and Send traits are implemented but not Sync. With the changes in #312 the code above compiles correctly.