GeneralMills / pytrends

Pseudo API for Google Trends

Update to Selenium script #610

Open · robrady opened 5 months ago

robrady commented 5 months ago

Thanks for sharing your Selenium code.

In case anyone is interested, I've adapted it further and made some changes to the interface.

import selenium_gtrends_api as sg

keywords = ["test keyword A", "test keyword B"]
download_folder = 'c:/users/<<username>>/Downloads/'

api = sg.SeleniumGtrendsAPI(download_folder, geo = 'IE')

keyword = "test keyword A"
related = api.get_related(keyword, "2023-01-01 2023-12-31")

multi = api.get_multi_timeline(keywords, "2023-01-01 2023-12-31")
api.quit()
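
For reference, both calls return pandas DataFrames (this follows from the get_df_from_file implementations in the library below); a minimal sketch of consuming them:

# multi has a 'date' column plus one interest column per keyword;
# related has 'query' and 'value' columns parsed from the TOP block.
print(related.head())
multi.to_csv('interest_over_time.csv', index=False)  # illustrative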

The adapted library is:

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
import http.cookies
import pandas as pd
import urllib.parse
import os

import time
import glob
from curl_cffi import requests as cffi_requests

MAX_RETRIES = 5

class _ScrapeAssistant:

    # Abstract base: subclasses receive the API object so they can drive
    # the browser and read from its download folder.
    def click_and_download_file(self, api):
        raise NotImplementedError

    def get_df_from_file(self, api):
        raise NotImplementedError

    def filename(self):
        raise NotImplementedError

class _MultitimelineScrapeAssistant(_ScrapeAssistant):

    def click_and_download_file(self, api):
        # Click the export button on the interest-over-time widget.
        excel_button_xpath = "//div[1]/trends-widget/ng-include/widget/div/div/div/widget-actions/div/button/i"

        WebDriverWait(api.driver, 10).until(EC.visibility_of_element_located((By.XPATH, excel_button_xpath)))
        api.driver.find_element(By.XPATH, excel_button_xpath).click()
        time.sleep(1)  # give the download a moment to start

    def get_df_from_file(self, api):
        interest_df = pd.read_csv(api.download_folder + 'multiTimeline.csv', skiprows=2)

        # Strip the geo suffix (e.g. ": (Ireland)") from each keyword column
        # and rename the "Week" column to "date".
        old_columns = list(interest_df.columns)
        new_columns = [column.replace(f": ({api.geolabel})", "") for column in old_columns]
        new_columns = [column.replace("Week", "date") for column in new_columns]
        interest_df = interest_df.rename(columns=dict(zip(old_columns, new_columns)))

        # Define the date column name
        date_column_name = 'date'

        # List of columns to be converted (excluding the date column)
        columns_to_convert = [col for col in interest_df.columns if col != date_column_name]

        # Replace "<1" with 0 in specified columns
        interest_df[columns_to_convert] = interest_df[columns_to_convert].replace('<1', 0)

        # Convert specified columns to int
        interest_df[columns_to_convert] = interest_df[columns_to_convert].astype(int)

        os.remove(api.download_folder + 'multiTimeline.csv')

        return interest_df

    def filename(self):
        return "multiTimeline.csv"

class _RelatedQueryScrapeAssistant(_ScrapeAssistant):

    def click_and_download_file(self, api):

        api.driver.set_window_size(1400, 1000)
        time.sleep(1)

        # Scroll down so the related-queries widget is rendered.
        api.driver.execute_script("window.scrollTo(0,1600)")

        # Wait for the Rising/Top selector to appear before exporting.
        WebDriverWait(api.driver, 2).until(EC.visibility_of_element_located((By.XPATH, "//*[ @class='_md-select-value' and ./span/div/text()[contains(., 'Rising')]]")))

        excel_button_xpath = "//div[4]/trends-widget/ng-include/widget/div/div/div/widget-actions/div/button/i"

        WebDriverWait(api.driver, 10).until(EC.visibility_of_element_located((By.XPATH, excel_button_xpath)))
        api.driver.find_element(By.XPATH, excel_button_xpath).click()
        time.sleep(1)  # give the download a moment to start

    def get_df_from_file(self, api):

        related_file_path = api.download_folder + 'relatedQueries.csv'
        with open(related_file_path) as related_file:
            lines = related_file.read().split('\n')

        # The export contains a "TOP" block terminated by a blank line
        # (a "RISING" block follows); collect only the TOP rows.
        start_marker = "TOP"

        rows = []
        is_inside_block = False
        for line in lines:
            if line.strip() == start_marker:
                is_inside_block = True
            elif is_inside_block and not line.strip():
                break
            elif is_inside_block:
                keyword, value = line.strip().split(",")[:2]
                value = 0 if value == "<1" else int(value)
                rows.append({'query': keyword, 'value': value})

        # Build the DataFrame once instead of concatenating row by row.
        top_df = pd.DataFrame(rows, columns=['query', 'value'])

        os.remove(related_file_path)

        return top_df

    def filename(self):
        return 'relatedQueries.csv'

class SeleniumGtrendsAPI:
    def __init__(self, download_folder, geo='IE', geolabel='Ireland'):
        self.download_folder = download_folder
        self.geo = geo
        # geolabel must be the country name Trends appends to CSV column
        # headers for this geo (e.g. 'IE' -> 'Ireland').
        self.geolabel = geolabel
        # first check that the download folder is clear of all files

        existing_related_files = glob.glob(download_folder + 'relatedQueries*.csv') 
        if len(existing_related_files) > 0:
            existing_files_str = "|".join(existing_related_files)
            raise Exception(f"Existing related files in download folder {download_folder}: {existing_files_str}")

        existing_multi_timeline_files = glob.glob(download_folder + 'multiTimeline*.csv')
        if len(existing_multi_timeline_files) > 0:
            existing_multi_timeline_str = "|".join(existing_multi_timeline_files)
            raise Exception(f"Existing related files in download folder {download_folder}: {existing_multi_timeline_str}")            

        self.browser_versions = ["chrome99", "chrome100", "chrome101", "chrome104", "chrome107", "chrome110"]

        chrome_options = Options()
        #chrome_options.add_argument("--headless")

        chrome_options.add_argument("start-maximized")  # open browser in maximized mode
        chrome_options.add_argument("disable-infobars")
        #chrome_options.add_argument(r"--user-data-dir=C:\users\<<username>>\AppData\Local\Google\Chrome\User Data")
        #chrome_options.add_argument(r'--profile-directory=YourProfileDir')  # e.g. Profile 3
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")  # overcome limited resource problems
        chrome_options.add_argument("--window-size=1920,1080")
        chrome_options.add_argument("--disable-extensions")
        chrome_options.add_argument("--disable-gpu")  # applicable to Windows only

        self.driver = webdriver.Chrome(options=chrome_options)

    def __get_helper(self, keywords : list, date_str, scrape_assist : _ScrapeAssistant):

        if isinstance(keywords, list):
            keywords = ",".join(keywords)

        encoded_keywords = urllib.parse.quote_plus(keywords)
        date_str = urllib.parse.quote_plus(date_str)
        retries = 0
        file_downloaded = False
        while retries < MAX_RETRIES and not file_downloaded:
            # Warm the session with curl_cffi (which impersonates a real Chrome
            # TLS fingerprint) and copy Google's cookies into Selenium.
            response = cffi_requests.get("https://www.google.com", impersonate=self.browser_versions[retries % len(self.browser_versions)])
            cookies = response.cookies
            for cookie in cookies:
                cookie_str = str(cookie)
                cookie_dict = http.cookies.SimpleCookie(cookie_str)
                for key, morsel in cookie_dict.items():
                    selenium_cookie = {
                        'name': key,
                        'value': morsel.value,
                        'domain': cookie.domain
                    }
                    self.driver.add_cookie(selenium_cookie)

            trends_url = f'https://trends.google.com/trends/explore?date={date_str}&geo={self.geo}&q={encoded_keywords}'

            self.driver.get(trends_url)

            try:

                scrape_assist.click_and_download_file(self)

                if os.path.exists(self.download_folder + scrape_assist.filename()):
                    file_downloaded = True
                else:
                    print(f"File not downloaded. Attempt {retries + 1} of {MAX_RETRIES}...")
                    retries += 1
                    time.sleep(retries)  
                    self.driver.refresh()

            except Exception as e:
                print(f"Error during download attempt: {str(e)}")
                retries += 1
                time.sleep(retries) 

        if file_downloaded:
            try:

                df = scrape_assist.get_df_from_file(self)

                return df

            except Exception as e:
                raise Exception(f"Error in reading or deleting the file 'multiTimeline.csv': {str(e)}")
        else:
            raise Exception("File not downloaded after the maximum number of attempts.")        

    def get_multi_timeline(self, keywords : list, date_str : str):
        if not isinstance(keywords, list):
            raise Exception("Expected keywords to be a list")
        return self.__get_helper(keywords, date_str, _MultitimelineScrapeAssistant())

    def get_related(self, keyword : str, date_str : str):
        return self.__get_helper(keyword, date_str, _RelatedQueryScrapeAssistant())

    def quit(self):
        self.driver.quit()

> I repeat, I am not a professional developer, but I developed this code, which takes the CSV directly from Trends and then prints the times and values. The problem is that the code is very heavy to run (on PythonAnywhere it uses a lot of CPU). Help me develop it to make it more usable.
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
import http.cookies
import pandas as pd
import urllib.parse
import os
import json
import time
from curl_cffi import requests as cffi_requests

MAX_RETRIES = 5

def trend_selenium(keywords):
    browser_versions = ["chrome99", "chrome100", "chrome101", "chrome104", "chrome107", "chrome110"]

    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument("--window-size=1920,1080")
    chrome_options.add_argument("--user-data-dir=./user_data")

    driver = webdriver.Chrome(options=chrome_options)

    encoded_keywords = urllib.parse.quote_plus(keywords)

    retries = 0
    file_downloaded = False
    while retries < MAX_RETRIES and not file_downloaded:
        response = cffi_requests.get("https://www.google.com", impersonate=browser_versions[retries % len(browser_versions)])
        cookies = response.cookies
        for cookie in cookies:
            cookie_str = str(cookie)
            cookie_dict = http.cookies.SimpleCookie(cookie_str)
            for key, morsel in cookie_dict.items():
                selenium_cookie = {
                    'name': key,
                    'value': morsel.value,
                    'domain': cookie.domain
                }
                driver.add_cookie(selenium_cookie)

        trends_url = f'https://trends.google.com/trends/explore?date=now%207-d&geo=US&q={encoded_keywords}'
        print(trends_url)
        driver.get(trends_url)

        excel_button_selector = "body > div.trends-wrapper > div:nth-child(2) > div > md-content > div > div > div:nth-child(1) > trends-widget > ng-include > widget > div > div > div > widget-actions > div > button.widget-actions-item.export > i"

        try:
            WebDriverWait(driver, 10).until(EC.visibility_of_element_located((By.CSS_SELECTOR, excel_button_selector)))
            driver.find_element(By.CSS_SELECTOR, excel_button_selector).click()
            time.sleep(5)  # pause to wait for the download

            if os.path.exists('multiTimeline.csv'):
                file_downloaded = True
            else:
                print(f"File not downloaded. Attempt {retries + 1} of {MAX_RETRIES}...")
                retries += 1
                time.sleep(retries)  # back off a little longer on each retry
                driver.refresh()

        except Exception as e:
            print(f"Error during download attempt: {str(e)}")
            retries += 1
            time.sleep(retries)  # back off a little longer on each retry

    trend_data = {}
    if file_downloaded:
        try:
            trend_df = pd.read_csv('multiTimeline.csv', skiprows=2)
            trend_df['Time'] = pd.to_datetime(trend_df['Time']).dt.strftime('%Y-%m-%d %H:%M:%S')
            data_column = [col for col in trend_df.columns if col not in ['Time']][0]
            trend_data = dict(zip(trend_df['Time'], trend_df[data_column]))
            os.remove('multiTimeline.csv')
        except Exception as e:
            print(f"Error in reading or deleting the file 'multiTimeline.csv': {str(e)}")
    else:
        print("File not downloaded after the maximum number of attempts.")

    driver.quit()
    # Serialize at the end so the function returns a (possibly empty) JSON
    # string instead of raising NameError when the download failed.
    return json.dumps(trend_data)

keywords = "test"
trends_str = trend_selenium(keywords)
print(trends_str)

Thanks, it's working; with some modifications various use cases can be satisfied!! Upvoted.

Originally posted by @dhruv-1010 in https://github.com/GeneralMills/pytrends/issues/602#issuecomment-1834101412
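
On the CPU concern in the quoted comment: most of the cost is launching a fresh Chrome for every call. A minimal sketch of reusing one driver across queries (the helper name and the elided body are illustrative; it is the same pattern the SeleniumGtrendsAPI class above uses):

import urllib.parse
from selenium import webdriver

def fetch_trend_csv(driver, keywords):
    # Same body as trend_selenium above, minus driver creation/quit:
    # navigate, click the export button, parse multiTimeline.csv.
    encoded = urllib.parse.quote_plus(keywords)
    driver.get(f"https://trends.google.com/trends/explore?date=now%207-d&geo=US&q={encoded}")
    # ... export/parse steps elided; see trend_selenium above ...

options = webdriver.ChromeOptions()
options.add_argument("--headless")
options.add_argument("--no-sandbox")
driver = webdriver.Chrome(options=options)  # pay the startup cost once
try:
    for kw in ("test keyword A", "test keyword B"):
        fetch_trend_csv(driver, kw)
finally:
    driver.quit()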

ehulland commented 5 months ago

I made my own Selenium scraper using very similar methods to the above, but recently I am getting a lot of "Oops something went wrong, please try again in a bit" errors on terms that should be working (as per a manual GST search). Is anyone facing a similar roadblock? Any workarounds? I am trying to scrape a number of terms for varying locations and time scales, and I do end up getting the 200 rate-limit error on a daily basis when I hit some query threshold, but this seems to be a separate issue.
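
Not from the thread, but one mitigation worth trying while this is unresolved: since the "Oops" page is often transient, spacing queries out and retrying with growing pauses can help. A generic sketch (the fetch callable and delays are illustrative):

import time

def scrape_with_backoff(fetch, max_attempts=5, base_delay=60):
    # fetch() should raise on failure and return the parsed data on success.
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch()
        except Exception as exc:
            if attempt == max_attempts:
                raise
            delay = base_delay * attempt  # grow the pause on every failure
            print(f"Attempt {attempt} failed ({exc}); sleeping {delay}s")
            time.sleep(delay)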

CyberTamilan commented 5 months ago

> I made my own Selenium scraper using very similar methods to the above, but recently I am getting a lot of "Oops something went wrong, please try again in a bit" errors on terms that should be working (as per a manual GST search). Is anyone facing a similar roadblock? Any workarounds? I am trying to scrape a number of terms for varying locations and time scales, and I do end up getting the 200 rate-limit error on a daily basis when I hit some query threshold, but this seems to be a separate issue.

Use Tailscale and connect your mobile as an exit node. If you hit the 200 error, toggle Aeroplane mode on and off so the IP changes; that will reduce the 200 errors and you will get a new IP. But it will put a lot of load on your mobile data. If you have a wifi network, turn the router off and on to get a new IP, since residential IPs are dynamic in nature. You will get good results.
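
To automate the "confirm you actually got a new IP before resuming" step, a small helper can poll a what-is-my-IP endpoint (api.ipify.org here, purely as an example) after toggling the connection:

import time
import urllib.request

def wait_for_new_ip(old_ip, poll_seconds=10, timeout=300):
    # Poll until the public IP differs from old_ip, or give up after timeout.
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            ip = urllib.request.urlopen("https://api.ipify.org", timeout=5).read().decode()
            if ip != old_ip:
                return ip
        except OSError:
            pass  # network is briefly down while the connection rotates
        time.sleep(poll_seconds)
    raise TimeoutError("public IP did not change within the timeout")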

kaselrap commented 5 months ago

When I use the --headless option, Google still detects userType as USER_TYPE_SCRAPPER and I don't get any results, just the message: Oops! Something went wrong. All the requests (multiline etc.) also respond with a 429 status code. But without --headless, when the browser opens visibly, it works completely and the file downloads properly. Does anyone know of any options to solve this?

williamegomezo commented 5 months ago

These settings work perfectly for me in headless mode:

from fake_useragent import UserAgent

chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=640,400")
ua = UserAgent()
user_agent = ua.random
chrome_options.add_argument(f'--user-agent={user_agent}')

And

driver.implicitly_wait(5)

The magic is done by UserAgent.
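
Putting those pieces together, a self-contained sketch of the headless setup (the Trends URL is just an example query):

from fake_useragent import UserAgent
from selenium import webdriver

chrome_options = webdriver.ChromeOptions()
for arg in ("--no-sandbox", "--headless", "--disable-gpu",
            "--disable-dev-shm-usage", "--window-size=640,400"):
    chrome_options.add_argument(arg)
chrome_options.add_argument(f'--user-agent={UserAgent().random}')  # random real-world UA

driver = webdriver.Chrome(options=chrome_options)
driver.implicitly_wait(5)  # implicit wait instead of per-element WebDriverWaits
driver.get("https://trends.google.com/trends/explore?q=test&geo=US")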