neon-mmd / websurfx

:rocket: An open source alternative to searx which provides a modern-looking :sparkles:, lightning-fast :zap:, privacy respecting :disguised_face:, secure :lock: meta search engine
https://github.com/neon-mmd/websurfx/tree/rolling/docs
GNU Affero General Public License v3.0

Improve async multithreading for aggregation #81

Closed · neon-mmd closed this 1 year ago

neon-mmd commented 1 year ago

Currently, the source code does fetch results from the upstream engines concurrently, but it does not account for engines that may be added in the future, nor for limited resources. So, using tokio::spawn, add code to run the asynchronous fetches in parallel.

Here is the code snippet that needs attention; it is located in the file src/search_results_handler/aggregator.rs:

// --- code above ---

    // Add a random delay before making the request.
    let mut rng = rand::thread_rng();
    let delay_secs = rng.gen_range(1..10);
    std::thread::sleep(Duration::from_secs(delay_secs));

    // fetch results from upstream search engines simultaneously/concurrently.
    let (ddg_map_results, searx_map_results) = join!(
        duckduckgo::results(query, page, &user_agent),
        searx::results(query, page, &user_agent)
    );

    let ddg_map_results: HashMap<String, RawSearchResult> = ddg_map_results?;
    let searx_map_results: HashMap<String, RawSearchResult> = searx_map_results?;

//  --- code below ---
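
A side note on the snippet above: std::thread::sleep blocks the whole executor thread inside an async function, stalling every other task scheduled on it. Below is a minimal sketch of the non-blocking alternative, assuming a tokio runtime with the timer enabled:

use rand::Rng;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Pick the delay first so the RNG (which is not Send) is dropped
    // before the .await point.
    let delay_secs = rand::thread_rng().gen_range(1..10);
    // Unlike std::thread::sleep, tokio::time::sleep yields to the runtime
    // instead of blocking the executor thread.
    tokio::time::sleep(Duration::from_secs(delay_secs)).await;
    println!("slept for {delay_secs}s");
}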
neon-mmd commented 1 year ago

It seems that the implementation with tokio::spawn doesn't work, as it requires the types involved to implement the Send and Sync traits.
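
Concretely, tokio::spawn requires the spawned future to be Send and 'static, so the task cannot borrow local data such as query: &str. A minimal sketch of the constraint (the names here are only illustrative):

#[tokio::main]
async fn main() {
    let query = String::from("huston");

    // Would NOT compile: tokio::spawn requires a Send + 'static future,
    // so the task cannot borrow the local `query`.
    // tokio::spawn(async { println!("{}", &query) });

    // Compiles: `async move` transfers ownership of `query` into the task.
    let handle = tokio::spawn(async move { format!("searching {query}") });
    println!("{}", handle.await.unwrap());
}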

Unfortunately, the current implementation with tokio::spawn, given below, is invalid:

//! This module provides the functionality to scrape and gather all the results from the upstream
//! search engines and then remove duplicate results.

use std::{collections::HashMap, rc::Rc, time::Duration, vec};

use rand::Rng;

use super::{
    aggregation_models::{RawSearchResult, SearchResult, SearchResults},
    user_agent::{self, random_user_agent},
};

use crate::engines::{duckduckgo, engine_models::SearchEngine, searx};

/// A function that aggregates all the scraped results from the above upstream engines,
/// removing duplicate results; if a result is found to come from two or more engines,
/// their names are put together to show which upstream engines the result was fetched
/// from. All data is then moved out of the HashMap into a struct that holds the
/// aggregated results in a vector along with the query used. The query is necessary
/// because otherwise the search bar would remain empty when searching from the query url.
///
/// # Example:
///
/// If you search from the url like `https://127.0.0.1/search?q=huston` then the search bar should
/// contain the word huston and not remain empty.
///
/// # Arguments
///
/// * `query` - Accepts a string to query with the above upstream search engines.
/// * `page` - Accepts a u32 page number.
/// * `random_delay` - Accepts a boolean value to add a random delay before making the request.
///
/// # Error
///
/// Returns a reqwest or scraping selector error if any error occurs in the `results`
/// function of either `searx` or `duckduckgo` or both, otherwise returns a `SearchResults`
/// struct containing appropriate values.
pub async fn aggregate(
    query: &str,
    page: u32,
    random_delay: bool,
    debug: bool,
) -> Result<SearchResults, Box<dyn std::error::Error>> {
    let user_agent: String = random_user_agent();
    let mut result_map: HashMap<String, RawSearchResult> = HashMap::new();

    // Add a random delay before making the request.
    if random_delay || !debug {
        let mut rng = rand::thread_rng();
        let delay_secs = rng.gen_range(1..10);
        std::thread::sleep(Duration::from_secs(delay_secs));
    }

    let engines: Vec<Box<dyn SearchEngine + Send + Sync>> =
        vec![Box::new(duckduckgo::Duckduckgo), Box::new(searx::Searx)];

    let mut tasks = Vec::with_capacity(engines.len());

    for engine in engines {
        tasks.push(tokio::spawn(async move {
            // ERROR: `query` (&str) and `user_agent` are borrowed from the
            // enclosing function, but tokio::spawn requires a Send + 'static
            // future, so it cannot capture these borrows; the call below is
            // also missing an `.await`.
            engine.results(query, page, &user_agent)
        }))
    }

    let initial: bool = true; // NOTE: reassigned below, so this also needs `mut`.
    for task in tasks {
        if initial {
            result_map.extend(task.await?);
            initial = false
        } else {
            // NOTE: `task` is a JoinHandle here; it would need to be awaited
            // before its result map can be iterated.
            task.into_iter().for_each(|(key, value)| {
                result_map
                    .entry(key)
                    .and_modify(|result| {
                        result.add_engines(value.clone().engine());
                    })
                    .or_insert_with(|| -> RawSearchResult {
                        RawSearchResult::new(
                            value.title.clone(),
                            value.visiting_url.clone(),
                            value.description.clone(),
                            value.engine.clone(),
                        )
                    });
            });
        }
    }

    Ok(SearchResults::new(
        result_map
            .into_iter()
            .map(|(key, value)| {
                SearchResult::new(
                    value.title,
                    value.visiting_url,
                    key,
                    value.description,
                    value.engine,
                )
            })
            .collect(),
        query.to_string(),
    ))
}
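
For reference, a minimal self-contained sketch of how the spawn loop can satisfy that bound by moving owned, reference-counted clones into each task. The simplified SearchEngine trait and the Dummy engine below (written with the async_trait crate) are illustrative stand-ins, not the project's real types:

use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;

// A simplified, illustrative stand-in for the project's SearchEngine trait.
#[async_trait]
trait SearchEngine: Send + Sync {
    async fn results(&self, query: &str, page: u32) -> HashMap<String, String>;
}

struct Dummy;

#[async_trait]
impl SearchEngine for Dummy {
    async fn results(&self, query: &str, page: u32) -> HashMap<String, String> {
        HashMap::from([(format!("{query}-{page}"), String::from("dummy"))])
    }
}

#[tokio::main]
async fn main() {
    let engines: Vec<Arc<dyn SearchEngine>> = vec![Arc::new(Dummy), Arc::new(Dummy)];
    // Owned, reference-counted values are cheap to clone into every task and
    // satisfy tokio::spawn's Send + 'static requirement.
    let query = Arc::new(String::from("huston"));

    let mut tasks = Vec::with_capacity(engines.len());
    for engine in engines {
        let query = Arc::clone(&query);
        tasks.push(tokio::spawn(async move { engine.results(&query, 1).await }));
    }

    let mut result_map = HashMap::new();
    for task in tasks {
        // Each JoinHandle must be awaited; merge every engine's map.
        result_map.extend(task.await.expect("engine task panicked"));
    }
    println!("{result_map:?}");
}

The key change from the snippet above is that every value the task touches is owned by the future itself, which is what makes it Send + 'static.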

Do you have any solution to the problem or any alternative implementation @alamin655 @xffxff ?? :slightly_smiling_face:

alamin655 commented 1 year ago

> Do you have any solution to the problem or any alternative implementation @alamin655 @xffxff ?? 🙂

How about using rayon or futures?

neon-mmd commented 1 year ago

> > Do you have any solution to the problem or any alternative implementation @alamin655 @xffxff ?? :slightly_smiling_face:
>
> How about using rayon or futures?

Sorry for the late reply. The problem with rayon is that it is a multi-threading-only crate, which is also good, but async/await in Rust works by time-slicing between async tasks, so it tends to be a lot faster than plain threading for this kind of work. Tokio also supports async multi-threading via its spawn function, and since the task we are performing is web-based, we have to wait for the response (in other words, it is awaitable). So I feel it would be more reasonable to use async multi-threading. What do you say?

As for the futures crate, I couldn't get join_all to work properly; that's why I avoided using it :slightly_smiling_face:.
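
In case it is useful, a minimal sketch of the join_all pattern; engine_a and engine_b below are hypothetical stand-ins for the engines' async results functions:

use std::future::Future;
use std::pin::Pin;

use futures::future::join_all;

async fn engine_a(query: &str) -> Vec<String> {
    vec![format!("a:{query}")]
}

async fn engine_b(query: &str) -> Vec<String> {
    vec![format!("b:{query}")]
}

#[tokio::main]
async fn main() {
    let query = "huston";

    // Each async fn has its own anonymous future type, so the futures are
    // boxed to fit in one Vec; join_all then polls them all concurrently.
    let futures: Vec<Pin<Box<dyn Future<Output = Vec<String>>>>> =
        vec![Box::pin(engine_a(query)), Box::pin(engine_b(query))];

    let results: Vec<Vec<String>> = join_all(futures).await;
    println!("{results:?}");
}

Unlike tokio::spawn, join_all drives every future on the current task, so it imposes no Send or 'static bounds (borrowing query here is fine), at the cost of not spreading the work across threads.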