apache / datafusion-ballista

Apache DataFusion Ballista Distributed Query Engine
https://datafusion.apache.org/ballista
Apache License 2.0

[Problem] How to deploy multiple schedulers in standalone mode (not Docker) #803

Open smallzhongfeng opened 1 year ago

smallzhongfeng commented 1 year ago

As described in the title.

smallzhongfeng commented 1 year ago

PTAL @thinkharderdev @andygrove @yahoNanJing

smallzhongfeng commented 1 year ago

Could we have some docs for this? [image attached]

smallzhongfeng commented 1 year ago

Could someone give me some advice, please?

thinkharderdev commented 1 year ago

Hi @smallzhongfeng. Do you want to deploy Ballista with multiple schedulers outside of Kubernetes? Standalone mode does not currently support multiple schedulers, as all it does is spin up the scheduler server in a tokio task with a hardcoded bind address of localhost:50050. But if you just want to deploy a cluster without Kubernetes or Docker, it is possible (although I don't know of anyone who has done it that way). All you would need is some sort of load balancer to sit in front of your schedulers, and everything else should work.

smallzhongfeng commented 1 year ago

Thanks for your patient reply! Is there any YAML for deploying the scheduler?

avantgardnerio commented 1 year ago

Is there any yaml

There is a helm chart, and an example kubernetes.yaml.

smallzhongfeng commented 1 year ago

So if I want to deploy multiple schedulers, should I change this? Does this provide HA capabilities? @avantgardnerio

metadata:
  name: ballista-scheduler
spec:
  replicas: 2

smallzhongfeng commented 1 year ago

I just want to be able to deploy multiple schedulers to ensure high availability of the scheduler. @thinkharderdev

avantgardnerio commented 1 year ago

I have not deployed Ballista in a production environment, much less HA, so I will let others share their experience.

My understanding is that as long as you set up shared storage (etcd, sled, etc.), that is all that is required. (And a load balancer, as mentioned before - I think the helm chart did that.)
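
For illustration, the load-balancer piece could be a Kubernetes Service in front of the scheduler pods. A rough sketch only (the service name and selector label are assumptions, not taken from the actual helm chart; 50050 is the scheduler's default gRPC port):

apiVersion: v1
kind: Service
metadata:
  name: ballista-scheduler
spec:
  # Spreads client and executor connections across all scheduler replicas.
  selector:
    app: ballista-scheduler   # assumed pod label; must match the scheduler pods' labels
  ports:
    - port: 50050             # scheduler gRPC port
      targetPort: 50050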

thinkharderdev commented 1 year ago

I just want to be able to deploy multiple schedulers to ensure high availability of the scheduler. @thinkharderdev

So you have two options:

  1. Out-of-the-box support for an HA scheduler. As @avantgardnerio said, as long as you configure a storage backend that can be shared between the schedulers, this should work out of the box. There is already a storage backend implemented with etcd that you can use, but implementing a custom backend is relatively straightforward if you want to use some other DB or KV store. However, the shared storage and distributed locking can add a significant amount of overhead.
  2. If you need high throughput on task scheduling, you can implement an API layer in front of the schedulers that routes calls to the correct scheduler, while the schedulers themselves use only in-memory state. The API layer would need to know which scheduler "owns" each query and route status requests accordingly.

Option 2 is what we have done in our deployment. We have multiple schedulers, each using an in-memory JobState, and an API layer in front which routes calls to the appropriate scheduler. We also use a shared ClusterState based on redis (not yet upstreamed, but it is relatively straightforward to implement). This gives all the schedulers a consistent view of the executor task slots and, with a little bit of redis server-side scripting, doesn't require any distributed locks.

One downside of this approach is that the job state is volatile, so if a scheduler dies then all jobs running on it are lost. If you are running relatively short-duration queries this is not a huge issue (at least for us), since the scheduler will try to complete any in-flight jobs before it shuts down; you can set up your deployment so that the schedulers have a shutdown grace period sufficient to complete any outstanding work.
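
In Kubernetes terms that grace period could be set on the scheduler pods. A sketch only (the 300-second value is an arbitrary assumption and the image name is a placeholder):

spec:
  template:
    spec:
      # Time the pod gets to finish in-flight jobs after receiving SIGTERM.
      terminationGracePeriodSeconds: 300
      containers:
        - name: ballista-scheduler
          image: <your-scheduler-image>   # placeholder image name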

smallzhongfeng commented 1 year ago

Thanks for your patient reply. I got it. 👍 But I still have a question. If we use the push policy, will the task fail when the scheduler switches? Or will the running tasks before the switch fail? @thinkharderdev

thinkharderdev commented 1 year ago

If we use the push policy, will the task fail when the scheduler switches? Or will the running tasks before the switch fail? @thinkharderdev

Yes, currently the active job state is stored in memory on the scheduler, and if the scheduler shuts down/restarts before the job completes then the job will fail. The scheduler will try to wait for all its active jobs to complete once it receives a SIGTERM and before it terminates, but depending on how long-running the job is, that may not be feasible.

smallzhongfeng commented 1 year ago

Thanks for your reply again! Last question: can we extract the task scheduling part of the scheduler and run it as a thread, like the Spark driver, so that when the scheduler goes down it will not affect the execution of the tasks?

smallzhongfeng commented 1 year ago

local:
INFO tokio-runtime-worker ThreadId(02) ballista_scheduler::state::executor_manager: Reserved 0 executor slots in 588.435µs 
etcd:
INFO tokio-runtime-worker ThreadId(02) ballista_scheduler::state::executor_manager: Reserved 0 executor slots in 77.631762ms

These days I have been testing the performance of etcd as the storage backend, but I found that the performance is very poor, especially when reserving and releasing resources. The time spent here is much longer than with the local storage mode. I suspect the global distributed lock is the cause. Do you have any suggestions here? @thinkharderdev @avantgardnerio

yahoNanJing commented 1 year ago

Hi @smallzhongfeng, could you explain the reason for using multiple schedulers? Is it just for HA, or are you worried about the performance of a single scheduler for task scheduling? If it is for HA, is it acceptable that all jobs scheduled on a scheduler fail when that scheduler goes down?

thinkharderdev commented 1 year ago

local:
INFO tokio-runtime-worker ThreadId(02) ballista_scheduler::state::executor_manager: Reserved 0 executor slots in 588.435µs
etcd:
INFO tokio-runtime-worker ThreadId(02) ballista_scheduler::state::executor_manager: Reserved 0 executor slots in 77.631762ms

These days I have been testing the performance of etcd as the storage backend, but I found that the performance is very poor, especially when reserving and releasing resources. The time spent here is much longer than with the local storage mode. I suspect the global distributed lock is the cause. Do you have any suggestions here? @thinkharderdev @avantgardnerio

Yes, we found the same with etcd. I would suggest that if you don't need HA, use a single scheduler with in-memory state. If you need HA, what we did is implement a ClusterState backed by redis. I haven't found the time to upstream the implementation yet, but it's relatively simple. I will try to find some time to do that soon, but the gist is that you use a redis hash for holding the free task slots (a map of executor_id -> task_slots) and then a lua script for atomic reservation/freeing. Something roughly like:

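// Lua script that atomically reserves up to ARGV[1] task slots from the
// executors listed in ARGV[2..], decrementing the per-executor counts stored
// in the hash at KEYS[1] and returning the amount taken from each as a JSON array.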
const RESERVATION_SCRIPT: &str = r#"
local desired_slots = tonumber(ARGV[1])
local s = {}
for i=2, #ARGV do
    local exists = redis.call('HEXISTS', KEYS[1], ARGV[i])
    if( exists == 1 ) then
        local value = redis.call('HGET', KEYS[1], ARGV[i])
        local slots = tonumber(value)
        if( slots >= desired_slots ) then
            s[i - 1] = desired_slots
            local inc = -desired_slots
            redis.call('HINCRBY', KEYS[1], ARGV[i], inc)
            desired_slots = 0
        elseif slots == 0 then
            s[i - 1] = 0
        else
            s[i - 1] = slots
            local inc = -slots
            redis.call('HINCRBY', KEYS[1], ARGV[i], inc)
            desired_slots = desired_slots - slots
        end
    else
        s[i - 1] = 0
    end

    if( desired_slots <= 0 ) then
        break
    end
end
return cjson.encode(s)
"#;

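// Lua script that returns previously reserved slots: for each executor id in
// KEYS[2..] it adds back ARGV[i-1] slots to the hash at KEYS[1] and returns the total.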
const CANCEL_SCRIPT: &str = r#"
local cancelled = 0
for i=2, #KEYS do
    local exists = redis.call('HEXISTS', KEYS[1], KEYS[i])
    if( exists == 1 ) then
        local inc = tonumber(ARGV[i - 1])
        redis.call('HINCRBY', KEYS[1], KEYS[i], inc)
        cancelled = cancelled + inc
    end
end
return cancelled
"#;

const SLOTS_KEY: &str = "task-slots";

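// Note: MyRedisState, get_connection() and into_ballista_error() are helpers
// assumed to exist in this sketch; they are not part of the Ballista codebase.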
impl ClusterState for MyRedisState {
    async fn reserve_slots(
        &self,
        num_slots: u32,
        _distribution: TaskDistribution,
        executors: Option<HashSet<String>>,
    ) -> Result<Vec<ExecutorReservation>> {
        if num_slots == 0 {
            return Ok(vec![]);
        }

        if let Some(executors) = executors {
            let mut connection = self.get_connection().await?;

            let script = Script::new(RESERVATION_SCRIPT);

            let mut script = script.key(SLOTS_KEY);
            script.arg(num_slots);

            if !executors.is_empty() {
                let executor_ids: Vec<String> = executors.into_iter().collect();
                for executor_id in &executor_ids {
                    script.arg(executor_id);
                }

                let result: String = match script.invoke_async(&mut connection).await {
                    Ok(result) => result,
                    Err(e) => {
                        return Err(into_ballista_error(e));
                    }
                };

                let reservations = serde_json::from_str::<Vec<u32>>(&result).map_err(|e| {
                    BallistaError::General(format!(
                        "Error executing reservations, unexpected response from redis: {e:?}"
                    ))
                })?;

                let reservations: Vec<ExecutorReservation> = executor_ids
                    .into_iter()
                    .zip(reservations)
                    .flat_map(|(id, reserved)| {
                        (0..reserved).map(move |_| ExecutorReservation::new_free(id.clone()))
                    })
                    .collect();

                return Ok(reservations);
            }
        }

        Ok(vec![])
    }

    async fn cancel_reservations(&self, reservations: Vec<ExecutorReservation>) -> Result<()> {
        let mut connection = self.get_connection().await?;

        if !reservations.is_empty() {
            let script = Script::new(CANCEL_SCRIPT);
            let mut script = script.key(SLOTS_KEY);

            let reservations = reservations
                .into_iter()
                .group_by(|r| r.executor_id.clone())
                .into_iter()
                .map(|(key, group)| (key, group.count()))
                .collect::<HashMap<String, usize>>();

            for (executor_id, slots) in reservations {
                script.key(executor_id);
                script.arg(slots);
            }

            let cancelled: u64 = match script.invoke_async(&mut connection).await {
                Ok(cancelled) => {
                    cancelled
                }
                Err(e) => {
                    return Err(into_ballista_error(e));
                }
            };

            debug!("Cancelled {} reservations", cancelled);

            Ok(())
        } else {
            Ok(())
        }
    }
}

Note that this only supports TaskDistribution::Bias; for round-robin task distribution you would need a different lua script (which we have not implemented), but it could be done in principle.

smallzhongfeng commented 1 year ago

could you explain the reason of using multiple schedulers? Is it just for HA or worried about the performance of single scheduler for task scheduling?

Hi @yahoNanJing. At present, we intend to support HA.

If for HA, is it acceptable that all jobs scheduled on a scheduler fail when a scheduler is down?

We cannot accept all jobs failing during the switchover; we are working on supporting retries for the jobs that are in flight during the switch.