Open fabriziosalmi opened 11 months ago
Required input: ip_addresses.txt (combined and sanitized from selected sources) and dns_servers.txt (one DNS server per line). Output: ip2fqdn.txt, dns_errors.txt
ip2fqdn.yml
# Workflow: run the reverse-DNS resolution script on every push to main.
name: DNS Resolution
on:
  push:
    branches:
      - main
jobs:
  resolve_dns:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout Repository
        uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.x'
      # No dependency-install step: dns_resolver.py imports only socket and
      # asyncio, which ship with Python. The "asyncio" package on PyPI is an
      # obsolete backport and must not be installed on Python 3.
      - name: Run DNS Resolution Script
        run: python dns_resolver.py
dns_resolver.py
import socket
import asyncio
async def resolve_ip_to_fqdn(ip, dns_servers, timeout=5):
    """Reverse-resolve *ip* to a host name, reporting one result per DNS server.

    Args:
        ip: IPv4 or IPv6 address, as a string.
        dns_servers: DNS servers the caller wants results for. Only the
            number of entries is used; see the note in the body.
        timeout: Maximum number of seconds to wait for the lookup.

    Returns:
        A tuple ``(ip, results)`` where ``results`` holds one entry per item
        of *dns_servers*: the resolved host name, or the string
        ``"Unable to resolve"`` when the lookup failed, timed out, or *ip*
        is not a valid address.
    """
    import ipaddress  # local import: only this function needs it

    unresolved = "Unable to resolve"
    server_count = len(dns_servers)
    if server_count == 0:
        return ip, []

    # Reject malformed input up front; gethostbyaddr() would otherwise treat
    # it as a host name and attempt a forward lookup.
    try:
        ipaddress.ip_address(ip)
    except ValueError:
        return ip, [unresolved] * server_count

    try:
        # gethostbyaddr() blocks, so run it in a worker thread and bound the
        # wait with wait_for(); it has no timeout parameter of its own.
        hostname, _aliases, _addresses = await asyncio.wait_for(
            asyncio.to_thread(socket.gethostbyaddr, ip), timeout
        )
        outcome = hostname
    except (socket.herror, socket.gaierror, socket.timeout, asyncio.TimeoutError):
        # Best effort: a failed or slow lookup is recorded, not raised.
        outcome = unresolved

    # NOTE: the standard library cannot direct a query at a specific name
    # server; the system resolver answers. The single answer is therefore
    # repeated so the result keeps one entry per configured DNS server.
    return ip, [outcome] * server_count
async def main():
    """Resolve every IP in ip_addresses.txt and write the two report files.

    Reads IP addresses from ``ip_addresses.txt`` and DNS servers from
    ``dns_servers.txt`` (one entry per line, blank lines ignored), passes
    each IP to resolve_ip_to_fqdn(), then writes the per-IP results to
    ``ip2fqdn.txt`` and every IP with at least one failed lookup to
    ``dns_errors.txt``.
    """
    # Read IP addresses, skipping blank lines so they are not treated as IPs
    with open("ip_addresses.txt", "r", encoding="utf-8") as ip_file:
        ip_addresses = [line.strip() for line in ip_file if line.strip()]

    # Read DNS servers, skipping blank lines
    with open("dns_servers.txt", "r", encoding="utf-8") as dns_file:
        dns_servers = [line.strip() for line in dns_file if line.strip()]

    # Output filenames
    ip2fqdn_filename = "ip2fqdn.txt"
    error_log_filename = "dns_errors.txt"

    # Customizable timeout for DNS queries (in seconds)
    timeout = 5

    # gather() runs the lookups concurrently and returns results in input
    # order. asyncio provides no ThreadPoolExecutor, so none is created here.
    results = await asyncio.gather(
        *(resolve_ip_to_fqdn(ip, dns_servers, timeout) for ip in ip_addresses)
    )

    # Write the results to ip2fqdn.txt
    with open(ip2fqdn_filename, "w", encoding="utf-8") as output_file:
        for ip, fqdn_results in results:
            output_file.write(f"IP: {ip}\n")
            for position, resolved in enumerate(fqdn_results, start=1):
                output_file.write(f" DNS Server {position}: {resolved}\n")

    # Write DNS resolution errors to dns_errors.txt
    with open(error_log_filename, "w", encoding="utf-8") as error_file:
        for ip, fqdn_results in results:
            if "Unable to resolve" in fqdn_results:
                error_file.write(f"Error resolving IP: {ip}\n")
# Script entry point: start the event loop only when run directly.
if __name__ == "__main__":
    asyncio.run(main())
Here’s an enhanced version of the script with progress reporting and further improvements:
import asyncio
import concurrent.futures
import dns.resolver
import dns.reversename
import logging
from tqdm import tqdm
# Setup logging: records at INFO level and above are written to the file
# 'resolved_ips.log' rather than to the console.
logging.basicConfig(filename='resolved_ips.log', level=logging.INFO)
def load_from_file(filename):
    """Load the non-blank lines of a text file.

    Args:
        filename: Path of the file to read (decoded as UTF-8).

    Returns:
        A list of the file's lines with surrounding whitespace stripped and
        blank lines dropped. Returns an empty list, after logging an error,
        when the file does not exist.
    """
    try:
        with open(filename, 'r', encoding='utf-8') as file:
            stripped_lines = (line.strip() for line in file)
            return [line for line in stripped_lines if line]
    except FileNotFoundError:
        logging.error("%s not found.", filename)
        return []
async def resolve_ip(ip, dns_server):
    """Reverse-resolve *ip* through *dns_server* and log the result.

    A dnspython resolver is configured to query only *dns_server*, using the
    module-level ``timeout`` as both the per-attempt timeout and the total
    lifetime of the query. The PTR lookup runs in a worker thread so the
    event loop is not blocked.

    Args:
        ip: IP address to look up, as a string.
        dns_server: Address of the name server to query.

    Returns:
        The first PTR target as a string, or None when the address is
        malformed or the lookup produced no answer.
    """
    resolver = dns.resolver.Resolver()
    resolver.nameservers = [dns_server]
    resolver.timeout = timeout
    resolver.lifetime = timeout

    def lookup():
        """Blocking PTR query; executed in a worker thread."""
        rev_name = dns.reversename.from_address(ip)
        return str(resolver.resolve(rev_name, "PTR")[0])

    loop = asyncio.get_running_loop()
    try:
        # Passing None uses the loop's shared default executor, instead of
        # building and tearing down a new thread pool for every IP.
        resolved = await loop.run_in_executor(None, lookup)
    except (
        dns.resolver.NXDOMAIN,
        dns.resolver.NoAnswer,
        dns.resolver.NoNameservers,
        dns.resolver.Timeout,
    ) as exc:
        # Unresolvable IPs are expected; keep them out of the INFO log but
        # leave a trace for debugging.
        logging.debug("%s not resolved using DNS server %s: %r", ip, dns_server, exc)
        return None
    except dns.exception.SyntaxError:
        # from_address() rejects text that is not a valid IP address.
        logging.warning("Skipping malformed IP address: %r", ip)
        return None

    logging.info("%s resolves to %s using DNS server %s", ip, resolved, dns_server)
    return resolved
async def main():
    """Resolve the IPs in ips.txt, sharing them out among the DNS servers.

    Loads IP addresses from ``ips.txt`` and DNS servers from
    ``dns_servers.txt``, assigns each server a contiguous slice of the IP
    list, and awaits one resolve_ip() call per IP while showing a progress
    bar. Returns early, after logging an error, when either list is empty.
    """
    # Load IP addresses and DNS servers from files
    ip_addresses = load_from_file('ips.txt')
    dns_servers = load_from_file('dns_servers.txt')
    if not ip_addresses or not dns_servers:
        logging.error("IP addresses or DNS servers are missing.")
        return

    # Divide IPs among DNS servers: ceiling division, so every IP is
    # assigned even when the counts do not divide evenly.
    ips_per_server = -(-len(ip_addresses) // len(dns_servers))
    tasks = [
        resolve_ip(ip, dns_server)
        for index, dns_server in enumerate(dns_servers)
        for ip in ip_addresses[index * ips_per_server:(index + 1) * ips_per_server]
    ]

    # Run tasks with progress bar. Each item yielded by as_completed() must
    # be awaited; otherwise main() returns immediately and the pending
    # lookups are cancelled when the event loop shuts down.
    for finished in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc='Resolving IPs'):
        try:
            await finished
        except Exception:
            # Top-level boundary: one failed lookup must not abort the batch.
            logging.exception("Unexpected error while resolving an IP")
# Timeout for DNS queries, in seconds. resolve_ip() reads this module-level
# value when configuring its resolver, so the name must stay `timeout`.
timeout = 1.0

# Run the main coroutine only when executed as a script, so that importing
# this module does not start a resolution run.
if __name__ == "__main__":
    asyncio.run(main())
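To use the script:
1. Install the tqdm library (and dnspython, which the script also imports), if not already installed:
pip install tqdm dnspython
2. Prepare the ips.txt and dns_servers.txt files, with one entry per line.
3. Run the script:
python script_name.py
Replace script_name.py with the name you give to the Python script file.
Progress is displayed with the tqdm library, and successful resolutions are logged to resolved_ips.log.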