Open StellaYiwan opened 3 months ago
"""Poll the US visa appointment site and raise an alert when an early slot appears.

The script drives a Chrome browser through a small state machine: each known
page (see ``XPATH``) maps to one action (see ``ACTION``) that moves the browser
to the next page.  Once the fee-details page is reached it is re-read
periodically and every reading is appended to ``working_data``.
"""
import time
import json
import random
import platform
from datetime import datetime

import requests
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait as Wait
from selenium.webdriver.common.by import By
import numpy as np
import pandas as pd
from IPython.display import clear_output

try:
    import winsound
except ImportError:
    # winsound only exists on Windows; fall back to a silent pause (see _beep).
    winsound = None

SIGN_IN = "https://ais.usvisa-info.com/en-ca/niv/users/sign_in"

# Path to the chromedriver executable.  When left empty the driver is located
# through PATH instead (see the start-up code at the bottom of the file).
CHROME_DRIVER = ""

# NOTE(review): the values below read "PROD_PASSWORD_*" although the keys are
# test users -- this looks like a copy-paste slip in the placeholders; confirm
# before filling in real credentials.
TEST_CREDENTIAL = {
    "TEST_USERNAME_1": "PROD_PASSWORD_1",
    "TEST_USERNAME_2": "PROD_PASSWORD_2",
}
PROD_CREDENTIAL = {
    "PROD_USERNAME_1": "PROD_PASSWORD_1",
    "PROD_USERNAME_2": "PROD_PASSWORD_2",
}

# One XPath per recognisable page.  get_page_status() probes them in this
# order and reports the first match.
XPATH = {
    "sign_in_page": "//h2[contains(text(), 'Sign In')]",
    "continue_page": "//a[contains(text(),'Continue')]",
    "pay_fee_page": "//h3[contains(text(), 'Take action on this group of applicants')]",
    "time_collect_page": "//h2[contains(text(), 'MRV Fee Details')]",
    "maintenance_page": "//h1[contains(text(), 'Doing Maintenance')]",
}

# An alert fires when the earliest slot is at most this far from today.
ALERT_THRESHOLD = pd.Timedelta(90, "d")

# Cell text the site shows for a location without any open slot.
NO_APPOINTMENT_TEXT = "No Appointments Available"

# Keys that stamp_time() adds to a reading; every other key is a location.
_STAMP_KEYS = ("date", "time")

# Readings collected so far.  An existing list is kept so that re-running the
# script inside the same interactive session does not discard earlier data.
working_data = globals().get("working_data", [])


def sleep_rand(low, high):
    """Sleep for a uniformly random number of seconds in ``[low, high)``."""
    time.sleep(np.random.uniform(low, high))


def _beep(frequency, duration_ms):
    """Beep via ``winsound`` when available, otherwise pause for the same time."""
    if winsound is not None:
        winsound.Beep(frequency, duration_ms)
    else:
        time.sleep(duration_ms / 1000)


def get_test_credential():
    """Return one ``(username, password)`` pair picked at random from TEST_CREDENTIAL.

    NOTE(review): the original script called this function without defining
    it.  This implementation is reconstructed from the call site, which
    unpacks the result into the ``username`` and ``password`` parameters of
    the actions -- confirm it matches the intended selection policy.
    """
    username = random.choice(list(TEST_CREDENTIAL))
    return username, TEST_CREDENTIAL[username]


def is_status(driver, xpath):
    """Return True when ``xpath`` matches an element on the current page.

    Only Selenium errors are treated as "not on this page"; anything else
    (including Ctrl-C) propagates instead of being silently swallowed.
    """
    try:
        element = driver.find_element(By.XPATH, xpath)
    except WebDriverException:
        return False
    return element is not None


def get_page_status(driver):
    """Return the first key of ``XPATH`` whose XPath matches, or None if none do."""
    for status, search in XPATH.items():
        if is_status(driver, search):
            return status
    return None


def load_ready(xpath, option="locate", wait_timeout=60, driver=None):
    """Wait until the element at ``xpath`` is present or clickable.

    Args:
        xpath: XPath of the element to wait for.
        option: ``"locate"`` waits for presence, ``"clickable"`` waits until
            the element can be clicked.
        wait_timeout: Maximum number of seconds to wait.
        driver: WebDriver to wait on.  When omitted, the module-level
            ``driver`` is used, as in the original implementation.

    Returns:
        True when the condition was met, False when Selenium reported a
        timeout or another WebDriver error.

    Raises:
        ValueError: If ``option`` is neither ``"locate"`` nor ``"clickable"``.
    """
    if option == "locate":
        condition = EC.presence_of_element_located((By.XPATH, xpath))
    elif option == "clickable":
        condition = EC.element_to_be_clickable((By.XPATH, xpath))
    else:
        raise ValueError(f"Unknown wait option {option!r}")

    if driver is None:
        driver = globals()["driver"]
    try:
        Wait(driver, wait_timeout).until(condition)
    except WebDriverException:
        return False
    return True


def go_to_login_page(driver, *args):
    """Open the sign-in URL and wait for the sign-in heading."""
    driver.get(SIGN_IN)
    return load_ready(XPATH["sign_in_page"], "locate", 30, driver=driver)


def go_to_continue_page(driver, username, password):
    """Fill in and submit the sign-in form, then wait for the 'Continue' link.

    Random pauses are inserted between the individual form interactions.
    """
    driver.find_element(By.ID, 'user_email').send_keys(username)
    sleep_rand(1, 3)
    driver.find_element(By.ID, 'user_password').send_keys(password)
    sleep_rand(1, 3)
    driver.find_element(By.CLASS_NAME, 'icheckbox').click()
    sleep_rand(1, 3)
    driver.find_element(By.NAME, 'commit').click()
    return load_ready(XPATH["continue_page"], "locate", 20, driver=driver)


def go_to_pay_fee_page(driver, *args):
    """Click the 'Continue' link and wait for the applicant-actions page."""
    driver.find_element(By.XPATH, XPATH["continue_page"]).click()
    return load_ready(XPATH["pay_fee_page"], "clickable", 20, driver=driver)


def go_to_time_collect_page(driver, *args):
    """Click the first ``h5`` element, follow 'Pay Visa Fee', wait for fee details."""
    driver.find_element(By.TAG_NAME, 'h5').click()
    sleep_rand(1, 2)
    driver.find_element(By.XPATH, "//a[contains(text(),'Pay Visa Fee')]").click()
    sleep_rand(1, 2)
    return load_ready(XPATH["time_collect_page"], "locate", 20, driver=driver)


def stamp_time(result):
    """Add the current ``date`` (midnight) and ``time`` (offset since midnight).

    The time offset is rounded to whole seconds.  ``result`` is modified in
    place and also returned.
    """
    now = pd.Timestamp.now()
    current_date = now.floor("d")
    result["date"] = current_date
    result["time"] = (now - current_date).round("s")
    return result


def collect_time(driver):
    """Read the appointment table and return ``{location: earliest date}``.

    Locations whose cell reads ``NO_APPOINTMENT_TEXT`` map to ``pd.NaT``.
    The returned dict also carries the ``date``/``time`` stamp added by
    ``stamp_time``.
    """
    result = {}
    time_table = driver.find_element(By.CLASS_NAME, 'for-layout')
    for tr in time_table.find_elements(By.TAG_NAME, 'tr'):
        tds = tr.find_elements(By.TAG_NAME, 'td')
        if len(tds) != 2:
            # Report the unexpected row; a generator expression here would
            # only print its repr, so build the list of texts explicitly.
            print([t.text for t in tds])
            if len(tds) < 2:
                # No location/date pair to read from this row.
                continue
        place = tds[0].text
        date_str = tds[1].text
        if date_str == NO_APPOINTMENT_TEXT:
            result[place] = pd.NaT
        else:
            result[place] = pd.Timestamp(date_str)
    return stamp_time(result)


def _sound_alert(info_string):
    """Flash ``info_string`` on a red background and beep for 30-59 rounds."""
    for _ in range(int(np.random.uniform(30, 60))):
        print(f"\x1b[41m{info_string}\x1b[0m")
        _beep(2500, 500)
        clear_output(wait=True)
        print(info_string)
        time.sleep(0.5)
        clear_output(wait=True)


def collect_to_working_data(driver, *args):
    """Record one reading, alert on an early slot, then refresh the page.

    The reading from ``collect_time`` is appended to ``working_data`` and
    printed.  If the earliest available slot lies within ``ALERT_THRESHOLD``
    of today an audible/visual alert is raised.  Locations without an open
    slot are shown as text and ignored when looking for the earliest date.

    Returns:
        True when the fee-details page is present again after the refresh.
    """
    working_data.append(collect_time(driver))
    last_collect = working_data[-1]
    slots = {k: v for k, v in last_collect.items() if k not in _STAMP_KEYS}

    timestamp_string = f"collected at {last_collect['date'] + last_collect['time']}\n"
    collect_time_string = "".join(
        # NaT has no usable .date(), so locations without a slot are spelled out.
        f"{place} {slot.date() if pd.notna(slot) else NO_APPOINTMENT_TEXT}\n"
        for place, slot in slots.items()
    )
    info_string = timestamp_string + collect_time_string

    # Comparisons against NaT are always False, so drop NaT before min().
    open_slots = [slot for slot in slots.values() if pd.notna(slot)]
    if open_slots and min(open_slots) - last_collect["date"] <= ALERT_THRESHOLD:
        _sound_alert(info_string)
    else:
        clear_output(wait=True)
        print(info_string)

    sleep_rand(90, 150)
    driver.refresh()
    return load_ready(XPATH["time_collect_page"], "locate", 20, driver=driver)


def refresh(driver, *args):
    """Wait 45-75 seconds, reload the page and report success."""
    sleep_rand(45, 75)
    driver.refresh()
    return True


# Page status -> action that moves the browser on from that page.  "start" is
# the synthetic status used before any page has been loaded.
ACTION = {
    "start": go_to_login_page,
    "sign_in_page": go_to_continue_page,
    "continue_page": go_to_pay_fee_page,
    "pay_fee_page": go_to_time_collect_page,
    "time_collect_page": collect_to_working_data,
    "maintenance_page": refresh,
}


if __name__ == "__main__":
    # Set ``init`` to False to resume with a ``driver`` that already exists in
    # the interactive session instead of opening a new browser.
    init = True
    if init:
        # NOTE(review): Selenium >= 4.10 no longer accepts the driver path as
        # a positional argument -- pass a Service object there instead.
        driver = webdriver.Chrome(CHROME_DRIVER) if CHROME_DRIVER else webdriver.Chrome()
        current_status = "start"
    else:
        current_status = get_page_status(driver)

    while True:
        # Check for an unknown page first: looking None up in ACTION would
        # otherwise raise a less helpful KeyError.
        if current_status is None:
            raise ValueError("Unknown status None")
        action = ACTION[current_status]
        print(f"current page status: {current_status}, execute action: {action.__name__}")
        success = action(driver, *get_test_credential())
        print(f"action status: {'success' if success else 'fail'}")
        if success:
            current_status = get_page_status(driver)
        else:
            # A failed action is handled like a maintenance page: wait, then reload.
            current_status = "maintenance_page"