cloudflare / pingora

A library for building fast, reliable and evolvable network services.
Apache License 2.0

Ability to add and remove backends to a LoadBalancer #291

Open JosiahParry opened 1 week ago

JosiahParry commented 1 week ago

What is the problem your feature solves, or the need it fulfills?

A LoadBalancer contains (private) backends. These backends are static and cannot be updated through the LoadBalancer struct.

Request:

provide an interface for adding and removing backends from a LoadBalancer

Reasoning:

I have a service that serves an (R) application at, e.g., localhost:9000. When demand increases, I want to spawn a new instance at localhost:9001 and add that instance to the load balancer. When demand subsides, I would like to despawn localhost:9001 and remove it from the LoadBalancer.

Describe the solution you'd like

A method or trait that allows me to add a Backend to a LoadBalancer struct.

Describe alternatives you've considered

I've tried using the ServiceDiscovery trait with a custom struct based on the Static struct, but I have not had success. There are Send issues, and what I have gotten to compile causes the reverse proxy to just hang. So my Rust and Pingora-fu have failed me.

#[derive(Default, Debug)]
pub struct ShinyListeners {
    backends: Arc<RwLock<BTreeMap<Backend, Arc<Child>>>>,
}

impl ShinyListeners {
    /// Return a copy of the backend collection
    pub fn get(&self) -> BTreeMap<Backend, Arc<Child>> {
        let backends = self.backends.read().unwrap();
        backends.clone()
    }
}

#[async_trait::async_trait]
impl ServiceDiscovery for ShinyListeners {
    async fn discover(&self) -> Result<(BTreeSet<Backend>, HashMap<u64, bool>)> {
        let health = HashMap::new();
        let mut backends = BTreeSet::new();
        for (backend, _) in self.get().iter() {
            backends.insert(backend.clone());
        }
        Ok((backends, health))
    }
}


luizfonseca commented 1 week ago

Interestingly enough, the Static struct has pub(crate) fns to add/remove backends: https://github.com/cloudflare/pingora/blob/a432c2da9b71a2cf1e5169cabc1205cc272c4c9c/pingora-load-balancing/src/discovery.rs#L81

So in theory you could copy this implementation (considering the data race) and create your own struct that is able to remove them:

let backends = ArcSwap::new(Arc::new(Backend {...})); 
let discovery = YourStaticStruct::new(Backends { backends });
let your_load_balancer = LoadBalancer::from_backends(discovery.backends);

// Your struct has a function to swap the backend on removal:
discovery.remove_backend(backend_to_remove); // This is probably a .store on an ArcSwap

discovery.add_backend(backend_to_add); // same here

your_load_balancer.update().now_or_never();

But you will need either ArcSwap or DashMap (or an RwLock, if you can) so that adding or removing a backend cascades to everything that shares the collection.

These are mostly theoretical, I haven't tested these ^ -- maybe someone directly from CF has more details.
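The swap-on-change idea above can be sketched with the standard library alone. This is a minimal illustration, not Pingora's API: `SwappableBackends` is a hypothetical name, `String` addresses stand in for `Backend`, and `RwLock<Arc<..>>` stands in for the role an ArcSwap would play. Readers take a cheap snapshot, and writers publish a modified copy.

```rust
use std::collections::BTreeSet;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for a backend set that can be swapped at runtime.
/// `String` addresses replace `Backend`; `RwLock<Arc<..>>` replaces `ArcSwap`.
pub struct SwappableBackends {
    current: RwLock<Arc<BTreeSet<String>>>,
}

impl SwappableBackends {
    pub fn new() -> Self {
        Self {
            current: RwLock::new(Arc::new(BTreeSet::new())),
        }
    }

    /// Cheap snapshot: clones the `Arc`, not the set behind it.
    pub fn snapshot(&self) -> Arc<BTreeSet<String>> {
        let guard = self.current.read().unwrap();
        Arc::clone(&*guard)
    }

    /// Copy the current set, insert the address, and publish the new copy.
    pub fn add_backend(&self, addr: &str) {
        let mut guard = self.current.write().unwrap();
        let mut next = (**guard).clone();
        next.insert(addr.to_string());
        *guard = Arc::new(next);
    }

    /// Copy the current set, remove the address, and publish the new copy.
    pub fn remove_backend(&self, addr: &str) {
        let mut guard = self.current.write().unwrap();
        let mut next = (**guard).clone();
        next.remove(addr);
        *guard = Arc::new(next);
    }
}
```

A snapshot taken before a change keeps the old contents, so any consumer has to fetch a fresh snapshot to observe additions or removals.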

JosiahParry commented 1 week ago

The suggestion of a DashMap is quite helpful, and I'm getting somewhere with it. I've tried implementing a Dynamic struct like so:

#[derive(Clone, Debug)]
pub struct Dynamic {
    pub backends: DashSet<Backend>,
}

impl Dynamic {
    /// Create a new boxed [Dynamic] service discovery with the given backends.
    pub fn new(backends: DashSet<Backend>) -> Box<Self> {
        Box::new(Dynamic { backends })
    }

    pub fn get(&self) -> DashSet<Backend> {
        self.backends.clone()
    }

    pub fn add(&self, backend: Backend) {
        self.backends.insert(backend);
    }

    pub fn remove(&self, backend: &Backend) {
        self.backends.remove(backend);
    }
}

which I wanted to use alongside my own wrapper struct:

pub struct MyService {
    pub backends: Dynamic,
    pub load_balancer: LoadBalancer<RoundRobin>,
}

The challenge is that the load_balancer takes a Box<dyn ServiceDiscovery>, so keeping a handle to the same Dynamic that was consumed by the LoadBalancer doesn't seem easily feasible from my POV.

For example I tried creating it like:

impl MyService {
    pub fn new(entrypoint: &str) -> Self {
        // use an Arc so the Dynamic can be cloned and retained
        // after being consumed
        let arc_dynamic = Arc::new(Dynamic {
            backends: DashSet::new(),
        });
        let xx = arc_dynamic.as_ref().clone().to_owned();

        let dynamic = Box::new(arc_dynamic.as_ref().clone());
        let backends = Backends::new(dynamic);
        let lb = LoadBalancer::from_backends(backends);

        Self {
            backends: xx,
            load_balancer: lb,
        }
    }
}

But the issue here is that the LoadBalancer doesn't get updated when the Dynamic does, e.g.:

// from reverse proxy trait impl
async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
    println!("Dynamic: {:?}", self.0.backends);
    println!(
        "Load Balancer: {:?}",
        self.0.load_balancer.backends().get_backend()
    );
    // .... truncated
// outputs: 
#> Dynamic: Dynamic { backends: {Backend { addr: Inet(0.0.0.0:35462), weight: 1 }: (), Backend { addr: Inet(0.0.0.0:42913), weight: 1 }: ()} }
#> Load Balancer: {}
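One thing worth noting about the constructor above: `arc_dynamic.as_ref().clone()` clones the value behind the Arc, not the Arc itself, so each call yields an independent copy of the set. The standard-library sketch below shows the difference. `OwnedSet` is a hypothetical stand-in for Dynamic, with `String` addresses instead of `Backend` and a `Mutex<BTreeSet>` instead of a DashSet. Whether this is the only reason for the empty output above is not established here.

```rust
use std::collections::BTreeSet;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `Dynamic`: a clonable wrapper that owns its set.
#[derive(Clone, Debug, Default)]
pub struct OwnedSet {
    pub backends: BTreeSet<String>,
}

/// Cloning the value behind the `Arc` produces independent copies.
/// `(*shared).clone()` clones the inner value, as `as_ref().clone()` does.
pub fn deep_copy_diverges() -> (usize, usize) {
    let shared = Arc::new(OwnedSet::default());
    let mut kept = (*shared).clone(); // copies the set
    let given_away = (*shared).clone(); // a second, unrelated copy
    kept.backends.insert("0.0.0.0:35462".to_string());
    (kept.backends.len(), given_away.backends.len())
}

/// Cloning the `Arc` itself keeps both handles pointing at one set.
pub fn arc_clone_shares() -> (usize, usize) {
    let kept = Arc::new(Mutex::new(BTreeSet::new()));
    let given_away = Arc::clone(&kept);
    kept.lock().unwrap().insert("0.0.0.0:35462".to_string());
    let a = kept.lock().unwrap().len();
    let b = given_away.lock().unwrap().len();
    (a, b)
}
```

In the first function the insert is visible through only one handle. In the second, both handles see it because they share the same allocation.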

For the time being, though, I can keep developing by mimicking a random load-balancing algorithm: I grab a random backend from the Dynamic, which otherwise works!

eaufavor commented 1 week ago

Sounds like you (almost) got it.

I would do something like this:

struct Discovery(Arc<DashSet<Backend>>);
struct UpdateHandler(Arc<DashSet<Backend>>);

fn create() -> (Discovery, UpdateHandler) {
    let backends = Arc::new(DashSet::new());
    (Discovery(backends.clone()), UpdateHandler(backends))
}

impl ServiceDiscovery for Discovery {...}

Here Discovery and UpdateHandler share the same DashSet, so you can add or remove backends through the UpdateHandler. Each change will be reflected in the Discovery and will propagate to the LoadBalancer.
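The shared-handle idea can be modelled with the standard library alone. This is a minimal sketch under stated assumptions: `String` addresses stand in for `Backend`, `Arc<RwLock<BTreeSet>>` stands in for `Arc<DashSet>`, the synchronous `discover` is only an analogue of the async trait method, and `ToyBalancer` is a made-up type that is not Pingora's LoadBalancer. It illustrates only that the read side and the write side observe the same set.

```rust
use std::collections::BTreeSet;
use std::sync::{Arc, RwLock};

type Shared = Arc<RwLock<BTreeSet<String>>>;

/// Read side: what a discovery implementation would consult.
pub struct Discovery(Shared);
/// Write side: kept by the application to add or remove backends.
pub struct UpdateHandler(Shared);

pub fn create() -> (Discovery, UpdateHandler) {
    let backends: Shared = Arc::new(RwLock::new(BTreeSet::new()));
    (Discovery(Arc::clone(&backends)), UpdateHandler(backends))
}

impl Discovery {
    /// Hypothetical synchronous analogue of `discover`: copy out the current set.
    pub fn discover(&self) -> BTreeSet<String> {
        self.0.read().unwrap().clone()
    }
}

impl UpdateHandler {
    /// Returns true if the address was not already present.
    pub fn add(&self, addr: &str) -> bool {
        self.0.write().unwrap().insert(addr.to_string())
    }

    /// Returns true if the address was present and has been removed.
    pub fn remove(&self, addr: &str) -> bool {
        self.0.write().unwrap().remove(addr)
    }
}

/// Toy balancer (not Pingora's): it only sees changes when `update` runs.
pub struct ToyBalancer {
    discovery: Discovery,
    selected: Vec<String>,
}

impl ToyBalancer {
    pub fn new(discovery: Discovery) -> Self {
        Self { discovery, selected: Vec::new() }
    }

    /// Re-run discovery and replace the cached backend list.
    pub fn update(&mut self) {
        self.selected = self.discovery.discover().into_iter().collect();
    }

    pub fn backends(&self) -> &[String] {
        &self.selected
    }
}
```

In this model, changes made through the handler stay invisible to the balancer until `update` is called again, because the balancer works from a cached copy of the discovered set.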

JosiahParry commented 1 week ago

Thanks for the suggestion! I've adapted this approach, but changes made through the UpdateHandler still do not reach the LoadBalancer itself, even after calling lb.update().await.

pub struct Discovery(Arc<DashMap<Backend, Child>>);
pub struct UpdateHandler(Arc<DashMap<Backend, Child>>);

fn create_handlers() -> (Discovery, UpdateHandler) {
    let instances = Arc::new(DashMap::new());
    let discovery = Discovery(instances.clone());
    let update_handler = UpdateHandler(instances);

    (discovery, update_handler)
}

#[async_trait::async_trait]
impl ServiceDiscovery for Discovery {
    async fn discover(&self) -> Result<(BTreeSet<Backend>, HashMap<u64, bool>)> {
        // no readiness
        let health = HashMap::new();

        // initialize an empty BTreeSet
        let mut res = BTreeSet::new();

        // populate the BTreeSet
        for backend in self.0.iter() {
            res.insert(backend.key().clone());
        }

        Ok((res, health))
    }
}

The load balancer is created as:


pub struct TestApp {
    pub loadbalancer: LoadBalancer<RoundRobin>,
    pub lb_handler: UpdateHandler,
}

impl TestApp {
    pub fn new() -> Self {
        let (discovery, lb_handler) = create_handlers();
        let loadbalancer = LoadBalancer::from_backends(Backends::new(Box::new(discovery)));

        Self {
            loadbalancer,
            lb_handler,
        }
    }
}

As far as I can tell, nothing is missing here 🤔

github-actions[bot] commented 2 days ago

This question has been stale for a week. It will be closed in an additional day if not updated.

JosiahParry commented 1 day ago

As far as I know this is still not possible.