emilk / ehttp

Minimal Rust HTTP client for both native and WASM
Apache License 2.0

Blocking until the end of the response? #35

Closed MinaMatta98 closed 12 months ago

MinaMatta98 commented 1 year ago

Hello,

I am not sure if this is a bug, or a mistake from my end.

I am trying the streaming API as follows:

    pub fn retrieve_response_async(
        param: ReportDeps,
        content: Arc<Mutex<String>>,
        toasts: Arc<Mutex<Toasts>>,
        boolean_flag: Arc<AtomicBool>,
    ) {
        let flag = boolean_flag.clone();
        let url = "http://127.0.0.1:8000/reportstream";
        let request = ehttp::Request::post(url, serde_json::to_vec(&param).unwrap());

        ehttp::streaming::fetch(
            request,
            move |result: ehttp::Result<ehttp::streaming::Part>| {
                let part = match result {
                    Ok(part) => part,
                    Err(err) => {
                        boolean_flag
                            .clone()
                            .store(false, std::sync::atomic::Ordering::Relaxed);
                        toasts
                            .lock()
                            .error(format!("An error occurred while streaming `{url}`: {err}"));
                        return std::ops::ControlFlow::Break(());
                    }
                };

                match part {
                    ehttp::streaming::Part::Response(response) => {
                        tracing::info!("RESPONSE");
                        println!("Status code: {:?}", response.status);
                        if response.ok {
                            std::ops::ControlFlow::Continue(())
                        } else {
                            boolean_flag.store(false, std::sync::atomic::Ordering::Relaxed);
                            toasts.lock().error(format!(
                                "An error occurred, please try again: {}",
                                response.status_text
                            ));
                            std::ops::ControlFlow::Break(())
                        }
                    }
                    ehttp::streaming::Part::Chunk(chunk) => {
                        if chunk.is_empty() {
                            flag.clone()
                                .store(false, std::sync::atomic::Ordering::Relaxed);
                            std::ops::ControlFlow::Break(())
                        } else {
                            let chunk = String::from_utf8(chunk).unwrap();
                            tracing::info!("{:?}", chunk);
                            content.lock().push_str(&chunk);
                            std::ops::ControlFlow::Continue(())
                        }
                    }
                }
            },
        );
    }

Currently, the callback only runs after the request has completed. Note that I'm using a streaming response from actix:


#[post("/reportstream")]
pub async fn report(payload: web::Payload) -> impl Responder {
    let ReportDeps {
        doctor_input,
        age,
        name,
        sex,
        date,
    } = type_cast::<ReportDeps>(payload).await;

    let (mut tx, rx) = futures::channel::mpsc::unbounded::<
        std::result::Result<actix_web::web::Bytes, std::io::Error>,
    >();

    let mut result = client.chat().create_stream(request).await.unwrap();

    tokio::task::spawn(async move {
        while let Some(message) = result.next().await {
            let message = match message {
                Ok(message) => {
                    if let Some(message) = message.choices[0].delta.content.clone() {
                        message.clone()
                    } else {
                        String::new()
                    }
                }
                Err(e) => e.to_string(),
            };
            let _ = tx.start_send(Ok(Bytes::from(message)));
        }
    });
    // This works and results in a streamed response.
    HttpResponse::Ok().streaming(rx)
}

[image: log screenshot]

As you can see, the response is coming in, but its chunks are not handled as they arrive. Instead, the streaming API waits until the end of the response.

Note that curl --no-buffer "127.0.0.1:8000/reportstream" gives a chunked response.

emilk commented 1 year ago

I find this hard to follow. You log the response with tracing::info!("RESPONSE");, and each chunk with tracing::info!("{:?}", chunk);, yet none of these show up in the log you pasted. Maybe silence the spam from h2?

@jprochazk wrote the chunk code, maybe he has some ideas

jprochazk commented 1 year ago

I'm also having a hard time following this. The streaming::fetch API will not stream the request body to the server[^1]. I think this needs a minimal reproduction, because it's unclear what you're trying to accomplish.

[^1]: This is not completely accurate. On native it will, if you set the Transfer-Encoding header to chunked, because that is how ureq behaves. We currently do not have that implemented on the web side.

MinaMatta98 commented 1 year ago

Ok,

I'll try to rephrase the original post.

I have a client-server combo.

The client is written in egui and compiles to wasm via trunk.

The server endpoint I'm hitting results in a streaming response using actix (https://docs.rs/actix-web/latest/actix_web/struct.HttpResponseBuilder.html#method.streaming).

Normally, when I use an alternative client such as curl, I can get the response chunk by chunk.

When I use ehttp::streaming in either Chrome or Firefox, the response is not delivered chunk by chunk. Instead, the callback waits for the request to complete and handles the whole body as one big chunk.

jprochazk commented 1 year ago

This still needs a minimal reproduction. Without any code to run, there's little we can do to help.

In Wasm, ehttp::streaming::fetch calls the same functions in the same order as the following JS code (via wasm-streams):

function fetch_streaming(request, on_part) {
    fetch(request)
        .then(async (response) => {
            on_part({ response: new Response(null, response) });
            const reader = new ReadableStreamDefaultReader(response.body);
            let result; // declared explicitly; assigning to an undeclared name throws in strict mode
            while (result = await reader.read(), !result.done) {
                on_part({ chunk: result.value });
            }
            on_part({ chunk: new Uint8Array() });
        })
}

const URL = "https://example.org";
fetch_streaming(
    URL,
    (part) => {
        const { response, chunk } = part;
        if (response) {
            console.log(`status: ${response.status}`);
        } else {
            if (chunk.length === 0) return console.log("done");
            console.log("chunk", chunk);
        }
    }
);

You can test your server against this snippet to see if the streaming response also works on the web.

MinaMatta98 commented 1 year ago

Hello,

As requested, I have prepared an MVP via eframe at https://github.com/MinaMatta98/Demo-Debug.

Note: You must change the following line in main:

            std::env::set_var("OPENAI_API_KEY", "");

to

            std::env::set_var("OPENAI_API_KEY", "The key I will send you.");

This is a temporary key that I will delete in 2 days. It can be found in this link https://send.bitwarden.com/#uIZSADqdDECkLLCGAEvpnQ/dvTgfAjTxlaPPod-zM4YOg.

Mind you, the only files you may be interested in are main.rs, page.rs and route.rs (server routing). The remainder are for wasm.

The expected behaviour is that each chunk is appended to the string as it comes in, but instead this is deferred until the call has finished.

MinaMatta98 commented 1 year ago

If you wanted to see what the expected output would be, look at this link:

Video_2023-09-23_14-10-40

Note that this was recorded against the exact same server endpoint that the MVP above uses.

jprochazk commented 1 year ago

This isn't an issue with ehttp. You need to:

  1. Not use any compression in the reportstream route
  2. Call ctx.request_repaint() in the ehttp::streaming::fetch callback
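
As a rough illustration of point 2, here is a minimal, std-only sketch of what the chunk branch of the callback might look like. The `on_chunk` helper and its `repaint` parameter are hypothetical names, not part of ehttp or egui; in the real app `repaint` would presumably be a closure that calls `ctx.request_repaint()` on a cloned `egui::Context`. Decoding with `from_utf8_lossy` instead of `unwrap()` is also an assumption made here so the sketch cannot panic on a chunk that splits a multi-byte character.

```rust
use std::ops::ControlFlow;
use std::sync::{Arc, Mutex};

/// Hypothetical helper (not an ehttp API): handles one body chunk the way the
/// callback in this issue does, then asks the UI to redraw.
///
/// `repaint` is a stand-in for `ctx.request_repaint()`.
fn on_chunk(
    chunk: Vec<u8>,
    content: &Arc<Mutex<String>>,
    repaint: &dyn Fn(),
) -> ControlFlow<()> {
    if chunk.is_empty() {
        // An empty chunk marks the end of the stream; redraw one last time.
        repaint();
        return ControlFlow::Break(());
    }

    // Assumption: lossy decoding, so invalid or split UTF-8 does not panic.
    let text = String::from_utf8_lossy(&chunk);
    content.lock().unwrap().push_str(&text);

    // Without this, the UI may not redraw until the next input event,
    // which makes the text look as if it arrived all at once.
    repaint();
    ControlFlow::Continue(())
}
```

Inside `ehttp::streaming::fetch`, the `Part::Chunk(chunk)` arm would then return something like `on_chunk(chunk, &content, &|| ctx.request_repaint())`, assuming a cloned `ctx` has been moved into the closure.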

https://github.com/MinaMatta98/Demo-Debug/pull/1

https://github.com/emilk/ehttp/assets/1665677/60246978-1d91-43a1-a1f0-16ea0a439c24

MinaMatta98 commented 1 year ago

Hello,

Thank you very much.

I apologize, as I was under the impression that this was an ehttp problem. Thank you for your assistance.

Could you please provide some information as to why compression should be disabled?