lunatic-solutions / lunatic-rs

This library contains higher level Rust wrappers for low level Lunatic syscalls.
275 stars 31 forks source link

Add a dynamic WorkerPool primitive #118

Open SquattingSocrates opened 1 year ago

SquattingSocrates commented 1 year ago

The current implementation of Supervisor relies on tuples to manage children. Even though this makes sense in order to support different types of children nodes it's limiting in that there's no way to dynamically scale the amount of children, which is a must in a modern web-based system. Therefore I'm suggesting the addition of a WorkerPool primitive which would take elements of one type and scale them based on a parameter/config. I imagine it would look like this:

impl WorkerPool<T>: Supervisor + Supervisable<?>
where
    Self: Sized,
    T: AbstractProcess
{
    type Arg: Serialize + DeserializeOwned;
    type Children: Supervisable<Self>;

    // not 100% sure on the api surface yet
    fn new(worker_count: usize, initializer: impl Fn() -> Children) -> Self;
    fn capacity(&self) -> usize; // get current capacity
    fn scale_by(&mut self, amount: i32); // scale children up or down
    fn get_worker(&self) -> WorkerType;
    fn release_worker(&self, worker: WorkerType);
}

This approach should allow to create a managed pool of workers while still having the ability to combine multiple different types of processes under one supervisor like this:

struct CoordinatorSup;
impl Supervisor for CoordinatorSup {
    type Arg = ();
    // Start a pool of `Counters` as well as one `Logger` and monitor them for failures.
    type Children = (WorkerPool<Counter>, Logger);

    fn init(config: &mut SupervisorConfig<Self>, _: ()) {
        // If a child fails, just restart it. Uses the same strategies as the regular `Supervisor`
        config.set_strategy(SupervisorStrategy::OneForOne);
        // Start each `Counter` with some state, don't know how this should look like
        config.children_args(
            ??? // pass arguments to list children, maybe with a sublist of a static function,
            (0, None) // regularly passing process config to `Logger`
        );
    }
}

Another thing that would be useful is for the pool to be able to receive the same messages that the children can and then forward them to the children. Shouldn't be that hard, although it creates some issues with the current approach to generating code in the macro abstract_process: Since in abstract_process we generate a new trait and implement the functions on the ProcessRef<P> we would need to do the same for the WorkerPool for better ergonomics. Not sure how to do that though. Maybe something like this:

struct WorkerPool<P> {
    list: lunatic::process::ProcessRef<P>,
}

impl<T, M> lunatic::process::RequestHandler<M> for WorkerPool<T>
where
    T: lunatic::process::RequestHandler<M>,
    M: Serialize + DeserializeOwned,
{
    type Response = T::Response;
    fn handle(state: &mut Self::State, request: M) -> Self::Response {
        state.list.next().request(request); // dummy code
    }
}