unclecode / crawl4ai

🔥🕷️ Crawl4AI: Crawl Smarter, Faster, Freely. For AI.
https://crawl4ai.com
Apache License 2.0
17.01k stars 1.26k forks source link

Hello, can we introduce Playwright as a dynamic crawler retriever to solve the problem of crawling API interface data #89

Closed adminChina closed 2 months ago

adminChina commented 2 months ago

This is my idea because I want to load PDFs, but it's not easy to intercept them through Selenium

class LocalPlaywrightCrawlerStrategy(CrawlerStrategy):
    def __init__(self, use_cached_html=False, js_code=None, **kwargs):
        super().__init__()
        print("[LOG] 🚀 Initializing LocalPlaywrightCrawlerStrategy")

        self.playwright = sync_playwright().start()
        browser_type = self.playwright.chromium  # or .firefox or .webkit
        self.headless = kwargs.get("headless", True)
        self.user_agent = kwargs.get("user_agent",
                                     "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
        self.cookies = kwargs.get("cookies", [])
        self.verbose = kwargs.get("verbose", False)
        self.use_cached_html = use_cached_html
        self.js_code = js_code

        self.extra_http_headers = kwargs.get("headers", {})

        # Hooks
        self.hooks = {
            'on_page_created': None,
            'on_user_agent_updated': None,
            'before_get_url': None,
            'after_get_url': None,
            'before_return_html': None
        }

        self.args = ['--disable-gpu',
                     '--no-sandbox',
                     '--disable-dev-shm-usage',
                     '--disable-blink-features',
                     '--disable-blink-features=AutomationControlled',
                     '--disable-web-security',
                     '--ignore-certificate-errors']

        self.browser = browser_type.launch(headless=self.headless, args=self.args)
        self.context = self.browser.new_context(
            user_agent=self.user_agent,
            viewport={'width': 1920, 'height': 1080},
            locale="zh-CN",
            bypass_csp=True,
            ignore_https_errors=True,
            accept_downloads=True,
            extra_http_headers=self.extra_http_headers)
        # 监听路由信息
        self.context.route('**/*', self.route_handler)

        # Apply cookies if provided
        if self.cookies:
            self.context.add_cookies(self.cookies)

        # 创建页面
        self.page = self.context.new_page()

        # 页面创建钩子函数
        self.page = self.execute_hook('on_page_created', self.page)

    def set_hook(self, hook_type: str, hook: Callable):
        if hook_type in self.hooks:
            self.hooks[hook_type] = hook
        else:
            raise ValueError(f"Invalid hook type: {hook_type}")

    def execute_hook(self, hook_type: str, *args, **kwargs) -> 'Page':
        hook = self.hooks.get(hook_type)
        if hook:
            result = hook(*args, **kwargs)
            if result is not None:
                if isinstance(result, Page):
                    return result
                else:
                    raise TypeError(f"Hook {hook_type} must return an instance of Page or None.")
        return self.page

    def update_user_agent(self, user_agent: str):
        self.page.close()
        self.context.close()

        self.context = self.browser.new_context(
            user_agent=self.user_agent,
            viewport={'width': 1920, 'height': 1080},
            locale="zh-CN",
            bypass_csp=True,
            ignore_https_errors=True,
            accept_downloads=True,
            extra_http_headers=self.extra_http_headers)
        self.context.route('**/*', self.route_handler)

        self.page = self.context.new_page()
        self.page = self.execute_hook('on_user_agent_updated', self.page)

    def set_custom_headers(self, headers: dict):
        self.page.set_extra_http_headers(headers)

    def _ensure_page_load(self, max_checks=6, check_interval=0.01):
        initial_length = len(self.page.content())

        for ix in range(max_checks):
            time.sleep(check_interval)
            current_length = len(self.page.content())
            if current_length != initial_length:
                break

        return self.page.content()

    def crawl(self, url: str, **kwargs) -> str:
        import hashlib
        url_hash = hashlib.md5(url.encode()).hexdigest()
        cache_file_path = os.path.join(Path.home(), ".crawl4ai", "cache", url_hash)

        if self.use_cached_html and os.path.exists(cache_file_path):
            with open(cache_file_path, "r") as f:
                return sanitize_input_encode(f.read())

        try:
            self.page = self.execute_hook('before_get_url', self.page)
            if self.verbose:
                print(f"[LOG] 🕸️ Crawling {url} using LocalPlaywrightCrawlerStrategy...")

            self.page.goto(url)

            self.page.wait_for_load_state("load")
            self.page.wait_for_selector("body", state="attached")

            self.page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
            self.page = self.execute_hook('after_get_url', self.page)

            html = sanitize_input_encode(self._ensure_page_load())
            can_not_be_done_headless = False  # Look at my creativity for naming variables

            if kwargs.get('bypass_headless', False) or html == "<html><head></head><body></body></html>":
                print("[LOG] 🙌 Page could not be loaded in headless mode. Trying non-headless mode...")
                if url.endswith('pdf'):
                    can_not_be_done_headless = True
                    pdf_content = self.crawl_pdf(url)
                    if pdf_content:
                        html = f'''<html><head></head><body>{sanitize_input_encode(pdf_content)}</body></html>'''
                else:
                    can_not_be_done_headless = True
                    browser = self.playwright.chromium.launch(headless=False)
                    context = browser.new_context(user_agent=self.user_agent)
                    page = context.new_page()
                    page.goto(url)
                    page.wait_for_load_state("load")
                    html = sanitize_input_encode(page.content())

            if self.js_code:
                if isinstance(self.js_code, str):
                    self.page.evaluate(self.js_code)
                elif isinstance(self.js_code, list):
                    for js in self.js_code:
                        self.page.evaluate(js)

            # 是否重新获取页面数据
            if not can_not_be_done_headless:
                html = sanitize_input_encode(self.page.content())

            self.page = self.execute_hook('before_return_html', self.page, html)

            with open(cache_file_path, "w", encoding="utf-8") as f:
                f.write(html)

            if self.verbose:
                print(f"[LOG] ✅ Crawled {url} successfully!")

            return html
        except Exception as e:
            raise Exception(f"Failed to crawl {url}: {str(e)}")

    def route_handler(self, route: Route, request: Request):
        if request.resource_type == 'document':
            response: APIResponse
            try:
                response = route.fetch()
            except Exception as e:
                route.continue_()
                return
            contentType = response.headers.get('Content-Type', response.headers.get('content-type', 'text/html'))
            if contentType == 'application/pdf':
                self.crawl_pdf(route, request, response)
            else:
                route.fulfill(response=response)
        else:
            route.continue_()

    def crawl_pdf(self, route: Route, request: Request, response: APIResponse):
        import hashlib
        url_hash = f'{hashlib.md5(unquote(request.url).encode()).hexdigest()}.pdf'
        cache_dir_path = os.path.join(Path.home(), ".crawl4ai", "cache", "pdf")
        cache_file_path = os.path.join(cache_dir_path, url_hash)

        if not os.path.exists(cache_dir_path):
            os.makedirs(cache_dir_path)

        download_complete = True

        try:
            with open(cache_file_path, 'wb') as f:
                f.write(response.body())
        except Exception as e:
            download_complete = False

        if download_complete:
            headers = response.headers
            if 'Content-Type' in headers.keys():
                del headers['Content-Type']
            if 'content-type' in headers.keys():
                del headers['content-type']
            headers['content-type'] = 'text/html'
            route.fulfill(
                status=response.status,
                headers=headers,
                body=pares_pdf(cache_file_path)
            )
        else:
            route.fulfill(response=response)

    def take_screenshot(self) -> str:
        try:
            self.page.set_viewport_size({"width": 1920, "height": 1080})
            screenshot = self.page.screenshot()

            image = Image.open(BytesIO(screenshot))
            rgb_image = image.convert('RGB')

            buffered = BytesIO()
            rgb_image.save(buffered, format="JPEG", quality=85)
            img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')

            if self.verbose:
                print(f"[LOG] 📸 Screenshot taken and converted to base64")

            return img_base64
        except Exception as e:
            error_message = f"Failed to take screenshot: {str(e)}"
            print(error_message)

            img = Image.new('RGB', (800, 600), color='black')
            draw = ImageDraw.Draw(img)

            try:
                font = ImageFont.truetype("arial.ttf", 40)
            except IOError:
                font = ImageFont.load_default()

            text_color = (255, 255, 255)
            max_width = 780
            wrapped_text = wrap_text(draw, error_message, font, max_width)

            text_position = (10, 10)
            draw.text(text_position, wrapped_text, fill=text_color, font=font)

            buffered = BytesIO()
            img.save(buffered, format="JPEG")
            img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')

            return img_base64

    def quit(self):
        self.browser.close()
        self.playwright.stop()

pares_pdf


def pares_pdf(file_path: str):
    def rotate_img(img, angle):
        """
        img   --image
        angle --rotation angle
        return--rotated img
        """

        h, w = img.shape[:2]
        rotate_center = (w / 2, h / 2)
        # 获取旋转矩阵
        # 参数1为旋转中心点;
        # 参数2为旋转角度,正值-逆时针旋转;负值-顺时针旋转
        # 参数3为各向同性的比例因子,1.0原图,2.0变成原来的2倍,0.5变成原来的0.5倍
        M = cv2.getRotationMatrix2D(rotate_center, angle, 1.0)
        # 计算图像新边界
        new_w = int(h * np.abs(M[0, 1]) + w * np.abs(M[0, 0]))
        new_h = int(h * np.abs(M[0, 0]) + w * np.abs(M[0, 1]))
        # 调整旋转矩阵以考虑平移
        M[0, 2] += (new_w - w) / 2
        M[1, 2] += (new_h - h) / 2

        rotated_img = cv2.warpAffine(img, M, (new_w, new_h))
        return rotated_img

    def pdf2text(filepath):
        import fitz  # pyMuPDF里面的fitz包,不要与pip install fitz混淆
        import numpy as np

        ocr = get_ocr()
        doc = fitz.open(filepath)
        resp = ""

        b_unit = tqdm.tqdm(
            total=doc.page_count, desc="RapidOCRPDFLoader context page index: 0"
        )
        for i, page in enumerate(doc):
            b_unit.set_description(
                "RapidOCRPDFLoader context page index: {}".format(i)
            )
            b_unit.refresh()
            text = page.get_text("html")
            resp += text + "\n"

            img_list = page.get_image_info(xrefs=True)
            for img in img_list:
                if xref := img.get("xref"):
                    bbox = img["bbox"]
                    # 检查图片尺寸是否超过设定的阈值
                    if (bbox[2] - bbox[0]) / (page.rect.width) < 0.6 or (bbox[3] - bbox[1]) / (
                            page.rect.height
                    ) < 0.6:
                        continue
                    pix = fitz.Pixmap(doc, xref)
                    samples = pix.samples
                    if int(page.rotation) != 0:  # 如果Page有旋转角度,则旋转图片
                        img_array = np.frombuffer(
                            pix.samples, dtype=np.uint8
                        ).reshape(pix.height, pix.width, -1)
                        tmp_img = Image.fromarray(img_array)
                        ori_img = cv2.cvtColor(np.array(tmp_img), cv2.COLOR_RGB2BGR)
                        rot_img = rotate_img(img=ori_img, angle=360 - page.rotation)
                        img_array = cv2.cvtColor(rot_img, cv2.COLOR_RGB2BGR)
                    else:
                        img_array = np.frombuffer(
                            pix.samples, dtype=np.uint8
                        ).reshape(pix.height, pix.width, -1)

                    result, _ = ocr(img_array)
                    if result:
                        ocr_result = [line[1] for line in result]
                        resp += "\n".join(ocr_result)

            # 更新进度
            b_unit.update(1)
        return resp

    text = pdf2text(file_path)
    return text
unclecode commented 2 months ago

@adminChina Thanks for the work! We're rolling out the new version with 100% async operation and Playwright, hopefully by the end of this week. Currently testing it, it's significantly lighter and faster.

mysticaltech commented 2 months ago

@unclecode Playwright support is a fantastic news! I'm excited to try running it on Kubernetes when it comes out, using the docker image.

Here’s how I plan to set it up:

One key feature request:

unclecode commented 2 months ago

@mysticaltech amazing! Love your ambition haha and what if I tell you we are working in the scraper engine? 😎 it going to be really optimized, faster and efficient. Hopefully in a few weeks.

mysticaltech commented 2 months ago

@unclecode Wonderful to hear 🚀

adminChina commented 2 months ago

@adminChina Thanks for the work! We're rolling out the new version with 100% async operation and Playwright, hopefully by the end of this week. Currently testing it, it's significantly lighter and faster.

good

nyck33 commented 2 months ago

Hi there, can you share the progress?

Krato commented 1 month ago

Any update? Thank you!