thruster-rs / Thruster

A fast, middleware-based web framework written in Rust
MIT License

external middleware (plugins/libraries) #248

Open Tronikelis opened 1 year ago

Tronikelis commented 1 year ago

Hi, so I wanted to try writing a couple of middlewares for thruster, but I am hitting a roadblock where some custom config is needed for the middleware.

For example, the rate limit middleware:

use async_trait::async_trait;
use redis::{aio::Connection, AsyncCommands, RedisError, RedisResult};
use thruster::{middleware_fn, Context, MiddlewareNext, MiddlewareResult};

pub struct RateLimiter<T: Store> {
    pub max: usize,
    pub per_ms: usize,
    pub store: T,
}

#[async_trait]
pub trait Store {
    type Error;
    async fn get(&mut self, key: &str) -> Result<Option<usize>, Self::Error>;
    async fn set(&mut self, key: &str, value: usize, expiry_ms: usize) -> Result<(), Self::Error>;
}

pub struct RedisStore {
    url: String,
    connection: Connection,
}

impl RedisStore {
    pub async fn new(url: String) -> RedisResult<Self> {
        let client = redis::Client::open(url.as_str())?;
        let connection = client.get_async_connection().await?;
        return Ok(Self { connection, url });
    }
}

#[async_trait]
impl Store for RedisStore {
    type Error = RedisError;

    async fn get(&mut self, key: &str) -> Result<Option<usize>, Self::Error> {
        let current: Option<usize> = self.connection.get(key).await?;
        return Ok(current);
    }

    async fn set(&mut self, key: &str, value: usize, expiry_ms: usize) -> Result<(), Self::Error> {
        let _: () = self.connection.set_ex(key, value, expiry_ms).await?;
        return Ok(());
    }
}

#[middleware_fn]
pub async fn rate_limiter_middleware<T: 'static + Context + Store + Send>(
    mut context: T,
    next: MiddlewareNext<T>,
) -> MiddlewareResult<T> {
    // how do I get the RateLimiter struct data here and write logic?
    // I would need middleware with pre-defined state for it, not global server state 🤔

    todo!("?");
}

How would I get the RateLimiter struct into the fn rate_limiter_middleware?

Ideally there would be something like this:

#[tokio::main]
async fn main() {
    let rate_limiter = RateLimiter {
        max: 100,
        per_ms: 1000,
        store: RedisStore::new("redis://127.0.0.1".to_string())
            .await
            .unwrap(),
    };

    let app = App::<HyperRequest, ReqContext, ServerState>::create(
        init_context,
        ServerState::new().await,
    )
    // I am thinking of something similar to this
    .middleware_with_state("/", rate_limiter, rate_limiter_middleware)
    .get("/", m![root]);

    let server = HyperServer::new(app);
    server.build("127.0.0.1", 3000).await;
}

I am learning Rust, so I can't say whether this would be a simple and scalable approach, but let me know if there is a way to pass config to middlewares 👍

trezm commented 1 year ago

The way I prefer when building a thruster app (in a generalizable way rather than a one-off) is to use a trait-based system. For example, a trait based system for simply having a reference to a postgres connection pool might look something like:

pub type Ctx = TypedHyperContext<RequestConfig>;

pub trait HasDb {
    fn get_db(&self) -> Arc<Client>;
}

impl HasDb for Ctx {
    fn get_db(&self) -> Arc<Client> {
        self.extra
            .db
            .clone()
            .expect("Attempted to access db on a default context")
    }
}

pub struct ServerConfig {
    pub db: Arc<Client>,
}

#[derive(Clone, Default)]
pub struct RequestConfig {
    pub db: Option<Arc<Client>>,
}

fn generate_context(request: HyperRequest, state: &ServerConfig, _path: &str) -> Ctx {
    Ctx::new(
        request,
        RequestConfig {
            db: Some(state.db.clone()),
        },
    )
}

...

// initialization
let mut app = App::<HyperRequest, Ctx, ServerConfig>::create(
        generate_context,
        ServerConfig {
            db: Arc::new(client),
        },
    );

The idea here is that we set up two important data structures that are used in the app:

  1. A server level config
  2. A request level config

Number 2 is generated for every new request that comes in; in the example above that happens in the generate_context function. Request-level configs are stored in this example using the library-provided TypedHyperContext version of a context -- but really, this is just a BasicHyperContext with an additional generic field. We use that generic field to store state for a request!

The nice thing about this is that we can now effectively pass state from the server to each request, and therefore any middleware, so that we can use the global state of the server to access resources, as well as pass information between handlers. This is especially useful for things like authentication, as it means you can have one middleware function that "authenticates" based on a header, and then another that uses the authenticated user on the context to make a db query.

Does that make a little more sense?

Tronikelis commented 1 year ago

This approach is perfect for your own server state, but I guess it "pollutes" the global server state. What I had in mind was to build a middleware and then publish it on crates.io, so that all I care about are some settings I can configure: in my case, the max and per_ms for the rate limiter. Then I could just pass those settings to the middleware without touching or implementing anything on my own server state.

This would be something similar to the axum's from_fn_with_state(state.clone(), my_middleware)

trezm commented 1 year ago

If my read of their documentation is correct, then Axum actually does the same thing -- with_state sets a global state for the whole router. That's the same as thruster, except that they allow it to be overwritten (with multiple calls to with_state) whereas thruster requires it to be static.

I could be wrong though -- if you have an example feel free to share and I'll take a look at how Axum does it!

Tronikelis commented 1 year ago

hmm, so if I wanted to download a custom middleware from crates.io and it has configuration options, I would need to implement something on my current server state?

But implementations are methods, so how would the middleware consume the configuration options that I would provide which are not functions but just simple variables?

EDIT: downloading a middleware library and then implementing a trait just seems weird to me; I download said library because I don't want to implement stuff myself.

Israel-Laguan commented 1 year ago

So the problem is to prepare thruster to accept external middleware (plugins? libraries?) instead of adding functionality to the framework?

I think a good approach would be adding this functionality to the core now, then preparing the way for external middleware later as a feature.

Then later users can use the "default" middleware, or plug in an external "library" as they wish

Tronikelis commented 1 year ago

So the problem is to prepare thruster to accept external middleware (plugins? libraries?)

Yeah, thats what I had in mind

trezm commented 1 year ago

downloading a middleware library and implementing a trait just seems weird to me, I download said library because I don't want to implement stuff myself.

"Implementing the trait" here is essentially just creating an accessor for a field on your server state, so it doesn't seem like much of a burden to me. That being said...

The idea of having isolated server state to allow for this easy drag-and-drop type of middleware is extremely appealing. I'll have to do some thinking about how this would work at a high level without requiring some sort of globally scoped mutex type situation. We could easily throw in something like Jab, however I'd be worried that we would see a non-insignificant performance hit. Ideally my list for this feature would be:

Must Haves:

  1. "Drop in" middleware modules, i.e. they can contain their own isolated state so the developer doesn't have to do anything but include a "middleware" call
  2. "Opt-in" if performance is hampered

Nice to Haves:

  3. Strong typing/Compile time checks
  4. No performance hits

1 and 2 are solved by Jab in conjunction with TypedHyperContext<JabDI>, however, 3 is certainly not, and 4 is iffy since it's an additional dynamic dispatch per request.

trezm commented 1 year ago

Welp, I'm shocked at the results of my perf tests. TL;DR: using Jab is actually faster than using the static server config! With that in mind, I'll put together a pattern for making external middleware libraries easy to use and introduce.

https://github.com/trezm/thruster-jab-perf-test/blob/main/RESULTS.md

Tronikelis commented 1 year ago

@trezm any progress on this? I'm pretty excited about it

trezm commented 1 year ago

I've been experimenting with a few different versions; the issue I'm specifically working through is having the shared singletons via JabDI while also sharing some state with each request as an instance rather than a singleton.

This sets up the following situation:

A customer authentication setup is hard -- there needs to be some way of storing authentication per request or per context. That requires either each context to have a second, unique DI pool, or to give the singleton a mutex-equivalent way to store and reference auth on a context. The former is slow because we'll need to instantiate new pools per request. The latter will potentially lock up a server because of the concurrency requirements.

I'm open to hearing thoughts on potential solutions for the second case above, but that's where I'm stuck at the moment.

Israel-Laguan commented 1 year ago

I tried to use rate limiters before and found that the best approach is to handle the limiting upstream of the server, in HTTP-request-aware apps or services (for example NginX, k8s, an API gateway, AWS WAF, or similar), instead of doing it inside the server, for the implications @trezm mentioned.

Regarding the middleware instance, I think the "main" function should store the state and handle responsibilities like "what to save and who to block", while the library/middleware only provides a callback/function to handle the logic of "what to block and based on which rules".

Tronikelis commented 1 year ago

A customer authentication setup is hard -- there needs to be some way of storing authentication per request or per context. That requires either each context to have a second, unique DI pool, or to give the singleton a mutex-equivalent way to store and reference auth on a context. The former is slow because we'll need to instantiate new pools per request. The latter will potentially lock up a server because of the concurrency requirements.

So customer authentication is just some custom shared state, right? Maybe the customer could implement a trait from the rate limiter library on their context, with for example a function should_rate_limit(his_context_here) that by default always returns true.

Or am I not getting something?

trezm commented 1 year ago

Alright, I've added a very useful proc macro along with accessor functions: https://github.com/thruster-rs/Thruster/blob/master/thruster/examples/custom_middleware_with_auth.rs.

What's this look like as a few snippets?

type Ctx = TypedHyperContext<State>;

context_state!(State => [Arc<JabDI>, Option<User>]);

#[derive(Default)]
struct ServerConfig {
    di: Arc<JabDI>,
}

fn generate_context(request: HyperRequest, state: &ServerConfig, _path: &str) -> Ctx {
    Ctx::new(request, (state.di.clone(), None))
}

And then some authentication middleware:

#[middleware_fn]
async fn authenticate(mut context: Ctx, next: MiddlewareNext<Ctx>) -> MiddlewareResult<Ctx> {
    let di: &Arc<JabDI> = context.extra.get();
    let auth = fetch!(di, dyn AuthProvider);

    let session_header = context
        .get_header("Session-Token")
        .pop()
        .unwrap_or_default();

    let user = auth
        .authenticate(&session_header)
        .await
        .map_err(|_| ThrusterError::unauthorized_error(Ctx::default()))?;

    *context.extra.get_mut() = Some(user);

    next(context).await
}

With this, if a developer needed to add an external module that has request-level state, they'd simply need to include the relevant struct in the context_state! macro call.

Tronikelis commented 1 year ago

Looks awesome, will play around with this soon and check if I have any suggestions

Tronikelis commented 1 year ago

Okay, so I have almost created a fully external middleware; the only thing left is that I had to import the trait generated by context_state! to constrain the context.

main.rs

type Context = TypedHyperContext<RequestState>;

context_state!(RequestState => [Arc<JabDI>, RateLimiter<RedisStore>]);

struct ServerState {
    jab: Arc<JabDI>,
    rate_limiter_store: Arc<Mutex<RedisStore>>,
}

fn init_context(request: HyperRequest, state: &ServerState, _path: &str) -> Context {
    let rate_limiter = RateLimiter {
        max: 10,
        per_ms: 10_000,
        store: state.rate_limiter_store.clone(),
    };

    return Context::new(request, (Arc::clone(&state.jab), rate_limiter));
}

#[middleware_fn]
async fn root(mut context: Context, _next: MiddlewareNext<Context>) -> MiddlewareResult<Context> {
    context.body("hi");

    return Ok(context);
}

#[tokio::main]
async fn main() {
    let rate_limiter_store = Arc::new(Mutex::new(
        RedisStore::new("redis://127.0.0.1".to_string())
            .await
            .unwrap(),
    ));

    let app = App::<HyperRequest, Context, ServerState>::create(
        init_context,
        ServerState {
            jab: Arc::new(JabDI::default()),
            rate_limiter_store,
        },
    )
    .middleware("/", m![rate_limit_middleware])
    .get("/", m![root]);

    let server = HyperServer::new(app);
    server.build("127.0.0.1", 3000).await;
}

middleware::rate_limit.rs

pub struct RateLimiter<T: Store> {
    pub max: usize,
    pub per_ms: usize,
    pub store: Arc<Mutex<T>>,
}

// --- snip ---

// here: RequestStateGetField<RateLimiter<G>> needed
#[middleware_fn]
pub async fn rate_limit_middleware<
    T: Send + RequestStateGetField<RateLimiter<G>>,
    G: 'static + Store + Send + Sync,
>(
    mut context: TypedHyperContext<T>,
    next: MiddlewareNext<TypedHyperContext<T>>,
) -> MiddlewareResult<TypedHyperContext<T>> {
    let rate_limiter: &RateLimiter<G> = context.extra.get();

    let RateLimiter { store, max, per_ms } = rate_limiter;

    let store = Arc::clone(&store);
    let mut store = store.lock().await;

    let key = "rate_limit:".to_string()
        + &context
            .hyper_request
            .as_ref()
            .unwrap()
            .ip
            .unwrap()
            .to_string();

    let current_count: Option<usize> = store.get(&key).await.unwrap();

    let current_count = current_count.unwrap_or(0);
    let new_count = current_count + 1;

    if new_count > *max {
        context.status(429);
        return Err(ThrusterError {
            cause: None,
            context,
            message: "Rate limit exceeded".to_string(),
        });
    }

    let _: () = store.set(&key, new_count, *per_ms).await.unwrap();

    return next(context).await;
}

Currently I can't remove this bound, which won't be available if this were an external lib:

T: RequestStateGetField<RateLimiter<G>>

I import RequestStateGetField from the context_state! macro output, but it won't be available if this were a standalone library; maybe thruster should export this trait somehow?

And now I am kind of confused about the standalone JabDI and context_state!: if I use context_state!, do I still need jab?

EDIT: the full code from my examples is here if needed: https://github.com/Tronikelis/thruster-middleware-try-1

trezm commented 1 year ago

Alright, I made some updates accordingly; check out the same example. Essentially, there's now a ContextState trait that's auto-implemented, and context_state is now an attribute macro applied to a tuple struct.

In the future, I'll consider expanding support from tuple structs to full structs, but this is a good starting point, I think.

Tronikelis commented 1 year ago

Okay, this is cool and now I can actually create fully external middleware.

But I am stuck on a feature I want to add to my rate limit middleware: I want the user to be able to implement some functions:

https://github.com/Tronikelis/thruster-rate-limit/blob/b5450675f50d0cb6b852edd1548e53bb03e744c9/src/lib.rs#L16-L19

pub trait Configuration<T: Send> {
    fn should_limit(&self, context: &TypedHyperContext<T>) -> bool;
    fn get_key(&self, context: &TypedHyperContext<T>) -> String;
}

Here I don't know how to constrain RateLimiter<G> to have this Configuration trait:

https://github.com/Tronikelis/thruster-rate-limit/blob/b5450675f50d0cb6b852edd1548e53bb03e744c9/src/lib.rs#L22-L28

#[middleware_fn]
pub async fn rate_limit_middleware<
    T: Send + ContextState<RateLimiter<G>>,
    G: 'static + Store + Send + Sync + Clone,
>(
    mut context: TypedHyperContext<T>,
    next: MiddlewareNext<TypedHyperContext<T>>,
) -> MiddlewareResult<TypedHyperContext<T>> 

I tried using thruster_jab:

#[middleware_fn]
pub async fn rate_limit_middleware<
    T: Send + ContextState<RateLimiter<G>> + ContextState<Arc<JabDI>>,
    G: 'static + Store + Send + Sync + Clone,
>(
    mut context: TypedHyperContext<T>,
    next: MiddlewareNext<TypedHyperContext<T>>,
) -> MiddlewareResult<TypedHyperContext<T>> {
    // added these 2 lines
    let di: &Arc<JabDI> = context.extra.get();
    let configuration = fetch!(di.as_ref(), dyn Configuration<T>);

    let rate_limiter: &RateLimiter<G> = context.extra.get_mut();
    let RateLimiter {
        mut store,
        max,
        per_s,
    } = rate_limiter.clone();

    let key = "rate_limit:".to_string()
        + &context
            .hyper_request
            .as_ref()
            .unwrap()
            .ip
            .unwrap()
            .to_string();

    let current_count: Option<usize> = store.get(&key).await.unwrap();

    let current_count = current_count.unwrap_or(0);
    let new_count = current_count + 1;

    if new_count > max {
        context.status(429);
        return Err(ThrusterError {
            cause: None,
            context,
            message: format!("Rate limit exceeded, please wait {} seconds", per_s),
        });
    }

    store.set(&key, new_count, per_s).await.unwrap();

    return next(context).await;
}

But got this err:

error: future cannot be sent between threads safely
  --> src\lib.rs:23:1
   |
23 | #[middleware_fn]
   | ^^^^^^^^^^^^^^^^ future returned by `__async_rate_limit_middleware` is not `Send`
   |
   = help: the trait `Sync` is not implemented for `dyn Configuration<T>`
note: future is not `Send` as this value is used across an await
  --> src\lib.rs:50:55
   |
32 |     let configuration = fetch!(di.as_ref(), dyn Configuration<T>);
   |         ------------- has type `&Box<dyn Configuration<T>>` which is not `Send`
...
50 |     let current_count: Option<usize> = store.get(&key).await.unwrap();
   |                                                       ^^^^^^ await occurs here, with `configuration` maybe used later
...
67 | }
   | - `configuration` is later dropped here
note: required by a bound in `ReusableBoxFuture::<T>::new`
  --> C:\Users\bdona\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-util-0.6.10\src\sync\reusable_box.rs:21:33
   |
21 |         F: Future<Output = T> + Send + 'static,
   |                                 ^^^^ required by this bound in `ReusableBoxFuture::<T>::new`
   = note: this error originates in the attribute macro `middleware_fn` (in Nightly builds, run with -Z macro-backtrace for more info)

I don't know, maybe I am thinking about this the wrong way?

Full code at https://github.com/Tronikelis/thruster-rate-limit

trezm commented 1 year ago

Odd -- what version of rust are you compiling with? I just pulled your repo and was able to build it without issue. I'm on 1.68

Tronikelis commented 1 year ago

@trezm If you're talking about jab, then the repo does not have the non-compiling code, but it's a really simple change; please add these 2 lines at the top of the middleware function from my comment:

// added these 2 lines
let di: &Arc<JabDI> = context.extra.get();
let configuration = fetch!(di.as_ref(), dyn Configuration<T>);

And the type constraint:

T: Send + ContextState<RateLimiter<G>> + ContextState<Arc<JabDI>>

Then you should get that error

I'm on 1.69

Or checkout the with-jab branch from my repo: https://github.com/Tronikelis/thruster-rate-limit/tree/with-jab

trezm commented 1 year ago

So, it's complaining because fetch! returns a reference, which you've stored in configuration. The compiler is complaining because that reference needs to be Sync in this case (because one thread might own the reference while another thread executes the continuation).

There are two ways you can handle this:

  1. Directly add Sync to the fetch call (i.e. fetch!(di.as_ref(), dyn Configuration<T> + Sync))
  2. Do not store a reference to configuration that crosses an await. That could mean putting the call and subsequent calls in their own block, or it could mean doing a one-liner to get the key like this:
    let key = fetch!(di.as_ref(), dyn Configuration<T>).get_key(&context);

I also realized I never addressed one of your earlier questions:

And now I am kind of confused about the standalone JabDI and context_state!, if I use context_state! I don't need jab?

JabDI is most helpful when you need to be able to dynamically switch a dependency based on a runtime configuration. I most frequently use it when I have an external service that shouldn't be called or should return dummy values for testing. Another reason is that perhaps you have a local development environment and you want to simply print the contents of an email, and then in a production environment, you want to make calls out to SendGrid or Mailgun. You could have a LocalSender and a MailgunSender and switch them according to which env var the server starts with.

In your case, I'm not sure you need it. I think your code would work if you make Configuration a struct rather than a trait, and included it in the context state. In general, my rule of thumb is: if the resource needs to change based on testing or env, use JabDI, otherwise use a static type (now we have context_state! for static types! Neat!)

Tronikelis commented 1 year ago

In your case, I'm not sure you need it. I think your code would work if you make Configuration a struct rather than a trait, and included it in the context state.

Hmm, I'm not quite sure how to translate that into correct rust, as I would prefer the configuration to have a generic on the request context so the user could use all his context.

I tried doing it like this, but obviously this is wrong, because types here shouldn't be recursive:

#[middleware_fn]
pub async fn rate_limit_middleware<
    C: Send
        + ContextState<RateLimiter<S>>
        + ContextState<Arc<JabDI>>

        + ContextState<dyn Configuration<C>>, // <- here

    S: 'static + Store + Send + Sync + Clone,
>(
    mut context: TypedHyperContext<C>,
    next: MiddlewareNext<TypedHyperContext<C>>,
) -> MiddlewareResult<TypedHyperContext<C>> {}

The trait

pub trait Configuration<C: Send> {
    fn should_limit(&self, _context: &TypedHyperContext<C>) -> bool {
        return true;
    }
    fn get_key(&self, context: &TypedHyperContext<C>) -> String {
        if_chain! {
            if let Some(request) = context.hyper_request.as_ref();
            if let Some(ip) = request.ip;
            then {
                return ip.to_string();
            }
        }

        return "".to_string();
    }
}

I want Configuration to be implemented with the user's own context, but I don't know how to translate that into rust.

With jab everything's fine and it compiles, but I would like as much compile-time type safety as possible.

Thanks btw

trezm commented 1 year ago

Ah, you can't store bare dyn traits, you need to Box them first! Your code would look like this:

#[middleware_fn]
pub async fn rate_limit_middleware<
    T: Send + ContextState<RateLimiter<G>> + ContextState<Box<dyn Configuration<TypedHyperContext<T>>>>,
    G: 'static + Store + Send + Sync + Clone,
>(
    mut context: TypedHyperContext<T>,
    next: MiddlewareNext<TypedHyperContext<T>>,
) -> MiddlewareResult<TypedHyperContext<T>> {

    // Other code, w/e

    // This lives in a block so you don't need the extra Send and Sync traits
    let key = {
        let configuration: &Box<dyn Configuration<TypedHyperContext<T>>> = context.extra.get();

        configuration.get_key(&context)
    };

    // ...rest of the middleware...

    next(context).await
}

Tronikelis commented 1 year ago

Okay, so I kind of settled on the approach of having 2 structs: one is zero-sized, for the user to implement methods that change the middleware's behavior; the other struct holds all the data that the middleware needs:

#[derive(Clone)]
pub struct RateLimiter<S: Store + Clone + Sync> {
    pub max: usize,
    pub per_s: usize,
    pub store: S,
}

pub trait Configuration<State: Send> {
    fn should_limit(&self, _context: &TypedHyperContext<State>) -> bool {
        return true;
    }
    fn get_key(&self, context: &TypedHyperContext<State>) -> String {
        if let Some(request) = context.hyper_request.as_ref() {
            if let Some(ip) = request.ip {
                return ip.to_string();
            }
        }

        return "".to_string();
    }
}

#[middleware_fn]
pub async fn rate_limit_middleware<
    C: Send + Sync + ContextState<RateLimiter<S>> + ContextState<Box<Conf>>,
    S: 'static + Store + Send + Sync + Clone,
    Conf: 'static + Configuration<C> + Sync,
>(
    mut context: TypedHyperContext<C>,
    next: MiddlewareNext<TypedHyperContext<C>>,
) -> MiddlewareResult<TypedHyperContext<C>> {
    // snip
}

My request state types:

struct ServerState {
    rate_limiter: RateLimiter<MapStore>,
}

#[context_state]
struct RequestState(RateLimiter<MapStore>, Box<RateLimiterConf>);
type Ctx = TypedHyperContext<RequestState>;

struct RateLimiterConf;
impl Configuration<RequestState> for RateLimiterConf {}

Tronikelis commented 1 year ago

Quick update: I've created a simple crate for this middleware https://crates.io/crates/thruster-rate-limit 👍

trezm commented 1 year ago

Amazing, thank you!!! I'm going to keep this open until I have time to review and then add it to the readme!

Tronikelis commented 1 year ago

Then I have a question about custom backends: writing an external middleware is tricky because different contexts have different structures; I am currently only using the hyper backend. But maybe there should be only one backend? It would simplify writing middlewares and the codebase as well.

I know that different backends are a unique feature of thruster, but it brings confusion, at least to me.

I don't know; I think thruster should just wrap hyper or actix and try to be the best at one, but I could totally be wrong here.