johanhelsing / bevy_web_asset

Bevy asset loader that transparently supports loading over http(s)
Apache License 2.0

Bevy 0.12 skeleton #18

Closed voidentente closed 10 months ago

voidentente commented 10 months ago

This is a bare-bones non-functional skeleton for the new API. It doesn't work yet, because the Box::pin future isn't picked up again if it ever yields - would be happy if you could spare a look @johanhelsing.

voidentente commented 10 months ago

Assuming native, the following snippet is currently problematic:

#[cfg(not(target_arch = "wasm32"))]
let bytes = surf::get(uri_str)
    .recv_bytes()
    .await // This yields, but the executor does not seem to ever poll this future again
    .map_err(|_| AssetReaderError::NotFound(uri))?;

There is something going on, I just don't know what. However, it's probably some kind of side effect.

This can be further demonstrated:

struct State {
    state: u8,
}

impl Future for State {
    type Output = u8;

    fn poll(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Self::Output> {
        info!("polled ({})", self.state);
        if self.state >= 8 {
            info!("ready");
            Poll::Ready(self.state)
        } else {
            self.state += 1;
            Poll::Pending
        }
    }
}

async fn get<'a>(uri: PathBuf) -> Result<Box<Reader<'a>>, AssetReaderError> {
    State { state: 0 }.await;
    panic!();
}

...outputs:

polled (0)

If BoxedFuture isn't actually driven to completion, it's not actually async, so all async code would have to happen inside the AsyncRead trait object, but I'm not sure this is intended.
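
For context, the `Future::poll` contract says that a future returning `Poll::Pending` is responsible for arranging a later call to the waker from `cx`; an executor that receives no wake-up has no reason to poll again, which would match the single `polled (0)` line. The sketch below is a std-only illustration, not Bevy's task pool: `block_on` and `ThreadWaker` are hypothetical helpers, and the `polls` counter is added only for the demonstration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Same shape as the `State` future above, plus a poll counter.
struct State {
    state: u8,
    polls: u32,
}

impl Future for State {
    type Output = (u8, u32);

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.polls += 1;
        if self.state >= 8 {
            Poll::Ready((self.state, self.polls))
        } else {
            self.state += 1;
            // Request another poll; without this nothing tells the executor to come back.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hypothetical waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical minimal executor: poll once, then park until woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

With the `wake_by_ref` call, `block_on(State { state: 0, polls: 0 })` returns `(8, 9)`; removing it can leave this executor parked after the first poll. Waking from inside `poll` amounts to busy polling, so it suits a demonstration better than a real loader.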

johanhelsing commented 10 months ago

On a related note, it kind of looks like surf has gone unmaintained... It would perhaps be good to consider switching. Perhaps look into ehttp or reqwest? https://github.com/emilk/ehttp

Mentioning because it might affect how we want to handle async stuff. Not sure if they all use async, or some use callbacks.

EDIT: Unfortunately, I don't have the time right now to dig into the details of this issue, hence just the brain dump.

I still really appreciate the work.

voidentente commented 10 months ago

I cooked up something with ehttp:

async fn get<'a>(uri: PathBuf) -> Result<Box<Reader<'a>>, AssetReaderError> {
    use ehttp::{fetch, Request};
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::mpsc::{channel, Receiver};
    use std::task::{Context, Poll};

    let uri_str = uri.to_string_lossy();
    let (sender, receiver) = channel();
    fetch(Request::get(uri_str), move |result| {
        use std::io::{Error, ErrorKind};
        sender
            .send(
                result
                    .map(|response| response.bytes)
                    .map_err(|e| AssetReaderError::Io(Error::new(ErrorKind::Other, e))),
            )
            .unwrap();
    });

    struct AsyncReceiver<T>(Receiver<T>);
    impl<T> Future for AsyncReceiver<T> {
        type Output = T;
        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
            match self.0.try_recv() {
                Err(_) => Poll::Pending,
                Ok(t) => Poll::Ready(t),
            }
        }
    }

    let bytes = AsyncReceiver(receiver).await?;
    let reader: Box<Reader> = Box::new(VecReader::new(bytes));
    Ok(reader)
}

Compiles, but the problem persists.

EDIT: I've opened a thread on the Bevy discord regarding this behavior. Looking forward to what causes it.

voidentente commented 10 months ago

Tried to make it so the BoxedFuture is immediately ready to return the reader, making the reader do the polling. Doesn't work either:

async fn get<'a>(uri: PathBuf) -> Result<Box<Reader<'a>>, AssetReaderError> {
    use ehttp::{fetch, Request};
    use futures_io::AsyncRead;
    use std::io::{Error, ErrorKind, Write};
    use std::pin::Pin;
    use std::sync::mpsc::{channel, Receiver};
    use std::sync::Mutex;
    use std::task::{Context, Poll};

    struct AsyncReader(Mutex<Receiver<Result<Vec<u8>, String>>>);
    impl AsyncRead for AsyncReader {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            mut buf: &mut [u8],
        ) -> Poll<futures_io::Result<usize>> {
            bevy::prelude::info!("poll_read");
            let lock = self.0.lock().unwrap();
            if let Ok(result) = lock.try_recv() {
                bevy::prelude::info!("got sumthin");
                let ready = || -> futures_io::Result<usize> {
                    let bytes = result.map_err(|e| Error::new(ErrorKind::Other, e))?;
                    buf.write_all(&bytes)?;
                    Ok(bytes.len())
                };
                Poll::Ready(ready())
            } else {
                bevy::prelude::info!("got nuthin");
                Poll::Pending
            }
        }
    }

    let uri_str = uri.to_string_lossy();
    let (sender, receiver) = channel();

    fetch(Request::get(uri_str), move |result| {
        sender.send(result.map(|response| response.bytes)).unwrap();
    });

    let reader: Box<Reader> = Box::new(AsyncReader(Mutex::new(receiver)));
    Ok(reader)
}

...returns:

poll_read
got nuthin

mockersf commented 10 months ago

@voidentente

async fn get<'a>(uri: PathBuf) -> Result<Box<Reader<'a>>, AssetReaderError> {
    use ehttp::{fetch, Request};
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::mpsc::{channel, Receiver};
    use std::task::{Context, Poll};

    let uri_str = uri.to_string_lossy();
    let (sender, receiver) = channel();
    fetch(Request::get(uri_str), move |result| {
        use std::io::{Error, ErrorKind};
        sender
            .send(
                result
                    .map_err(|e| AssetReaderError::Io(Error::new(ErrorKind::Other, e)))
                    .and_then(|response| match response.status {
                        200 => Ok(response.bytes),
                        404 => Err(AssetReaderError::NotFound(uri.clone())),
                        _ => Err(AssetReaderError::Io(Error::from(ErrorKind::Other))),
                    }),
            )
            .unwrap();
    });

    struct AsyncReceiver<T>(Receiver<T>);
    impl<T> Future for AsyncReceiver<T> {
        type Output = T;
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            match self.0.try_recv() {
                Err(_) => {
                    cx.waker().wake_by_ref();
                    Poll::Pending
                }
                Ok(t) => Poll::Ready(t),
            }
        }
    }

    let bytes = AsyncReceiver(receiver).await?;
    let reader: Box<Reader> = Box::new(VecReader::new(bytes));
    Ok(reader)
}

works for me with that. two changes:

voidentente commented 10 months ago

Neat!

Although ehttp seems to have a problem with https://s3.johanhelsing.studio/dump/favicon.png and never runs the callback for me. https://raw.githubusercontent.com/johanhelsing/bevy_web_asset/main/Cargo.toml works though. Interestingly, https://raw.githubusercontent.com/johanhelsing/bevy_web_asset/main/assets/favicon.png works.

However, ehttp dying also made Bevy hang, as if we just polled the receiver in a loop, which is not optimal. I'll roll with the working asset for now.

johanhelsing commented 10 months ago

> Although ehttp seems to have a problem with https://s3.johanhelsing.studio/dump/favicon.png and never runs the callback for me.

Is this a problem with ehttp specifically?

voidentente commented 10 months ago

> Is this a problem with ehttp specifically?

Just checked, reqwest does not have this issue.

voidentente commented 10 months ago

Noteworthy: It might be interesting to take inspiration from https://github.com/lizelive/bevy_http and support user-defined remotes to help reduce writing the same base URL over and over again.
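
One possible shape for such user-defined remotes, as a sketch only: how bevy_http actually implements this is not shown here, and `Remotes`, `register`, and `resolve` are hypothetical names. A short name is mapped to a base URL and expanded when an asset path is resolved.

```rust
use std::collections::HashMap;

/// Hypothetical registry mapping a short remote name to a base URL.
#[derive(Default)]
struct Remotes {
    bases: HashMap<String, String>,
}

impl Remotes {
    /// Registers `name` for `base_url`; a trailing slash on the base is dropped.
    fn register(&mut self, name: &str, base_url: &str) {
        self.bases
            .insert(name.to_owned(), base_url.trim_end_matches('/').to_owned());
    }

    /// Expands `name://relative/path` into a full URL.
    /// Returns `None` if the path has no `://` or the name is not registered.
    fn resolve(&self, asset_path: &str) -> Option<String> {
        let (name, rest) = asset_path.split_once("://")?;
        let base = self.bases.get(name)?;
        Some(format!("{base}/{}", rest.trim_start_matches('/')))
    }
}
```

For example, after `register("repo", "https://raw.githubusercontent.com/johanhelsing/bevy_web_asset/main/")`, resolving `repo://assets/favicon.png` yields the full favicon URL mentioned earlier in this thread.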

johanhelsing commented 10 months ago

Hmm... what's the difference between this crate and bevy_http? Seems to solve the same issue?

voidentente commented 10 months ago

State of affairs:

...I'll just go back to surf.

johanhelsing commented 10 months ago

I thought ureq had a blocking API? Wouldn't that make it unusable for us? Wasn't aware that ehttp was using it.

johanhelsing commented 10 months ago

> * `reqwest` supports wasm, but requires a Tokio reactor on native.

fwiw, I've had some success running tokio stuff in bevy through async-compat, but it feels less than ideal. We do this in bevy_matchbox with the signaling feature, and matchbox_socket, because webrtc-rs assumes tokio.

It's a bit unfortunate that there seems to be no popular maintained async http crate that doesn't require tokio?

voidentente commented 10 months ago

I'm trying to think of a way to do this non-blockingly, but if we have to tell the executor when to poll again, we can either be polled always or never.

I'd prefer a solution where we wait for the bytes in a blocking thread and then wake the task, but I don't think that's available in wasm.

voidentente commented 10 months ago

Hm, there's something cursed about your S3 server, Johan, because I found another crate (minreq) that just hangs when I try to get https://s3.johanhelsing.studio/dump/favicon.png.

voidentente commented 10 months ago

For native, I currently have something like this in mind:

struct WakeOnComplete {
    request: Option<minreq::Request>,
    join: Option<thread::JoinHandle<Result<minreq::Response, minreq::Error>>>,
}

impl WakeOnComplete {
    fn new(request: minreq::Request) -> Self {
        Self {
            request: Some(request),
            join: None,
        }
    }
}

impl Future for WakeOnComplete {
    type Output = Result<minreq::Response, minreq::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        info!("called poll");

        let request = self.request.take();
        let join = self.join.get_or_insert_with(|| {
            info!("spawning waker thread");
            let waker = cx.waker().to_owned();
            thread::spawn(|| {
                let result = request.unwrap().send();
                info!("response got, waking");
                waker.wake();
                result
            })
        });

        if join.is_finished() {
            info!("thread finished");
            let join = self.join.take().unwrap();
            Poll::Ready(join.join().unwrap())
        } else {
            info!("thread pending");
            Poll::Pending
        }
    }
}

impl Drop for WakeOnComplete {
    fn drop(&mut self) {
        info!("dropped WakeOnComplete");
    }
}

async fn get<'a>(uri: PathBuf) -> Result<Box<Reader<'a>>, AssetReaderError> {
    use std::io::{Error, ErrorKind};

    WakeOnComplete::new(minreq::get(uri.to_string_lossy()))
        .await
        .map_err(|e| AssetReaderError::Io(Error::new(ErrorKind::Other, e)))
        .and_then(|response| match response.status_code {
            200 => Ok(Box::new(VecReader::new(response.into_bytes())) as _),
            404 => Err(AssetReaderError::NotFound(uri)),
            code => Err(AssetReaderError::Io(Error::new(
                ErrorKind::Other,
                format!("unexpected status code {code}"),
            ))),
        })
}

2023-11-18T14:29:32.581962Z  INFO bevy_web_asset::web_asset_source: called poll
2023-11-18T14:29:32.581986Z  INFO bevy_web_asset::web_asset_source: spawning waker thread
2023-11-18T14:29:32.582068Z  INFO bevy_web_asset::web_asset_source: thread pending
2023-11-18T14:29:32.594318Z  INFO bevy_web_asset::web_asset_source: dropped WakeOnComplete
2023-11-18T14:29:32.898515Z  INFO bevy_web_asset::web_asset_source: response got, waking

But it's not working... even after waking, the future isn't polled again, presumably because it's dropped.

It seems that waking the task from another thread isn't possible. However, that's in direct contradiction with the documentation of Waker:

> Implements [`Clone`], [`Send`], and [`Sync`]; therefore, a waker may be invoked
> from any thread, including ones not in any way managed by the executor. For example,
> this might be done to wake a future when a blocking function call completes on another
> thread.
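
A std-only sketch of the wake-on-complete idea, for comparison with the snippet above. It does not explain why the future is dropped inside Bevy, and it swaps the `JoinHandle` check for shared state: in the snippet above `waker.wake()` runs before the thread has returned, so a prompt re-poll can still see `is_finished() == false` and go back to `Poll::Pending` with nobody left to wake it. `BlockingTask`, `Shared`, `block_on`, and `ThreadWaker` are hypothetical names, and the blocking request is replaced by an arbitrary closure.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// State shared between the future and the worker thread.
struct Shared<T> {
    result: Option<T>,
    waker: Option<Waker>,
}

/// Future that resolves once a blocking closure has finished on another thread.
struct BlockingTask<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

impl<T: Send + 'static> BlockingTask<T> {
    fn new(work: impl FnOnce() -> T + Send + 'static) -> Self {
        let shared = Arc::new(Mutex::new(Shared { result: None, waker: None }));
        let worker_shared = Arc::clone(&shared);
        thread::spawn(move || {
            let value = work(); // stands in for a blocking HTTP request
            let waker = {
                let mut guard = worker_shared.lock().unwrap();
                guard.result = Some(value); // publish the result first
                guard.waker.take()
            }; // lock released here
            if let Some(waker) = waker {
                waker.wake(); // then wake whoever polled last
            }
        });
        Self { shared }
    }
}

impl<T> Future for BlockingTask<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut guard = self.shared.lock().unwrap();
        if let Some(value) = guard.result.take() {
            return Poll::Ready(value);
        }
        // Store the most recent waker on every poll, under the same lock
        // the worker uses, so a wake-up cannot be lost.
        guard.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Hypothetical waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical minimal executor: poll once, then park until woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

Under these assumptions, `block_on(BlockingTask::new(|| 42))` returns `42` after a wake-up sent from the worker thread, so the mechanism itself works with a plain executor. Whether Bevy's asset task behaves the same way is a separate question this sketch cannot answer.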

voidentente commented 10 months ago

I think we should commit this as 0.7.0 for bare-bones 0.12 support for now. I ended up using isahc for native, which does not have the weird connectivity issue and provides a built-in AsyncRead implementation that we can simply pass to Bevy as is.

The issue of blocking remains, but I'm not an async magician, so I probably won't figure out why the future is dropped by async-task before it completes/why waking the task from another thread doesn't work. At least this is mitigated when multi-threaded task pools are used.

I have confirmed that this commit is functional on native/single-threaded and on wasm.

EDIT: I would like to aim for a Wake-on-Complete approach for 0.7.1, but we might have to involve someone from the engine team with more insight on how the async task pool works.

johanhelsing commented 10 months ago

Sounds like a good plan. Fwiw, isahc also seems to be unmaintained? I'd rather have unmaintained deps than regressions, though.

voidentente commented 10 months ago

> Sounds like a good plan. Fwiw, isahc also seems to be unmaintained? I'd rather have unmaintained deps than regressions, though.

Switching out backends should be a nothingburger. If we find a better alternative (which hopefully comes hand-in-hand with the non-blocking version), we can just switch out.

johanhelsing commented 10 months ago

Alright, LGTM. Single-threaded native is hopefully not that common anyway.

voidentente commented 9 months ago

Follow-up: Bevy natively loads assets via HTTP in WASM (https://github.com/bevyengine/bevy/blob/main/crates/bevy_asset/src/io/wasm.rs), so this code path is redundant. bevy_web_asset only needs to provide an implementation for native.

oli-obk commented 9 months ago

> Follow-up: Bevy natively loads assets via HTTP in WASM (https://github.com/bevyengine/bevy/blob/main/crates/bevy_asset/src/io/wasm.rs), so this code path is redundant. bevy_web_asset only needs to provide an implementation for native.

That's only for assets from the same server. If one is using bevy_web_asset, then one likely wants to load assets from another server, which would still need support on wasm.

voidentente commented 9 months ago

Then it might actually make sense to integrate bevy_web_asset into the engine to avoid this duplicate, but not before a non-blocking implementation rolls out. We should gauge whether Carter wants that in the first place, though.

Related: https://github.com/bevyengine/bevy/discussions/10568