CuddleBear92 / Hydrus-Presets-and-Scripts

collection of presets and scripts for Hydrus

List boorus sorted by the percentage of images containing a given tag #150

Closed 8ullyMaguire closed 1 year ago

8ullyMaguire commented 1 year ago

I would like to request a feature for the Hydrus-Presets-and-Scripts repository: listing boorus sorted by the percentage of their images that carry a given tag. One way to do this is with two scripts: one that scrapes booru data into a database, and another that queries the database and prints the boorus sorted by that percentage. While attempting to implement the first script (below), I hit a Cloudflare error on the first booru I tested. I am not sure how Hydrus manages to download data from behind Cloudflare, but someone interested in this feature may be able to get past this and contribute it to the repository.

"""
This script is used to scrape data from booru websites and store it in a SQLite database. It utilizes the BeautifulSoup library to parse HTML and extract information from webpages. The script performs the following tasks:

1. Imports necessary libraries and modules.
2. Defines global variables for the database file, maximum backoff time, and logging format.
3. Defines functions for executing SQL queries, initializing the database, inserting boorus and tags into the database, and checking if boorus and tags exist in the database.
4. Defines a function for fetching webpages using the cloudscraper library and handling request exceptions with exponential backoff.
5. Defines functions for extracting boorus and tags from webpages and inserting them into the database.
6. Defines the main function that initializes the database, extracts boorus and tags, and inserts them into the database.
7. Executes the main function if the script is run directly.

The script uses docstrings for every function to provide detailed explanations of their purpose and usage. It also uses logging to record debug information and errors. The database is created if it doesn't exist and the necessary tables are created. Boorus and tags are only inserted into the database if they don't already exist. The script utilizes generators to efficiently process large amounts of data.

Known issue: the script currently fails with an error when the Cloudflare challenge is not passed.
"""
import re
import backoff
import cloudscraper
import logging
import sqlite3
import time
import requests

from bs4 import BeautifulSoup
from typing import Any, Generator, List, Dict

DB_FILE = "boorus.db"
MAX_BACKOFF_TIME = 300

FORMAT = "[%(levelname)s]:%(asctime)s:%(name)s [%(filename)s:%(lineno)s - %(funcName)s()] %(message)s"
logging.basicConfig(
    level=logging.DEBUG,
    format=FORMAT,
    handlers=[logging.FileHandler("debug.log", mode="w"), logging.StreamHandler()],
)

def execute_query(sql_query: str) -> None:
    """Execute a SQL query on the database.

    Args:
        sql_query (str): The SQL query to execute.

    Returns:
        None
    """
    with sqlite3.connect(DB_FILE) as conn:
        cursor = conn.cursor()
        cursor.execute(sql_query)
        conn.commit()

def initialize_database() -> None:
    """Initialize the database by creating the necessary tables if they don't exist.

    Args:
        None

    Returns:
        None
    """
    sql_query = '''
    CREATE TABLE IF NOT EXISTS boorus (
        nsfw TEXT,
        name TEXT,
        title TEXT,
        url TEXT PRIMARY KEY,
        icon TEXT,
        images INTEGER,
        users INTEGER,
        owner TEXT
    )'''
    execute_query(sql_query)

    sql_query = '''
    CREATE TABLE IF NOT EXISTS booru_tags (
        tag TEXT,
        booru_url TEXT,
        posts INTEGER,
        type TEXT,
        PRIMARY KEY (tag, booru_url),
        FOREIGN KEY (booru_url) REFERENCES boorus(url)
    )'''
    execute_query(sql_query)

def insert_booru(booru: Dict[str, str]) -> None:
    """Insert a booru into the database.

    Args:
        booru (Dict[str, str]): A dictionary containing the booru data.

    Returns:
        None
    """
    with sqlite3.connect(DB_FILE) as conn:
        cursor = conn.cursor()
        sql_query = '''INSERT INTO boorus (nsfw, name, title, url, icon, images, users, owner) VALUES (?, ?, ?, ?, ?, ?, ?, ?)'''
        cursor.execute(sql_query, (booru["nsfw"], booru["name"], booru["title"], booru["url"], booru["icon"], booru["images"], booru["users"], booru["owner"]))
        conn.commit()

def check_booru_exists(url: str) -> bool:
    """Check if a booru exists in the database.

    Args:
        url (str): The URL of the booru to check.

    Returns:
        bool: True if the booru exists in the database, False otherwise.
    """
    with sqlite3.connect(DB_FILE) as conn:
        cursor = conn.cursor()
        sql_query = '''SELECT EXISTS(SELECT 1 FROM boorus WHERE url = ?)'''
        cursor.execute(sql_query, (url,))
        # SQLite returns 0 or 1 here; convert it to a real bool.
        exists: bool = bool(cursor.fetchone()[0])
    return exists

def insert_tag(tag: Dict[str, Any]) -> None:
    """Insert a tag into the database.

    Args:
        tag (Dict[str, Any]): A dictionary containing the tag data.

    Returns:
        None
    """
    with sqlite3.connect(DB_FILE) as conn:
        cursor = conn.cursor()
        sql_query = '''INSERT INTO booru_tags (tag, booru_url, posts, type) VALUES (?, ?, ?, ?)'''
        cursor.execute(sql_query, (tag['tag'], tag['booru_url'], tag['posts'], tag['type']))
        conn.commit()

def check_tag_exists(tag: Dict[str, Any]) -> bool:
    """Check if a tag exists in the database.

    Args:
        tag (Dict[str, Any]): A dictionary containing the tag data.

    Returns:
        bool: True if the tag exists in the database, False otherwise.
    """
    with sqlite3.connect(DB_FILE) as conn:
        cursor = conn.cursor()
        sql_query = '''SELECT EXISTS(SELECT 1 FROM booru_tags WHERE tag = ? AND booru_url = ?)'''
        cursor.execute(sql_query, (tag['tag'], tag['booru_url']))
        # SQLite returns 0 or 1 here; convert it to a real bool.
        exists: bool = bool(cursor.fetchone()[0])
    return exists

@backoff.on_exception(
    backoff.expo,
    requests.exceptions.RequestException,
    max_time=MAX_BACKOFF_TIME,
)
def fetch_webpage(url: str) -> BeautifulSoup:
    """Fetch a webpage and return its BeautifulSoup object.

    Args:
        url (str): The URL of the webpage to fetch.

    Returns:
        BeautifulSoup: The BeautifulSoup object of the fetched webpage.
    """
    scraper = cloudscraper.create_scraper()
    response = scraper.get(url)
    #response = requests.get(url)
    time.sleep(0.2)
    return BeautifulSoup(response.text, "html.parser")

def extract_boorus(top_boorus: str) -> Generator[Dict[str, Any], None, None]:
    """Extract boorus from the top boorus page.

    Args:
        top_boorus (str): The URL of the top boorus page.

    Yields:
        Dict[str, Any]: A dictionary containing the booru data.
    """
    page: int = 0
    while True:
        soup: BeautifulSoup = fetch_webpage(f"{top_boorus}?top_boorus[page]={page}")
        boorus: List[Dict[str, Any]] = extract_boorus_from_html(soup)
        if not boorus:
            break
        for booru in boorus:
            yield booru
        page += 1

def extract_boorus_from_html(soup: BeautifulSoup) -> List[Dict[str, str]]:
    """Extract boorus from the HTML of the top boorus page.

    Args:
        soup (BeautifulSoup): The BeautifulSoup object of the top boorus page.

    Returns:
        List[Dict[str, str]]: A list of dictionaries containing the booru data.
    """
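    table = soup.find('table', {'class': 'top-boorus'})
    if table is None:
        # No listing table in the response (e.g. a Cloudflare challenge page).
        logging.warning("top-boorus table not found")
        return []
    rows = table.find_all('tr')[4:]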
    boorus = []
    for row in rows:
        cells = row.find_all('td')
        if len(cells) < 7:
            continue
        booru_url = cells[3].find('a')['href']
        if not check_booru_exists(booru_url):
            booru_data = extract_booru_data(cells)
            boorus.append(booru_data)
            insert_booru(booru_data)
    return boorus

def extract_booru_data(cells: List[BeautifulSoup]) -> Dict[str, Any]:
    """Extract booru data from the cells of a row in the top boorus table.

    Args:
        cells (List[BeautifulSoup]): A list of BeautifulSoup objects representing the cells of a row in the top boorus table.

    Returns:
        Dict[str, Any]: A dictionary containing the booru data.
    """
    return {
        "nsfw": "NSFW" if cells[1].text.strip() == "NSFW" else "SFW",
        "name": cells[2].text.strip(),
        "title": cells[3].find("a", class_="booru-link").text.strip(),
        "url": cells[3].find("a")["href"],
        "icon": cells[3].find("img")["src"],
        "images": int(cells[4].text.strip()),
        "users": int(cells[5].text.strip()),
        "owner": cells[6].text.strip()
    }

def extract_tags(booru_url: str) -> Generator[Dict[str, Any], None, None]:
    """Extract tags from a booru.

    Args:
        booru_url (str): The URL of the booru to extract tags from.

    Yields:
        Dict[str, Any]: A dictionary containing the tag data.
    """
    pid = 0
    page_url = booru_url + '/index.php?page=tags&s=list'
    while True:
        soup = fetch_webpage(f"{page_url}&pid={pid}")
        logging.debug(soup)
        table = soup.find('table')
        if table is None:
            break
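        rows = table.find_all('tr')
        found_rows = False
        for row in rows[1:]:
            cells = row.find_all('td')
            if len(cells) < 3:
                continue  # skip malformed or separator rows
            found_rows = True
            posts = int(cells[0].text.strip())
            name = cells[1].find('a').text.strip()
            tag_type = cells[2].text.split('(')[0].strip()
            # The key must be 'tag': insert_tag() and check_tag_exists() read tag['tag'].
            yield {'tag': name, 'booru_url': booru_url, 'posts': posts, 'type': tag_type}
        if not found_rows:
            break  # no data rows on this page, so stop instead of looping forever
        pid += 50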

def extract_tags_from_posts(booru_url: str) -> Generator[Dict[str, Any], None, None]:
    """Extract tags from the posts of a booru.

    Args:
        booru_url (str): The URL of the booru to extract tags from.

    Yields:
        Dict[str, Any]: A dictionary containing the tag data.
    """
    pid = 0
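    # Gelbooru-style post listing URL (the original repeated '/index.php?' twice).
    page_url = booru_url + '/index.php?page=post&s=list'
    while True:
        soup = fetch_webpage(f"{page_url}&pid={pid}")
        # find_all() returns a list of <img> elements, one per post thumbnail.
        img_tags = soup.find_all('img', alt='post')
        if not img_tags:
            # Fail on cloudflare
            logging.debug(soup)
            break
        for img_tag in img_tags:
            title = img_tag.get('title', '')
            tags = re.findall(r'\b\w+\b', title)
            for tag in tags:
                # The key must be 'tag': insert_tag() and check_tag_exists() read tag['tag'].
                yield {'tag': tag, 'booru_url': booru_url, 'posts': 0, 'type': 'post'}
        pid += 40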

def main() -> None:
    """The main function of the script.

    Args:
        None

    Returns:
        None
    """
    initialize_database()
    top_boorus = "https://booru.org/top"
    for booru in extract_boorus(top_boorus):
        found = False
        for tag in extract_tags(booru['url']):
            found = True
            if check_tag_exists(tag):
                continue
            insert_tag(tag)
        if found:
            continue

        for tag in extract_tags_from_posts(booru['url']):
            tag['booru_url'] = booru['url']
            if check_tag_exists(tag):
                continue
            insert_tag(tag)

if __name__ == "__main__":
    main()
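
For the second script mentioned in the request, a minimal sketch might look like the following. It assumes the `boorus` and `booru_tags` tables created by `initialize_database()` above, and it approximates "percentage of images containing a tag" as the tag's `posts` count divided by the booru's `images` count. The function name `boorus_by_tag_share` is made up for illustration and is not part of the repository.

```python
import sqlite3
from typing import List, Tuple


def boorus_by_tag_share(conn: sqlite3.Connection, tag: str) -> List[Tuple[str, str, float]]:
    """Return (name, url, percent) rows for one tag, highest percentage first.

    Assumes the schema from the scraper script: boorus(url, name, images, ...)
    and booru_tags(tag, booru_url, posts, ...).
    """
    sql_query = '''
    SELECT b.name, b.url, 100.0 * t.posts / b.images AS percent
    FROM booru_tags AS t
    JOIN boorus AS b ON b.url = t.booru_url
    WHERE t.tag = ? AND b.images > 0  -- skip empty boorus to avoid dividing by zero
    ORDER BY percent DESC
    '''
    return conn.execute(sql_query, (tag,)).fetchall()
```

Calling `boorus_by_tag_share(sqlite3.connect("boorus.db"), "some_tag")` would return the matching boorus, highest share first. Tags collected through the `extract_tags_from_posts` fallback are stored with `posts` set to 0, so they would show up as 0% in this listing.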