phileasme / ProxyFilter

Validates Untested Proxy Addresses
MIT License
0 stars 0 forks source link

Invalid hits still recorded by provider as false positive netscan / scansnarf-ng. #1

Open phileasme opened 2 months ago

phileasme commented 2 months ago

While we aren't testing en masse, we are still hitting Cloudflare or unrouted addresses directly. One strategy we can employ is the following: as soon as we have one trusted proxy, we can tunnel our requests through it. That is: url_to_test->main_proxy(trusted)->secondary(untested)->response

phileasme commented 2 months ago

Proposed by our dear Claude:

import ipaddress
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional, Set, Tuple

import requests
from tqdm import tqdm

logger = logging.getLogger(__name__)

class TunnelProxyRoutability:
    """Classify IP addresses as routable, invalid, or Cloudflare-fronted.

    Each candidate address is probed with a single HTTP GET that is sent via
    ``proxy1``. Results from :meth:`validate_ip_list` are accumulated in
    ``routable_addresses``, ``invalid_addresses`` and ``cloudflare_addresses``.
    """

    def __init__(self, proxy1: str, proxy2: str):
        """Store the two proxy settings and start with empty result sets.

        Args:
            proxy1: Proxy URL used for both ``http`` and ``https`` requests.
            proxy2: Value sent in the ``Proxy-Authorization`` request header.
        """
        self.proxy1 = proxy1
        self.proxy2 = proxy2
        self.routable_addresses: Set[str] = set()
        self.invalid_addresses: Set[str] = set()
        self.cloudflare_addresses: Set[str] = set()

    @staticmethod
    def is_valid_ip(ip: str) -> bool:
        """Return True if *ip* parses as an IPv4 or IPv6 address."""
        try:
            ipaddress.ip_address(ip)
        except ValueError:
            return False
        return True

    @staticmethod
    def is_private_ip(ip: str) -> bool:
        """Return True if *ip* is in a private range.

        Raises:
            ValueError: If *ip* is not a valid IP address.
        """
        return ipaddress.ip_address(ip).is_private

    @staticmethod
    def _probe_url(ip: str) -> str:
        """Build the plain-HTTP probe URL for *ip*, bracketing IPv6 literals."""
        host = f"[{ip}]" if ":" in ip else ip
        return f"http://{host}"

    @staticmethod
    def _is_cloudflare_response(response) -> bool:
        """Return True if the response's ``Server`` header mentions Cloudflare."""
        return 'cloudflare' in response.headers.get('Server', '').lower()

    def make_tunneled_request(
        self, url: str, timeout: int = 10
    ) -> Tuple[bool, Optional["requests.Response"]]:
        """GET *url* through ``proxy1``.

        Args:
            url: Target URL.
            timeout: Timeout in seconds passed to the request.

        Returns:
            ``(True, response)`` when a response was received (any status
            code), or ``(False, None)`` when the request raised a
            ``requests.RequestException``.
        """
        proxies = {
            'http': self.proxy1,
            'https': self.proxy1,
        }
        try:
            with requests.Session() as session:
                session.proxies = proxies
                # NOTE(review): proxy2 is only sent as a Basic credential in a
                # request header; this does not by itself route the request
                # through proxy2 - confirm the intended chaining mechanism.
                response = session.get(
                    url,
                    headers={'Proxy-Authorization': f'Basic {self.proxy2}'},
                    timeout=timeout,
                )
            return True, response
        except requests.RequestException as e:
            logger.error("Error making tunneled request to %s: %s", url, e)
            return False, None

    def is_cloudflare(self, ip: str) -> bool:
        """Probe *ip* and report whether the reply identifies as Cloudflare."""
        success, response = self.make_tunneled_request(self._probe_url(ip))
        # Compare against None explicitly: a Response is falsy for 4xx/5xx
        # statuses, and error replies still carry the Server header.
        if success and response is not None:
            return self._is_cloudflare_response(response)
        return False

    def is_routable(self, ip: str) -> bool:
        """Return True if *ip* is public, not Cloudflare, and answers HTTP 200.

        Cloudflare-fronted addresses are recorded in
        ``cloudflare_addresses``. Only one request is made per address so the
        target is not hit twice.
        """
        if not self.is_valid_ip(ip) or self.is_private_ip(ip):
            return False

        success, response = self.make_tunneled_request(self._probe_url(ip))
        if not success or response is None:
            return False

        if self._is_cloudflare_response(response):
            self.cloudflare_addresses.add(ip)
            logger.debug(f"{ip} is associated with Cloudflare")
            return False

        return response.status_code == 200

    def validate_ip_list(self, ip_list: List[str], max_workers: int = 50) -> Dict[str, bool]:
        """Check every address in *ip_list* concurrently.

        Args:
            ip_list: Addresses to check.
            max_workers: Size of the thread pool.

        Returns:
            Mapping of each address to its routability. Addresses whose check
            raised an exception map to ``False``.
        """
        results: Dict[str, bool] = {}
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {executor.submit(self.is_routable, ip): ip for ip in ip_list}

            with tqdm(total=len(ip_list), desc="Validating IPs") as pbar:
                for future in as_completed(futures):
                    ip = futures[future]
                    try:
                        is_routable = future.result()
                    except Exception as exc:
                        # Boundary handler: one failing check must not abort
                        # the whole batch.
                        logger.error("%s generated an exception: %s", ip, exc)
                        results[ip] = False
                    else:
                        results[ip] = is_routable
                        if is_routable:
                            self.routable_addresses.add(ip)
                        elif ip not in self.cloudflare_addresses:
                            self.invalid_addresses.add(ip)
                    finally:
                        pbar.update(1)

        return results

# Example usage
if __name__ == "__main__":
    # Placeholder values: fill in the proxy settings and the addresses to
    # check before running.
    proxy1 = ""
    proxy2 = ""
    routability = TunnelProxyRoutability(proxy1, proxy2)
    test_ips = ["", "", ""]
    results = routability.validate_ip_list(test_ips)
    print("Results:", results)
    print("Routable:", routability.routable_addresses)
    print("Invalid:", routability.invalid_addresses)
    print("Cloudflare:", routability.cloudflare_addresses)