cloudflare / workers-rs

Write Cloudflare Workers in 100% Rust via WebAssembly
Apache License 2.0

[Feature] Support incremental backoffs in cloudflare workers #604

Open · Starttoaster opened this issue 2 months ago

Starttoaster commented 2 months ago

Description

Since most of tokio isn't supported (it doesn't compile to wasm), I'm not sure there is an established pattern for a Cloudflare Worker written in Rust with the workers-rs crate to forward a request to another server using reqwest, with an incremental backoff when the upstream responds with a 429 rate limit. The goal of this feature would be for the worker runtime to support some way of sleeping for an incrementally increasing period until either the request finally succeeds or Cloudflare kills the invocation for taking too long, which I believe is already the expected behavior for Workers.

kflansburg commented 2 months ago

We do support reqwest and async, so I think this would be pretty trivial to implement with a loop:

let mut wait = 1000; // milliseconds
let res = loop {
    let res = reqwest::get(url).await?;
    if res.status() != 429 {
        break res;
    }
    // Back off before retrying, doubling the wait each time.
    sleep(wait).await;
    wait *= 2;
};

The only tricky thing is sleeping, but I think you can do something like this:

pub async fn sleep(delay: i32) {
    let mut cb = |resolve: js_sys::Function, reject: js_sys::Function| {
        web_sys::window()
            .unwrap()
            .set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, delay);
    };

    let p = js_sys::Promise::new(&mut cb);

    wasm_bindgen_futures::JsFuture::from(p).await.unwrap();
}

Starttoaster commented 2 months ago

Thanks! This does get me a bit closer to something that actually works, in that it compiles and runs under npx run dev, but I'm hitting an issue that I'm maybe not experienced enough with js_sys to understand. I'm running this locally and hitting it with a simple bash script that just slams my local worker with curl requests, one after another (not concurrently).

[wrangler:inf] POST /test 500 Internal Server Error (99ms)
✘ [ERROR] Uncaught (in response) Error: The script will never generate a response.

✘ [ERROR] A hanging Promise was canceled. This happens when the worker runtime is waiting for a Promise from JavaScript to resolve, but has detected that the Promise cannot possibly ever resolve because all code and events related to the Promise's I/O context have already finished.

And I can tell that not all of the upstream requests actually completed successfully. For context, this worker basically forwards a webhook's POST body to a specific upstream webhook URL for a chat service. I believe I'm getting this error every time the worker receives a 429 from the chat service.

If this makes sense to you, some help would be appreciated, but I'm still looking in case I can figure it out myself.

kflansburg commented 2 months ago

This is reported by the Workers runtime when all futures have executed but no response has been returned, so I suspect there is something slightly wrong with my hacky sleep method.

It is also seen when Rust just panics, so we may be hitting those unwraps. Can you add the panic hook and some logging to see if that is happening?
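
For reference, here's a minimal sketch of wiring that up, assuming the console_error_panic_hook crate is added as a dependency (the handler below is just the usual workers-rs fetch entry point, not code from your project):

use worker::*;

#[event(fetch)]
pub async fn main(req: Request, _env: Env, _ctx: Context) -> Result<Response> {
    // Turn Rust panics into console errors instead of a silently hanging Promise.
    console_error_panic_hook::set_once();
    console_log!("handling {}", req.path());

    // ... forwarding logic goes here ...
    Response::ok("ok")
}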

Starttoaster commented 2 months ago

Actually just figured this out after some edits to the sleep function:

async fn sleep(delay: i32) {
    let mut cb = |resolve: js_sys::Function, _reject: js_sys::Function| {
        let _ = web_sys::window()
            .unwrap()
            .set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, delay);
    };

    let p = js_sys::Promise::new(&mut cb);

    wasm_bindgen_futures::JsFuture::from(p).await.unwrap_or_else(|e| {
        eprintln!("An error occurred awaiting JS future: {:?}", e);
        Default::default()
    });
}

The issue was just the unwrap call, which sometimes panicked, like you mentioned.

Starttoaster commented 2 months ago

To what you just said: yeah, this does feel a little hacky. It would be excellent for thread sleeping to be smoothed out in workers-rs someday, but this seems to work for now. Thanks again!

Feel free to close the issue if you don't think it holds value anymore, e.g. if it's unlikely that workers-rs will do anything in the near or long term to make this path more paved, or if it's already being tracked as a work item somewhere else.

kflansburg commented 2 months ago

> To what you just said: yeah, this does feel a little hacky. It would be excellent for thread sleeping to be smoothed out in workers-rs someday, but this seems to work for now. Thanks again!

I think we would end up doing something similar (albeit better tested). It's not very elegant, because "sleeping" in JS is quirky and involves setTimeout with a callback, and we are just leveraging JS here. But I think this is more or less the right way to implement this.

I think there may be some crates that solve this problem as well. Unfortunately tokio::time requires WASI with time functions implemented (we do support this on Workers, but not workers-rs).
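
As one example, I believe the gloo-timers crate wraps setTimeout in a wasm-friendly future, so a sleep could look roughly like this (untested here; assumes gloo-timers is added to Cargo.toml with its futures feature enabled):

use gloo_timers::future::TimeoutFuture;

async fn sleep(delay_ms: u32) {
    // TimeoutFuture schedules a setTimeout and resolves after delay_ms milliseconds.
    TimeoutFuture::new(delay_ms).await;
}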

Starttoaster commented 2 months ago

Hm, I wasn't actually testing this properly. To produce a 429 from my chat app, I had to alter my script to actually run some of the requests concurrently. So I'm essentially running this bash script:

#!/bin/bash

counter=0
max_jobs=10

wait_for_jobs() {
    while [ $(jobs -r | wc -l) -ge $max_jobs ]; do
        sleep 1
    done
}

while true; do
  wait_for_jobs

  curl -X POST localhost:8787/test -d '{post body data here}' &

  counter=$((counter + 1))
done

This limits the number of concurrent requests to 10.

With this, I'm getting some successful POST requests, but most of them (the ones that are getting 429'd) are causing the worker to return an HTTP 500. So the hacky function seems to have stopped panicking, but it still doesn't actually sleep.

When I invoke my worker, it responds to my HTTP client with a helpful message, in this case "received unexpected status code and ran out of retries", and it was slamming my shell with these messages, which likely means no sleeping was actually occurring.

This is what the function looks like that actually calls the sleep function from above:

    pub async fn send(&self, url: &str) -> Result<()> {
        let mut retries = 10;
        let mut backoff_delay = 1;

        loop {
            let result = CLIENT
                .post(url)
                .header(
                    reqwest::header::CONTENT_TYPE,
                    "application/json;charset=UTF-8",
                )
                .body(self.to_string())
                .send()
                .await;

            match result {
                Ok(resp) if resp.status().is_success() => {
                    return Ok(());
                }
                Ok(_resp) => {
                    if retries > 0 {
                        sleep(backoff_delay).await;
                        backoff_delay *= 2;
                        retries -= 1;
                    } else {
                        return Err(anyhow::anyhow!("Request received unexpected status code and ran out of retries"));
                    }
                }
                Err(e) => return Err(anyhow::anyhow!("Failed making request to webhook: {e}")),
            }
        }
    }

kflansburg commented 2 months ago

Ok, my bad: there is no window object in the Workers environment, but we can use wasm-bindgen to grab the global setTimeout function. This appears to work for me:

#[wasm_bindgen]
extern "C" {
    #[wasm_bindgen(js_name="setTimeout")]
    fn set_timeout(cb: &js_sys::Function, delay: i32);
}

async fn sleep(delay: i32) {
    let mut cb = |resolve: js_sys::Function, _reject: js_sys::Function| {
        set_timeout(&resolve, delay);
    };

    let p = js_sys::Promise::new(&mut cb);

    wasm_bindgen_futures::JsFuture::from(p).await.unwrap_or_else(|e| {
        eprintln!("An error occurred awaiting JS future: {:?}", e);
        Default::default()
    });
}

Starttoaster commented 2 months ago

You're a legend. Apologies for not getting there myself, but I looked around online for what felt like ages, with nobody seemingly trying to do exactly this (at least not using workers-rs). It seems like a useful enough thing to document, though; I assume I'm not the only one using Workers as an intermediary between two web services that sometimes gets rate limited.

But yeah, with a sufficient number of retries (hate my chat app for this) it seems my messages are going through.