quickwit-oss / quickwit

Cloud-native search engine for observability. An open-source alternative to Datadog, Elasticsearch, Loki, and Tempo.
https://quickwit.io

Change the algorithm that places split searches on searchers #5010

Closed fulmicoton closed 3 months ago

fulmicoton commented 4 months ago

Right now we use the following logic.

pub async fn assign_jobs<J: Job>(
        &self,
        mut jobs: Vec<J>,
        excluded_addrs: &HashSet<SocketAddr>,
    ) -> anyhow::Result<impl Iterator<Item = (SearchServiceClient, Vec<J>)>> {
        let num_nodes = self.searcher_pool.len();

        let mut candidate_nodes: Vec<CandidateNodes> = self
            .searcher_pool
            .pairs()
            .into_iter()
            .filter(|(grpc_addr, _)| {
                excluded_addrs.is_empty()
                    || excluded_addrs.len() == num_nodes
                    || !excluded_addrs.contains(grpc_addr)
            })
            .map(|(grpc_addr, client)| CandidateNodes {
                grpc_addr,
                client,
                load: 0,
            })
            .collect();

        if candidate_nodes.is_empty() {
            bail!(
                "failed to assign search jobs. there are no available searcher nodes in the pool"
            );
        }
        jobs.sort_unstable_by(Job::compare_cost);

        let mut job_assignments: HashMap<SocketAddr, (SearchServiceClient, Vec<J>)> =
            HashMap::with_capacity(num_nodes);

        for job in jobs {
            sort_by_rendez_vous_hash(&mut candidate_nodes, job.split_id());
            // Among the two most preferred nodes (in rendezvous order), pick
            // the less loaded one ("power of two choices").
            let chosen_node_idx = if candidate_nodes.len() >= 2 {
                usize::from(candidate_nodes[0].load > candidate_nodes[1].load)
            } else {
                0
            };
            let chosen_node = &mut candidate_nodes[chosen_node_idx];
            chosen_node.load += job.cost();

            job_assignments
                .entry(chosen_node.grpc_addr)
                .or_insert_with(|| (chosen_node.client.clone(), Vec::new()))
                .1
                .push(job);
        }
        Ok(job_assignments.into_values())
    }

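For context, `sort_by_rendez_vous_hash` gives each split a stable preference order over the searchers. Below is a minimal sketch of rendezvous (highest-random-weight) ordering, assuming a standard hasher and placeholder type names; Quickwit's actual helper and hash function may differ.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::net::SocketAddr;

// Placeholder for the candidate struct used in the snippet above.
struct Candidate {
    grpc_addr: SocketAddr,
    load: u64,
}

// Score every node with hash(node, split_id) and sort best score first.
// Each split keeps the same preference order as long as the pool does not
// change, and removing a node only reshuffles the splits that preferred it.
fn sort_by_rendezvous_hash(candidates: &mut [Candidate], split_id: &str) {
    candidates.sort_by_key(|candidate| {
        let mut hasher = DefaultHasher::new();
        candidate.grpc_addr.hash(&mut hasher);
        split_id.hash(&mut hasher);
        std::cmp::Reverse(hasher.finish())
    });
}

With that ordering, the selection above takes the less loaded of the two most preferred candidates for every job.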
This is rather suboptimal. I think we can come up with something considerably more efficient. For instance, @fmassot came up with the following logic:

let target_load = total_load / num_searchers + SOME_MARGIN;
for job in jobs {
    sort_by_rendez_vous_hash(&mut candidate_nodes, job.split_id());
    // Take the most preferred node (in rendezvous order) that is still below
    // the target load. At any point during assignment at least one node sits
    // below the average load, so this lookup cannot fail as long as
    // SOME_MARGIN >= 0.
    let chosen_node_idx = candidate_nodes
        .iter()
        .position(|candidate| candidate.load < target_load)
        .unwrap();
    let chosen_node = &mut candidate_nodes[chosen_node_idx];
    chosen_node.load += job.cost();
    job_assignments
        .entry(chosen_node.grpc_addr)
        .or_insert_with(|| (chosen_node.client.clone(), Vec::new()))
        .1
        .push(job);
}
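`total_load` and `SOME_MARGIN` are left open above. Purely as an illustration, one way to derive them from the jobs could look like the sketch below, assuming costs are plain integers (u64 here); the helper name and the 20% margin are assumptions, not a decided design.

// Hypothetical helper, purely illustrative: compute the target load from the
// individual job costs. A larger margin keeps splits closer to their
// rendezvous-preferred node, a smaller one flattens the load distribution
// across searchers.
fn target_load(job_costs: &[u64], num_searchers: usize) -> u64 {
    let total_load: u64 = job_costs.iter().sum();
    let average_load = total_load / num_searchers as u64;
    // 20% margin over the average load, an arbitrary choice for this sketch.
    average_load + average_load / 5
}

Compared with the power-of-two choice in the current code, this keeps each split on its most preferred node until that node reaches the target, which should improve cache reuse while still bounding the per-node load.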
PSeitz commented 4 months ago

For multi-index search, we should try to group the jobs for one index and ideally use one node for them. If the jobs are sorted by index, I think the current logic won't do that.

fulmicoton commented 4 months ago

Silly question maybe... is there a benefit to grouping by index if all of the indexes share the same docmapper? Extra question: if they have different docmappers, does it make a big difference?