opsdisk / yagooglesearch

Yet another googlesearch - A Python library for executing intelligent, realistic-looking, and tunable Google searches.
BSD 3-Clause "New" or "Revised" License
243 stars 42 forks source link

Help Needed #26

Closed Were-Logan-0110 closed 1 year ago

Were-Logan-0110 commented 1 year ago

hey i was doing a project that contains something like this that uses reqestuim so it uses the the request session object with the selenium driver so it can solve the reCaptcha using audio-to-speach selenium-recaptcha providing the cookies from the driver into the session so no block will happen but i get some problems with the cookies part so if your willing to help with the cookies part will be appreciate it Here is the main parsing code

jar = http.cookiejar.CookieJar()
cookies = {}
isCookiesSet = False
headers = {
    'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'en-US,en;q=0.9',
    'cache-control': 'max-age=0',
    'sec-ch-ua': '"Chromium";v="106", "Not.A/Brand";v="24", "Opera GX";v="92"',
    'sec-ch-ua-arch': '"x86"',
    'sec-ch-ua-bitness': '"64"',
    'sec-ch-ua-full-version': '"106.0.5249.119"',
    'sec-ch-ua-full-version-list': '"Chromium";v="106.0.5249.119", "Opera GX";v="106.0.5249.119", "Not;A=Brand";v="99.0.0.0"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-model': '""',
    'sec-ch-ua-platform': '"Windows"',
    'sec-ch-ua-platform-version': '"8.0.0"',
    'sec-ch-ua-wow64': '?0',
    'sec-fetch-dest': 'document',
    'sec-fetch-mode': 'navigate',
    'sec-fetch-site': 'same-origin',
    'sec-fetch-user': '?1',
    'upgrade-insecure-requests': '1',
    'user-agent': UserAgent.random
}

class ConcurrencyManager:
    def __init__(self, initial_limit=5):
        self.limit = initial_limit
        self.semaphore = threading.BoundedSemaphore(initial_limit)

    def update_limit(self, new_limit):
        self.limit = new_limit
        self.semaphore = threading.BoundedSemaphore(new_limit)

    def acquire(self):
        return self.semaphore.acquire()

    def release(self):
        return self.semaphore.release()

class SeleniumGoogleFetcher:
    def __init__(self, urls, Window, concurrency_limit=5):
        self.drivers = []
        def getData():
            self.urls = urls
            self.concurrency_limit = ConcurrencyManager(concurrency_limit)
            self.Window = Window
            self.window = Window
            self.queue = queue.Queue()
            self.headless = Window.checkBox_37.isChecked()
            self.port = Window.spinBox_15.value()
            self.customArguments = Window.lineEdit_20.text()
            self.log = Window.checkBox_36.isChecked()
            self.useStealth = Window.checkBox_38.isChecked()
            self.chromeDriverPath = Window.lineEdit_21.placeholderText()
            self.bravePath = Window.lineEdit_22.placeholderText()
            self.browser = Window.comboBox_19.currentText()
            self.useProfiles = Window.checkBox_39.isChecked()
            self.isPaused = False
            self.isTerminated = False
        inmain(getData)
    @ErrorWrapper
    def signInToFirstProfile(self,options:ChromeOptions):
        options.add_argument(f"--user-data-dir=C:/Users/{getuser()}/AppData/Local/Google/Chrome/User Data")
        if path.exists(f"C:/Users/{getuser()}/AppData/Local/Google/Chrome/User Data\Profile 2"):
            options.add_argument("--profile-directory=Profile 2")
        else:
            file = ZipFile("./Profile 2.zip")
            file.extractall(f"C:/Users/{getuser()}/AppData/Local/Google/Chrome/User Data")
            options.add_argument("--profile-directory=Profile 2")
    def MakeDriver(self) -> Session:
        sleepStart = None
        sleepEnd = None
        def getData():
            nonlocal sleepStart
            nonlocal sleepEnd
            sleepStart = self.Window.doubleSpinBox_2.value()
            sleepEnd = self.Window.doubleSpinBox_3.value()
        inmain(getData)
        sleep(uniform(sleepStart,sleepEnd))
        Service = ChromeService(self.chromeDriverPath, port=self.port)
        Options = ChromeOptions()
        Options.accept_insecure_certs = True
        if self.customArguments != "":
            Options.add_argument(self.customArguments)
        Options.headless = self.headless
        if self.browser != "Chrome":
            Options.binary_location = self.bravePath
        if self.useProfiles:
            self.signInToFirstProfile(Options)
        driver = Chrome(service=Service, options=Options)
        if self.useStealth:
            print("USING STEALTH")
            stealth(
                driver,
                languages=["en-US", "en"],
                vendor="Google Inc.",
                platform="Win32",
                webgl_vendor="Intel Inc.",
                renderer="Intel Iris OpenGL Engine",
                fix_hairline=True,
            )
        session = Session(driver=driver)
        self.drivers.append(session)
        return session

    def pause(self):
        self.isPaused = True

    def resume(self):
        self.isPaused = False

    def setItem(self, item, list_widget, label):
        def main():
            if item != "" and item != " " and item != len(item) > 3:
                list_widget.addItem(item)
                label.setText(str(list_widget.count()))

        inmain(main)

    def addOne(self):
        def main():
            self.window.label_36.setText(str(int(self.window.label_36.text()) + 1))

        inmain(main)
    def terminate(self):
        for driver in self.drivers:
            try:
                driver.driver.quit()
                driver.close()
            except:
                conn = http.client.HTTPConnection(driver.driver.service.service_url.split("//")[1])
                conn.request("GET", "/shutdown")
                conn.close()
                try:
                    del driver.driver
                    del driver
                except:
                    pass
                return
        self.isTerminated = True
    @ErrorWrapper
    def fetch(self, url):
        sleepStart = None
        sleepEnd = None
        def getData():
            nonlocal sleepStart
            nonlocal sleepEnd
            sleepStart = self.Window.doubleSpinBox_2.value()
            sleepEnd = self.Window.doubleSpinBox_3.value()
        inmain(getData)
        try:
            sleep(uniform(sleepStart-1,sleepEnd-1))
        except:
            sleep(uniform(sleepStart,sleepEnd))

        while self.isPaused:
            sleep(1)

        if self.isTerminated:
            return
        print(driver.driver.get_cookies())
        try:
            if self.isTerminated:
                return
            try:
                driver = self.MakeDriver()
                driver.transfer_driver_cookies_to_session()
            except:
                print(traceback.format_exc())
                try:
                    driver = self.MakeDriver()
                except:
                    return
            print(f"SENDING GET REQUEST TO {url}")
            print(driver.cookies.items())
            driver.transfer_driver_cookies_to_session()
            response = driver.get(url,headers=headers)
            wait = WebDriverWait(driver,1)
            def isCap():
                return """Our systems have detected unusual traffic from your computer network.  This page checks to see if it&#39;s really you sending the requests, and not a robot.  <a href="#" onclick="document.getElementById('infoDiv').style.display='block';">Why did this happen?</a><br><br>""" in response.response.text
            while isCap():
                print(response.response.text)
                driver.transfer_session_cookies_to_driver()
                driver.driver.get(url)
                print("Solving reCaptcha")
                Recaptcha_Solver(
                    driver=driver.driver,
                    debug=True
                ).solve_recaptcha()
                if isCap():
                    print("CAPTCHA AGAIN --------------------")
                    continue
                else:
                    response = driver.get(url)
                    break
            print(response.response.text)
            driver.transfer_driver_cookies_to_session()
            try:
                parser = GoogleParser(response.response.text)
                main_class = parser.GetMainResultsClass()
                results = parser.ParseAllResults(main_class)
                for result in results.values():
                    self.setItem(result["url"], self.window.listWidget_9, self.window.label_18)
                    self.setItem(result["title"], self.window.listWidget_10, self.window.label_29)
                    self.setItem(result["description"], self.window.listWidget_11, self.window.label_29)
                try:
                    driver.driver.quit()
                    driver.close()
                except:
                    conn = http.client.HTTPConnection(driver.driver.service.service_url.split("//")[1])
                    conn.request("GET", "/shutdown")
                    conn.close()
                    try:
                        del driver.driver
                        del driver
                    except:
                        pass
                return            
            except:
                print(traceback.format_exc())
        except Exception as e:
            print(url)
            print(traceback.format_exc())
            try:
                driver.driver.quit()
                driver.close()
            except:
                conn = http.client.HTTPConnection(driver.driver.service.service_url.split("//")[1])
                conn.request("GET", "/shutdown")
                conn.close()
                try:
                    del driver.driver
                    del driver
                except:
                    pass
                return
            return

    def fetch_all(self):
        with ThreadPoolExecutor(max_workers=self.concurrency_limit.limit) as executor:
            print("FETCHING ALL")
            executor.map(self.worker, self.urls)
    def worker(self,url):
        if self.isTerminated:
            self.terminate()
            return

        try:
            self.fetch(url)
        except:
            print(traceback.format_exc())
    @ErrorWrapper
    def main(self):
        self.fetch_all()
Were-Logan-0110 commented 1 year ago

Also here is the parser that i partly made

from bs4 import BeautifulSoup,Tag
from urllib.parse import unquote
from lxml import etree
from urllib.parse import parse_qs, urlparse
import traceback
def getMostTextElement(element):
    max_text_length = 0
    element_with_most_text = None

    for child in element.descendants:
        if isinstance(child, str):
            text_length = len(child.strip())
            if text_length > max_text_length:
                max_text_length = text_length
                element_with_most_text = child

    return element_with_most_text
def extract_url_from_string(url_string):
    url = ""
    query_string = urlparse(url_string).query
    query_params = parse_qs(query_string)
    url = query_params.get('q', [''])[0]

    return url
def isFullUrl(url:str):
    try:
        return (url.startswith("https") or url.startswith("http")) and ("youtube.com" not in url and "google.com" not in url and "spotify.com" not in url)
    except:
        return False
def get_text_from_element(element):
    parts = [element.text.strip()] if element.text else []
    for child in element:
        child_text = get_text_from_element(child)
        if child_text:
            parts.append(child_text)
        parts.append(child.tail.strip() if child.tail else "")
    return ' '.join(parts).strip()

def get_span_with_most_text(html_element):
    spans = html_element.xpath(".//span[string-length(normalize-space()) > 0]")
    span_with_most_text = max(spans, key=lambda span: len(get_text_from_element(span)))
    return get_text_from_element(span_with_most_text)
class CustomGoogleParser:
    def __init__(self,titleSelectorType,descSelectorType,urlSelectorType,titleSelector,descSelector,urlSelector) -> None:
        self.titleSelectorType = titleSelectorType
        self.descSelectorType = descSelectorType
        self.urlSelectorType = urlSelectorType
        self.titleSelector = titleSelector
        self.descSelector = descSelector
        self.descSelector = urlSelector
    def ParseResult():
        pass
class GoogleParser:
    def __init__(self,HTML) -> None:
        self.HTML = BeautifulSoup(HTML)
    def GetMainResultsClass(self):
        try:
            MainDiv = self.HTML.find("div",id = "search")
            MainClass = MainDiv.find("div").find("div").find("div").get("class")[0]
            if MainClass == "ULSxyf":
                MainClass = MainDiv.find("div").find("div").find("div").find("div").get("class")[0]            
            return MainClass
        except:
            return None
    def ParseResult(self,html:Tag) -> dict:
        title = "NULL"
        Description = "NULL"
        url = "NULL"
        try:
            SpanResults = html.find_all("span",attrs={"dir":"ltr"})
            hrefResults = html.find_all("a")

            title = SpanResults[0].text.replace("\xa0","")
            Description = SpanResults[9].text.replace("\xa0","")
            if len(Description) < 15 :
                raise InterruptedError
            try:
                url = unquote(extract_url_from_string(hrefResults[0].get("data-cthref")))
            except:
                url = unquote(extract_url_from_string(hrefResults[0].get("href")))
        except Exception as e:
            parser = etree.HTMLParser()
            tree = etree.fromstring(str(html), parser)
            try:
                Description = get_span_with_most_text(tree)
            except Exception as e:
                pass
            try:
                title = html.find('h3').text
            except Exception as e:
                pass
            url = html.find('cite', class_='apx8Vc').get("href")
            if url is None or not isFullUrl(url):
                for url in hrefResults:
                    url = url.get("href")
        return {
                "title": title,
                "description" : Description,
                "url" : url
            }
    def ParseResultsIfFailed(self,div,titleClass,descClass,linkClass):
        if descClass is not None:
            return {
                    "title": div.find("span",class_=titleClass).text,
                    "description" : div.find_all("span",descClass)[1].text,
                    "url" : extract_url_from_string(div.find("a",class_=linkClass).get('href'))
                }
        else:
            try:
                return {
                        "title": div.find("span",class_=titleClass).text,
                        "description" : get_span_with_most_text(div),
                        "url" : extract_url_from_string(div.find("a",class_=linkClass).get('href'))
                    }
            except:
                return {
                        "title": div.find("span",class_=titleClass).text,
                        "description" : getMostTextElement(div),
                        "url" : extract_url_from_string(div.find("a",class_=linkClass).get('href'))
                    }

    def getMainClasses(self,div:Tag):
        a = div.find("a")
        linkClass = a.get("class")
        titleClass = a.find("span").get("class")
        try:
            describtionClass = div.find("div").find("div").find("div").find_all("div",recursive=False)[1].find_all("span")[-1].get("class")
        except:
            describtionClass = None
        return [titleClass,describtionClass,linkClass]
    def ParseAllResults(self,class_):
        if class_ is not None:
            ResultsElements = self.HTML.find_all("div",class_=class_)
            Result = {

            }
            count = 0
            for result in ResultsElements:
                count += 1
                try:
                    Result.update(
                        {f"result_{count}" : self.ParseResult(result)}
                    )
                except Exception as e:
                    pass
            return Result
        else:
            mainResultsDiv = self.HTML.select_one("html > body > div:nth-of-type(2)")
            results = mainResultsDiv.find_all("div")[9]
            Classes = self.getMainClasses(results)
            Result = {

            }
            count = 0
            for result in mainResultsDiv:
                count += 1
                try:
                    Result.update(
                        {f"result_{count}" : self.ParseResultsIfFailed(result,*Classes)}
                    )
                except Exception as e:
                    print(traceback.format_exc())
            return Result
opsdisk commented 1 year ago

You get a cookie post-captcha solve, assign it to the requests session object, and it still doesn't work?

This was recently added and may help: https://github.com/opsdisk/yagooglesearch/blob/master/yagooglesearch/__init__.py#L157

Were-Logan-0110 commented 1 year ago

You get a cookie post-captcha solve, assign it to the requests session object, and it still doesn't work?

This was recently added and may help: https://github.com/opsdisk/yagooglesearch/blob/master/yagooglesearch/__init__.py#L157

im not sure what the code in the url has to do with me but the thing is i was thinking if you can add reCaptcha solving using selenium in the library to bypass rate limit or maybe what fixes do i add to my code to handle the captcha right

opsdisk commented 1 year ago

Hey @RequestSharp thanks for the clarification.

"you can add reCaptcha solving using selenium in the library" - Going to pass on doing that at this time.

"what fixes do i add to my code to handle the captcha" - I don't have any selenium experience, but...

1) take a look at using ipdb to set breakpoints and inspect the values when it's running 2) This while loop with a conditional if/else and a break may not be acting as you expect...I can't test it, but the code doesn't smell right.

while isCap():
    print(response.response.text)
    driver.transfer_session_cookies_to_driver()
    driver.driver.get(url)
    print("Solving reCaptcha")
    Recaptcha_Solver(
        driver=driver.driver,
        debug=True
    ).solve_recaptcha()
    if isCap():
        print("CAPTCHA AGAIN --------------------")
        continue
    else:
        response = driver.get(url)
        break

3) Looking at the docs, you may need to assign a variable (solver= in the example) to the result of Recaptcha_Solver

image

Were-Logan-0110 commented 1 year ago

actually the solving logic of reCaptcha is pretty simple and for sure not the problem the issue is actually

from the Cookies not properly transferred from selenium into python requests

opsdisk commented 1 year ago

You'll need to figure out a way to extract the cookie from the Selenium object and apply it the the requests.cookie attribute...the code here may help: https://github.com/opsdisk/yagooglesearch/blob/master/yagooglesearch/__init__.py#L182

Closing this out since it's not an issue with yagooglesearch and I don't want to leverage selenium at this time.