rabuchaim / geoip2fast

GeoIP2Fast is the fastest GeoIP2 country/city/asn lookup library. A search takes less than 0.00003 seconds. It has its own data file updated twice a week with Maxmind-Geolite2-CSV and is Pure Python!
https://pypi.org/project/geoip2fast/
MIT License

Not an issue, but you can take a look. #9

Open MaurUppi opened 5 months ago

MaurUppi commented 5 months ago

The location_ipfire_db_reader library written by svaningelgem can read IPFire's location.db. I used it, together with an HTTP API, as two references for cross-checking the source data.

The source data to be checked is dbip-country-lite-2024-01.csv. The references are IPFire's location.db and the ip-api.com batch query API (100 IPs per request, rate-limited to 15 requests per minute).

Long story short, here is the result.

Total number of IPv4 records in the data source: 308422
Sampling size: 384
Processed 384/384 IPs
Accuracy (Cross-checked w/IPFire location.db): 60.677083333333336%
Processed 100/384 IPs
Processed 200/384 IPs
Processed 300/384 IPs
Processed 384/384 IPs
Accuracy (Cross-checked w/IP-API.com HTTP-API): 88.54166666666666%
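
For reference, the sampling size of 384 is what Cochran's formula with a finite population correction gives, which is what `calculate_sample_size` in the script below computes. A minimal standalone sketch, assuming a 95% confidence level (z = 1.96), a 5% margin of error, and p = 0.5 as in the script; the name `cochran_sample_size` is only illustrative:

```python
import math


def cochran_sample_size(total, z=1.96, margin_error=0.05, p=0.5):
    """Cochran's sample size with finite population correction."""
    # Sample size for an effectively infinite population
    n0 = (z ** 2) * p * (1 - p) / (margin_error ** 2)
    # Shrink it slightly for a finite population of `total` records
    n = n0 / (1 + (n0 - 1) / total)
    return math.ceil(n)


print(cochran_sample_size(308422))  # 384
```

Because the population is large, the correction barely changes the result, so the sample size stays at 384 for any data source of this order of magnitude.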

Assuming both references have a high level of accuracy for the CountryCode assigned to each CIDR, the initial conclusions could be:

  1. The IP-API.com data is closely aligned with the dbip-country db (about 89% of the sampled IPs match).
  2. In contrast, location.db matches the dbip-country db for only about 61% of the sampled IPs (roughly 40% differ).

So, is one of the references a liar?
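
With 384 samples, each percentage carries a sampling error of a few points. A rough sketch of the 95% interval for both results, assuming the normal approximation for a proportion (the script does not compute this, and `accuracy_interval` is a hypothetical helper):

```python
import math


def accuracy_interval(accuracy_pct, n, z=1.96):
    """Return (low, high) in percent for a measured accuracy.

    Uses the normal approximation for a binomial proportion.
    """
    p = accuracy_pct / 100
    half_width = z * math.sqrt(p * (1 - p) / n)
    low = max(0.0, p - half_width) * 100
    high = min(1.0, p + half_width) * 100
    return low, high


results = [
    ("IPFire location.db", 60.677083333333336),
    ("IP-API.com", 88.54166666666666),
]
for label, acc in results:
    low, high = accuracy_interval(acc, 384)
    print(f"{label}: {acc:.1f}% (95% interval roughly {low:.1f}% to {high:.1f}%)")
```

Both intervals are only a few points wide, so the gap between the two references looks larger than sampling noise alone would explain.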

The Python code is below. If you have a free moment, please take a look and check whether there are any logical faults.

Details

```python
import math
import time
import pandas as pd
import random
import ipaddress
import os
from location_ipfire_db_reader import LocationDatabase
from location_ipfire_db_reader.exceptions import IPAddressError
import requests

source_file_path = os.getenv('SOURCE_FILE_PATH', 'D:\\Dev\\GeoLocationData\\GeoDB_merger\\source\\dbip-country-lite-2024-01.csv')
log_file_path_ipfire = os.getenv('LOG_FILE_PATH_IPFIRE', 'D:\\Dev\\GeoLocationData\\GeoDB_merger\\log\\random_pickIPaddr_checkwith_IPFireDB.log')
log_file_path_IP_API = os.getenv('LOG_file_path_IP_API', 'D:\\Dev\\GeoLocationData\\GeoDB_merger\\log\\random_pickIPaddr_checkwith_IP-API.log')
requests_per_minute = int(os.getenv('REQUESTS_PER_MINUTE', '15'))


def is_ipv4(address):
    try:
        ipaddress.IPv4Address(address)
        return True
    except ipaddress.AddressValueError:
        return False


def calculate_sample_size(total, confidence_level, margin_error, p=0.5):
    if not isinstance(total, int) or total <= 0:
        raise ValueError("Total must be a positive integer")
    if confidence_level not in {90, 95, 99}:
        raise ValueError("Confidence level must be 90, 95, or 99.")
    if not (isinstance(margin_error, float) or isinstance(margin_error, int)) or margin_error <= 0:
        raise ValueError("Margin error must be a positive number.")
    if not (isinstance(p, float) or isinstance(p, int)) or not (0 <= p <= 1):
        raise ValueError("Expected population proportion 'p' must be a number between 0 and 1.")

    z_dict = {90: 1.645, 95: 1.96, 99: 2.576}
    z = z_dict.get(confidence_level)
    if z is None:
        raise ValueError(f"The z-value corresponding to {confidence_level} was not found.")

    sample_size = ((z**2) * p * (1-p)) / (margin_error**2)
    sample_size = sample_size / (1 + ((sample_size - 1) / total))
    # Return the sample size, rounded up to the nearest integer
    return math.ceil(sample_size)
    #return int(sample_size) if sample_size == int(sample_size) else int(sample_size) + 1


def calculate_accuracy(log_file_path_ipfire):
    match_count = 0
    total_count = 0
    with open(log_file_path_ipfire, 'r') as file:
        for line in file:
            total_count += 1
            if ', O' in line:
                match_count += 1
    accuracy = (match_count / total_count) * 100 if total_count > 0 else 0
    return accuracy


def ReadSourceCSVfile(csv_file_path):
    data = []
    with open(csv_file_path, 'r') as file:
        for line in file:
            start_ip, end_ip, country = line.strip().split(',')
            if is_ipv4(start_ip) and is_ipv4(end_ip):
                row = {
                    'start_ip': start_ip,
                    'end_ip': end_ip,
                    'country': country,
                    'start_ip_int': int(ipaddress.IPv4Address(start_ip)),
                    'end_ip_int': int(ipaddress.IPv4Address(end_ip))
                }
                data.append(row)
    df_ipv4 = pd.DataFrame(data)
    return df_ipv4


def generate_random_ips(df, sample_size, batch_size=100):
    all_ips = []
    for _ in range(sample_size):
        random_row = df.sample().iloc[0]
        start_ip_int = random_row['start_ip_int']
        end_ip_int = random_row['end_ip_int']
        random_ip_int = random.randint(start_ip_int, end_ip_int)
        all_ips.append(str(ipaddress.IPv4Address(random_ip_int)))
    return all_ips


def check_country_with_ipfire_db(df, ips, log_file_path_ipfire, db):
    match_count = 0
    processed = 0
    with open(log_file_path_ipfire, 'w') as log_file:
        for ip in ips:
            processed += 1
            ip_int = int(ipaddress.IPv4Address(ip))
            matching_rows = df[(df['start_ip_int'] <= ip_int) & (df['end_ip_int'] >= ip_int)]
            expected_country_code = matching_rows.iloc[0]['country'] if not matching_rows.empty else 'N/A'
            try:
                actual_country_code = db.find_country(ip)
            except IPAddressError:
                actual_country_code = 'N/A'
            match = 'O' if expected_country_code == actual_country_code else 'X'
            if match == 'O':
                match_count += 1
            result_line = f"{ip}, {expected_country_code}, {actual_country_code}, {match}"
            log_file.write(result_line + '\n')
            log_file.flush()
        print(f"Processed {processed}/{len(ips)} IPs")
    return match_count, processed


def check_country_with_ip_api(df, ips, log_file_path, batch_size=100):
    match_count = 0
    processed = 0
    with open(log_file_path, 'w') as log_file:
        for batch_start in range(0, len(ips), batch_size):
            batch_ips = ips[batch_start:batch_start + batch_size]
            response = requests.post('http://ip-api.com/batch?fields=status,countryCode,query', json=batch_ips)
            if response.status_code != 200:
                print(f"Request failed with status code: {response.status_code}")
                continue

            rate_limit_remaining = response.headers.get('X-Rl')
            rate_limit_reset = response.headers.get('X-Ttl')
            if rate_limit_remaining is not None:
                rate_limit_remaining = int(rate_limit_remaining)
            else:
                print("Warning: 'X-Rl' header is missing. Defaulting to 0.")
                rate_limit_remaining = 0
            if rate_limit_reset is not None:
                rate_limit_reset = int(rate_limit_reset)
            else:
                print("Warning: 'X-Ttl' header is missing. Defaulting to 60.")
                rate_limit_reset = 60

            if rate_limit_remaining == 0:
                time.sleep(rate_limit_reset)
            else:
                # Calculate the delay needed to not exceed 15 requests per minute
                delay = 60 / requests_per_minute
                time.sleep(delay)

            try:
                batch_results = response.json()
            except ValueError as e:
                print(f"Error parsing JSON response: {e}")
                continue

            for ip, res in zip(batch_ips, batch_results):
                processed += 1
                if res['status'] == 'success':
                    ip_int = int(ipaddress.IPv4Address(ip))
                    matching_rows = df[(df['start_ip_int'] <= ip_int) & (df['end_ip_int'] >= ip_int)]
                    if not matching_rows.empty:
                        expected_country_code = matching_rows.iloc[0]['country']
                    else:
                        expected_country_code = 'N/A'
                    actual_country_code = res.get('countryCode', 'N/A')
                    match = 'O' if expected_country_code == actual_country_code else 'X'
                    if match == 'O':
                        match_count += 1
                else:
                    expected_country_code = 'N/A'
                    actual_country_code = 'N/A'
                    match = 'X'
                result_line = f"{ip}, {expected_country_code}, {actual_country_code}, {match}"
                log_file.write(result_line + '\n')
                log_file.flush()
            print(f"Processed {processed}/{len(ips)} IPs")
    return match_count, processed


def main():
    # Read the CSV file and calculate the sample size
    df_ipv4 = ReadSourceCSVfile(source_file_path)
    total_ipv4_rows = len(df_ipv4)
    print(f"Total number of IPv4 records in the data source: {total_ipv4_rows}")

    confidence_level = 95
    margin_error = 0.05
    sample_size = calculate_sample_size(total_ipv4_rows, confidence_level, margin_error)
    print(f"Sampling size: {sample_size}")

    random_ips = generate_random_ips(df_ipv4, sample_size)

    # Initialize LocationDatabase instance
    db = LocationDatabase('location.db')

    # Check countries with IPFire DB
    check_country_with_ipfire_db(df_ipv4, random_ips, log_file_path_ipfire, db)
    accuracy_ipfire = calculate_accuracy(log_file_path_ipfire)
    print(f"Accuracy (Cross-checked w/IPFire location.db): {accuracy_ipfire}%")

    # Check countries with IP-API
    check_country_with_ip_api(df_ipv4, random_ips, log_file_path_IP_API)
    accuracy_ip_api = calculate_accuracy(log_file_path_IP_API)
    print(f"Accuracy (Cross-checked w/IP-API.com HTTP-API): {accuracy_ip_api}%")


if __name__ == "__main__":
    main()
```
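
A side note on the range lookup: the script above filters the whole DataFrame for every IP, which is fine for 384 samples but gets slow for larger ones. A minimal sketch of a binary-search alternative, assuming the ranges do not overlap; the `RangeIndex` class and the two sample ranges are made up for illustration:

```python
import bisect
import ipaddress


class RangeIndex:
    """Look up the country for an IPv4 address in non-overlapping ranges."""

    def __init__(self, rows):
        # rows: iterable of (start_ip, end_ip, country) strings
        parsed = sorted(
            (int(ipaddress.IPv4Address(start)), int(ipaddress.IPv4Address(end)), country)
            for start, end, country in rows
        )
        self.starts = [row[0] for row in parsed]
        self.ends = [row[1] for row in parsed]
        self.countries = [row[2] for row in parsed]

    def find_country(self, ip, default="N/A"):
        ip_int = int(ipaddress.IPv4Address(ip))
        # Index of the last range whose start is <= ip_int
        i = bisect.bisect_right(self.starts, ip_int) - 1
        if i >= 0 and ip_int <= self.ends[i]:
            return self.countries[i]
        return default


# Illustrative ranges only, not taken from the real data file
index = RangeIndex([
    ("1.0.0.0", "1.0.0.255", "AU"),
    ("1.0.1.0", "1.0.3.255", "CN"),
])
print(index.find_country("1.0.2.7"))
```

The index is built once, after which each lookup is a binary search instead of a full scan.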

The log output files.

random_pickIPaddr_checkwith_IPFireDB.log random_pickIPaddr_checkwith_IP-API.log
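
To see where the disagreements come from, the log lines can be tallied by (expected, actual) pair. A small sketch, assuming the log format written by the script above (`ip, expected, actual, O` or `X`); the function name `tally_mismatches` and the sample lines are hypothetical:

```python
from collections import Counter


def tally_mismatches(lines):
    """Count (expected, actual) country pairs for lines marked 'X'."""
    pairs = Counter()
    for line in lines:
        parts = [part.strip() for part in line.split(",")]
        if len(parts) != 4:
            continue  # skip blank or malformed lines
        _ip, expected, actual, match = parts
        if match == "X":
            pairs[(expected, actual)] += 1
    return pairs


# Made-up sample lines in the same format as the log files
sample = [
    "203.0.113.7, US, US, O",
    "198.51.100.9, DE, None, X",
    "192.0.2.44, FR, GB, X",
    "198.51.100.200, DE, None, X",
]
for (expected, actual), count in tally_mismatches(sample).most_common():
    print(f"{expected} -> {actual}: {count}")
```

With a real log, pass the open file object instead of `sample`. If most mismatches turn out to have an empty or `None` actual value, the reference had no country for those addresses, which is a different situation from it returning a conflicting country.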

Hope I'm not flooding the thread.

I had another source data file, merged from three GeoIP CSV data files (ipinfo's country.csv, IP2LOCATION-LITE-DB1.CSV, and dbip-country-lite-2024-01.csv). The merge logic is a different story, and the result has yet to meet my expectations.

The key point is that the accuracy results differ from the previous run.

python.exe .\random_pickup_IPaddr_CheckTwoRefSource.py

Total number of IPv4 records in the data source: 630324
Sampling size: 384
Processed 384/384 IPs
Accuracy (Cross-checked w/IPFire location.db): 77.60416666666666%
Processed 100/384 IPs
Processed 200/384 IPs
Processed 300/384 IPs
Processed 384/384 IPs
Accuracy (Cross-checked w/IP-API.com HTTP-API): 80.20833333333334%

merged_ip_data.zip