Skepfyr / little-loadshedder

A Rust hyper/tower service that implements load shedding with queuing & concurrency limiting based on latency.
Apache License 2.0
143 stars, 2 forks

Tonic Support / Example #2

Open jeffutter opened 5 months ago

jeffutter commented 5 months ago

Hi there. Thanks for this library. I've used similar algorithms in other languages/frameworks and find them incredibly useful.

I've spent a lot of time integrating this with a simple tonic project. I suspect it should be as simple as adding the layer and mapping the error to a tonic::Status, but I can't get the type system to accept it. It always seems to think the inner type of LoadShedError::Inner won't play nicely with a tonic::Status.

I'm probably missing something self-evident here, but it would be great to have an example of how to integrate this with tonic.

Thanks!

Skepfyr commented 5 months ago

Hmmmmmmm, this is more painful than I expected. I suspect the issue you're having is because of poll_ready. Specifically this code doesn't work:

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    Server::builder()
        .layer(MapResultLayer::new(|resp| match resp {
            Ok(resp) => Ok(resp),
            Err(LoadShedError::Overload) => Err(Status::resource_exhausted("overloaded")),
            Err(LoadShedError::Inner(inner)) => Err(Status::internal(inner.to_string())),
        }))
        .layer(LoadShedLayer::new(0.1, Duration::from_millis(100)))
        .add_service(Greeter)
        .serve("0.0.0.0:50051".parse().unwrap())
        .await?;
    Ok(())
}

because even though the closure adapts the error returned from call, it never sees an error returned from poll_ready, so that error is not adapted. You can do something like this:

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    Server::builder()
        .layer(layer_fn(|service| {
            let mut load_shed = LoadShed::new(service, 0.1, Duration::from_millis(100));
            service_fn(move |req| {
                let resp: BoxFuture<'static, _> = load_shed.call(req);
                async move {
                    match resp.await {
                        Ok(resp) => Ok(resp),
                        Err(LoadShedError::Overload) => {
                            Ok(Status::resource_exhausted("overloaded").to_http())
                        }
                        Err(LoadShedError::Inner(inner)) => Err(inner),
                    }
                }
            })
        }))
        .add_service(Greeter)
        .serve("0.0.0.0:50051".parse().unwrap())
        .await?;
    Ok(())
}

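but that only works correctly because I know that the LoadShed service is always ready: service_fn always reports ready from its own poll_ready, so the inner service's poll_ready is never called.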
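To make the two points above concrete, here is a minimal sketch built on a simplified, synchronous stand-in for tower's Service trait. MiniService, ShedError, StatusLike, Shedder, MapResult and FnService are all hypothetical names used for illustration; they are not the real tower, tonic or little-loadshedder types. The sketch assumes a result-mapping wrapper shaped like the first attempt and a closure-backed wrapper shaped like the second. It shows that the first still needs a separate error conversion for poll_ready, and that the second reports ready without consulting the service it captures.

```rust
use std::task::Poll;

/// Simplified, synchronous stand-in for tower's `Service` trait (hypothetical).
trait MiniService<Req> {
    type Response;
    type Error;
    fn poll_ready(&mut self) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Req) -> Result<Self::Response, Self::Error>;
}

/// Hypothetical stand-in for `LoadShedError`.
#[derive(Debug, PartialEq)]
enum ShedError {
    Overload,
}

/// Hypothetical stand-in for `tonic::Status`.
#[derive(Debug, PartialEq)]
struct StatusLike(String);

// The conversion `poll_ready` relies on; the mapping closure cannot supply it.
impl From<ShedError> for StatusLike {
    fn from(e: ShedError) -> Self {
        StatusLike(format!("{:?}", e))
    }
}

/// Inner service whose readiness check can fail.
struct Shedder {
    ready: bool,
}

impl MiniService<u32> for Shedder {
    type Response = u32;
    type Error = ShedError;

    fn poll_ready(&mut self) -> Poll<Result<(), ShedError>> {
        if self.ready {
            Poll::Ready(Ok(()))
        } else {
            Poll::Ready(Err(ShedError::Overload))
        }
    }

    fn call(&mut self, req: u32) -> Result<u32, ShedError> {
        Ok(req + 1)
    }
}

/// Wrapper that maps the result of `call` through a closure.
struct MapResult<S, F> {
    inner: S,
    f: F,
}

impl<S, F, Req, Res, E> MiniService<Req> for MapResult<S, F>
where
    S: MiniService<Req>,
    E: From<S::Error>, // needed only because of `poll_ready`
    F: Fn(Result<S::Response, S::Error>) -> Result<Res, E>,
{
    type Response = Res;
    type Error = E;

    fn poll_ready(&mut self) -> Poll<Result<(), E>> {
        // The closure is not involved here, so the error goes through `From`.
        self.inner.poll_ready().map_err(E::from)
    }

    fn call(&mut self, req: Req) -> Result<Res, E> {
        (self.f)(self.inner.call(req))
    }
}

/// Closure-backed wrapper in the spirit of `service_fn`: it has nothing to
/// poll, so it reports ready unconditionally.
struct FnService<F>(F);

impl<F, Req, Res, E> MiniService<Req> for FnService<F>
where
    F: FnMut(Req) -> Result<Res, E>,
{
    type Response = Res;
    type Error = E;

    fn poll_ready(&mut self) -> Poll<Result<(), E>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: Req) -> Result<Res, E> {
        (self.0)(req)
    }
}
```

In this model, removing the `From<ShedError> for StatusLike` impl stops `MapResult` from satisfying `MiniService` for that error type, which mirrors the kind of type error described in the original question. `FnService` compiles either way, but it would hide a not-ready inner service from its callers.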

I think this has highlighted that I'm returning the wrong thing: the load shed service needs to return a response that might be an overload, rather than an error that might be an overload. With that design you can do this:

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    Server::builder()
        .layer(MapResponseLayer::new(|resp| match resp {
            LoadShedResponse::Inner(inner) => inner,
            LoadShedResponse::Overload => Status::resource_exhausted("overloaded").to_http(),
        }))
        .layer(LoadShedLayer::new(0.01, Duration::from_millis(200)))
        .add_service(Greeter)
        .serve("0.0.0.0:50051".parse().unwrap())
        .await?;
    Ok(())
}

Which looks much more sensible to me.
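A rough sketch of why the response-based design is easier to adapt, using only the standard library. The LoadShedResponse enum here is a local stand-in modelled on the snippet above, and HttpLike, flatten and map_response are hypothetical names; none of this is the actual v0.2.0 API. The only outside fact assumed is that the gRPC status code for RESOURCE_EXHAUSTED is 8.

```rust
/// Local stand-in for the response type described above (not the crate's type).
#[derive(Debug, PartialEq)]
enum LoadShedResponse<T> {
    Inner(T),
    Overload,
}

/// Hypothetical stand-in for an HTTP response that carries a gRPC status.
#[derive(Debug, PartialEq)]
struct HttpLike {
    grpc_status: u32,
    message: String,
}

/// gRPC status code for RESOURCE_EXHAUSTED.
const RESOURCE_EXHAUSTED: u32 = 8;

/// Collapse the two response cases into one ordinary response.
fn flatten(resp: LoadShedResponse<HttpLike>) -> HttpLike {
    match resp {
        LoadShedResponse::Inner(inner) => inner,
        LoadShedResponse::Overload => HttpLike {
            grpc_status: RESOURCE_EXHAUSTED,
            message: "overloaded".to_string(),
        },
    }
}

/// Only the `Ok` side is rewritten; the error type `E` passes through
/// untouched, so no error conversion is needed anywhere.
fn map_response<E>(result: Result<LoadShedResponse<HttpLike>, E>) -> Result<HttpLike, E> {
    result.map(flatten)
}
```

Because the error type is unchanged, a readiness check that fails with `E` would need no adapting either, which is the property the error-based design lacked.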

Skepfyr commented 5 months ago

There you go, hopefully that's better: https://github.com/Skepfyr/little-loadshedder/releases/tag/v0.2.0