ultrafunkamsterdam / undetected-chromedriver

Custom Selenium Chromedriver | Zero-Config | Passes ALL bot mitigation systems (like Distil / Imperva/ Datadadome / CloudFlare IUAM)
https://github.com/UltrafunkAmsterdam/undetected-chromedriver
GNU General Public License v3.0
9.95k stars 1.16k forks source link

[NODRIVER] Port pypeter input #2019

Open boludoz opened 1 month ago

boludoz commented 1 month ago

add to utils: from typing import Optional, List, Set, Union, Callable, Dict

"""Keyboard and Mouse module."""

"""US Keyboard Definition."""

key_definitions = {
    '0': {'keyCode': 48, 'key': '0', 'code': 'Digit0'},
    '1': {'keyCode': 49, 'key': '1', 'code': 'Digit1'},
    '2': {'keyCode': 50, 'key': '2', 'code': 'Digit2'},
    '3': {'keyCode': 51, 'key': '3', 'code': 'Digit3'},
    '4': {'keyCode': 52, 'key': '4', 'code': 'Digit4'},
    '5': {'keyCode': 53, 'key': '5', 'code': 'Digit5'},
    '6': {'keyCode': 54, 'key': '6', 'code': 'Digit6'},
    '7': {'keyCode': 55, 'key': '7', 'code': 'Digit7'},
    '8': {'keyCode': 56, 'key': '8', 'code': 'Digit8'},
    '9': {'keyCode': 57, 'key': '9', 'code': 'Digit9'},
    'Power': {'key': 'Power', 'code': 'Power'},
    'Eject': {'key': 'Eject', 'code': 'Eject'},
    'Abort': {'keyCode': 3, 'code': 'Abort', 'key': 'Cancel'},
    'Help': {'keyCode': 6, 'code': 'Help', 'key': 'Help'},
    'Backspace': {'keyCode': 8, 'code': 'Backspace', 'key': 'Backspace'},
    'Tab': {'keyCode': 9, 'code': 'Tab', 'key': 'Tab'},
    'Numpad5': {'keyCode': 12, 'shiftKeyCode': 101, 'key': 'Clear', 'code': 'Numpad5', 'shiftKey': '5', 'location': 3},
    'NumpadEnter': {'keyCode': 13, 'code': 'NumpadEnter', 'key': 'Enter', 'text': '\r', 'location': 3},
    'Enter': {'keyCode': 13, 'code': 'Enter', 'key': 'Enter', 'text': '\r'},
    '\r': {'keyCode': 13, 'code': 'Enter', 'key': 'Enter', 'text': '\r'},
    '\n': {'keyCode': 13, 'code': 'Enter', 'key': 'Enter', 'text': '\r'},
    'ShiftLeft': {'keyCode': 16, 'code': 'ShiftLeft', 'key': 'Shift', 'location': 1},
    'ShiftRight': {'keyCode': 16, 'code': 'ShiftRight', 'key': 'Shift', 'location': 2},
    'ControlLeft': {'keyCode': 17, 'code': 'ControlLeft', 'key': 'Control', 'location': 1},
    'ControlRight': {'keyCode': 17, 'code': 'ControlRight', 'key': 'Control', 'location': 2},
    'AltLeft': {'keyCode': 18, 'code': 'AltLeft', 'key': 'Alt', 'location': 1},
    'AltRight': {'keyCode': 18, 'code': 'AltRight', 'key': 'Alt', 'location': 2},
    'Pause': {'keyCode': 19, 'code': 'Pause', 'key': 'Pause'},
    'CapsLock': {'keyCode': 20, 'code': 'CapsLock', 'key': 'CapsLock'},
    'Escape': {'keyCode': 27, 'code': 'Escape', 'key': 'Escape'},
    'Convert': {'keyCode': 28, 'code': 'Convert', 'key': 'Convert'},
    'NonConvert': {'keyCode': 29, 'code': 'NonConvert', 'key': 'NonConvert'},
    'Space': {'keyCode': 32, 'code': 'Space', 'key': ' '},
    'Numpad9': {'keyCode': 33, 'shiftKeyCode': 105, 'key': 'PageUp', 'code': 'Numpad9', 'shiftKey': '9', 'location': 3},
    'PageUp': {'keyCode': 33, 'code': 'PageUp', 'key': 'PageUp'},
    'Numpad3': {'keyCode': 34, 'shiftKeyCode': 99, 'key': 'PageDown', 'code': 'Numpad3', 'shiftKey': '3', 'location': 3},
    'PageDown': {'keyCode': 34, 'code': 'PageDown', 'key': 'PageDown'},
    'End': {'keyCode': 35, 'code': 'End', 'key': 'End'},
    'Numpad1': {'keyCode': 35, 'shiftKeyCode': 97, 'key': 'End', 'code': 'Numpad1', 'shiftKey': '1', 'location': 3},
    'Home': {'keyCode': 36, 'code': 'Home', 'key': 'Home'},
    'Numpad7': {'keyCode': 36, 'shiftKeyCode': 103, 'key': 'Home', 'code': 'Numpad7', 'shiftKey': '7', 'location': 3},
    'ArrowLeft': {'keyCode': 37, 'code': 'ArrowLeft', 'key': 'ArrowLeft'},
    'Numpad4': {'keyCode': 37, 'shiftKeyCode': 100, 'key': 'ArrowLeft', 'code': 'Numpad4', 'shiftKey': '4', 'location': 3},
    'Numpad8': {'keyCode': 38, 'shiftKeyCode': 104, 'key': 'ArrowUp', 'code': 'Numpad8', 'shiftKey': '8', 'location': 3},
    'ArrowUp': {'keyCode': 38, 'code': 'ArrowUp', 'key': 'ArrowUp'},
    'ArrowRight': {'keyCode': 39, 'code': 'ArrowRight', 'key': 'ArrowRight'},
    'Numpad6': {'keyCode': 39, 'shiftKeyCode': 102, 'key': 'ArrowRight', 'code': 'Numpad6', 'shiftKey': '6', 'location': 3},
    'Numpad2': {'keyCode': 40, 'shiftKeyCode': 98, 'key': 'ArrowDown', 'code': 'Numpad2', 'shiftKey': '2', 'location': 3},
    'ArrowDown': {'keyCode': 40, 'code': 'ArrowDown', 'key': 'ArrowDown'},
    'Select': {'keyCode': 41, 'code': 'Select', 'key': 'Select'},
    'Open': {'keyCode': 43, 'code': 'Open', 'key': 'Execute'},
    'PrintScreen': {'keyCode': 44, 'code': 'PrintScreen', 'key': 'PrintScreen'},
    'Insert': {'keyCode': 45, 'code': 'Insert', 'key': 'Insert'},
    'Numpad0': {'keyCode': 45, 'shiftKeyCode': 96, 'key': 'Insert', 'code': 'Numpad0', 'shiftKey': '0', 'location': 3},
    'Delete': {'keyCode': 46, 'code': 'Delete', 'key': 'Delete'},
    'NumpadDecimal': {'keyCode': 46, 'shiftKeyCode': 110, 'code': 'NumpadDecimal', 'key': '\u0000', 'shiftKey': '.', 'location': 3},
    'Digit0': {'keyCode': 48, 'code': 'Digit0', 'shiftKey': ')', 'key': '0'},
    'Digit1': {'keyCode': 49, 'code': 'Digit1', 'shiftKey': '!', 'key': '1'},
    'Digit2': {'keyCode': 50, 'code': 'Digit2', 'shiftKey': '@', 'key': '2'},
    'Digit3': {'keyCode': 51, 'code': 'Digit3', 'shiftKey': '#', 'key': '3'},
    'Digit4': {'keyCode': 52, 'code': 'Digit4', 'shiftKey': '$', 'key': '4'},
    'Digit5': {'keyCode': 53, 'code': 'Digit5', 'shiftKey': '%', 'key': '5'},
    'Digit6': {'keyCode': 54, 'code': 'Digit6', 'shiftKey': '^', 'key': '6'},
    'Digit7': {'keyCode': 55, 'code': 'Digit7', 'shiftKey': '&', 'key': '7'},
    'Digit8': {'keyCode': 56, 'code': 'Digit8', 'shiftKey': '*', 'key': '8'},
    'Digit9': {'keyCode': 57, 'code': 'Digit9', 'shiftKey': '(', 'key': '9'},
    'KeyA': {'keyCode': 65, 'code': 'KeyA', 'shiftKey': 'A', 'key': 'a'},
    'KeyB': {'keyCode': 66, 'code': 'KeyB', 'shiftKey': 'B', 'key': 'b'},
    'KeyC': {'keyCode': 67, 'code': 'KeyC', 'shiftKey': 'C', 'key': 'c'},
    'KeyD': {'keyCode': 68, 'code': 'KeyD', 'shiftKey': 'D', 'key': 'd'},
    'KeyE': {'keyCode': 69, 'code': 'KeyE', 'shiftKey': 'E', 'key': 'e'},
    'KeyF': {'keyCode': 70, 'code': 'KeyF', 'shiftKey': 'F', 'key': 'f'},
    'KeyG': {'keyCode': 71, 'code': 'KeyG', 'shiftKey': 'G', 'key': 'g'},
    'KeyH': {'keyCode': 72, 'code': 'KeyH', 'shiftKey': 'H', 'key': 'h'},
    'KeyI': {'keyCode': 73, 'code': 'KeyI', 'shiftKey': 'I', 'key': 'i'},
    'KeyJ': {'keyCode': 74, 'code': 'KeyJ', 'shiftKey': 'J', 'key': 'j'},
    'KeyK': {'keyCode': 75, 'code': 'KeyK', 'shiftKey': 'K', 'key': 'k'},
    'KeyL': {'keyCode': 76, 'code': 'KeyL', 'shiftKey': 'L', 'key': 'l'},
    'KeyM': {'keyCode': 77, 'code': 'KeyM', 'shiftKey': 'M', 'key': 'm'},
    'KeyN': {'keyCode': 78, 'code': 'KeyN', 'shiftKey': 'N', 'key': 'n'},
    'KeyO': {'keyCode': 79, 'code': 'KeyO', 'shiftKey': 'O', 'key': 'o'},
    'KeyP': {'keyCode': 80, 'code': 'KeyP', 'shiftKey': 'P', 'key': 'p'},
    'KeyQ': {'keyCode': 81, 'code': 'KeyQ', 'shiftKey': 'Q', 'key': 'q'},
    'KeyR': {'keyCode': 82, 'code': 'KeyR', 'shiftKey': 'R', 'key': 'r'},
    'KeyS': {'keyCode': 83, 'code': 'KeyS', 'shiftKey': 'S', 'key': 's'},
    'KeyT': {'keyCode': 84, 'code': 'KeyT', 'shiftKey': 'T', 'key': 't'},
    'KeyU': {'keyCode': 85, 'code': 'KeyU', 'shiftKey': 'U', 'key': 'u'},
    'KeyV': {'keyCode': 86, 'code': 'KeyV', 'shiftKey': 'V', 'key': 'v'},
    'KeyW': {'keyCode': 87, 'code': 'KeyW', 'shiftKey': 'W', 'key': 'w'},
    'KeyX': {'keyCode': 88, 'code': 'KeyX', 'shiftKey': 'X', 'key': 'x'},
    'KeyY': {'keyCode': 89, 'code': 'KeyY', 'shiftKey': 'Y', 'key': 'y'},
    'KeyZ': {'keyCode': 90, 'code': 'KeyZ', 'shiftKey': 'Z', 'key': 'z'},
    'MetaLeft': {'keyCode': 91, 'code': 'MetaLeft', 'key': 'Meta'},
    'MetaRight': {'keyCode': 92, 'code': 'MetaRight', 'key': 'Meta'},
    'ContextMenu': {'keyCode': 93, 'code': 'ContextMenu', 'key': 'ContextMenu'},
    'NumpadMultiply': {'keyCode': 106, 'code': 'NumpadMultiply', 'key': '*', 'location': 3},
    'NumpadAdd': {'keyCode': 107, 'code': 'NumpadAdd', 'key': '+', 'location': 3},
    'NumpadSubtract': {'keyCode': 109, 'code': 'NumpadSubtract', 'key': '-', 'location': 3},
    'NumpadDivide': {'keyCode': 111, 'code': 'NumpadDivide', 'key': '/', 'location': 3},
    'F1': {'keyCode': 112, 'code': 'F1', 'key': 'F1'},
    'F2': {'keyCode': 113, 'code': 'F2', 'key': 'F2'},
    'F3': {'keyCode': 114, 'code': 'F3', 'key': 'F3'},
    'F4': {'keyCode': 115, 'code': 'F4', 'key': 'F4'},
    'F5': {'keyCode': 116, 'code': 'F5', 'key': 'F5'},
    'F6': {'keyCode': 117, 'code': 'F6', 'key': 'F6'},
    'F7': {'keyCode': 118, 'code': 'F7', 'key': 'F7'},
    'F8': {'keyCode': 119, 'code': 'F8', 'key': 'F8'},
    'F9': {'keyCode': 120, 'code': 'F9', 'key': 'F9'},
    'F10': {'keyCode': 121, 'code': 'F10', 'key': 'F10'},
    'F11': {'keyCode': 122, 'code': 'F11', 'key': 'F11'},
    'F12': {'keyCode': 123, 'code': 'F12', 'key': 'F12'},
    'F13': {'keyCode': 124, 'code': 'F13', 'key': 'F13'},
    'F14': {'keyCode': 125, 'code': 'F14', 'key': 'F14'},
    'F15': {'keyCode': 126, 'code': 'F15', 'key': 'F15'},
    'F16': {'keyCode': 127, 'code': 'F16', 'key': 'F16'},
    'F17': {'keyCode': 128, 'code': 'F17', 'key': 'F17'},
    'F18': {'keyCode': 129, 'code': 'F18', 'key': 'F18'},
    'F19': {'keyCode': 130, 'code': 'F19', 'key': 'F19'},
    'F20': {'keyCode': 131, 'code': 'F20', 'key': 'F20'},
    'F21': {'keyCode': 132, 'code': 'F21', 'key': 'F21'},
    'F22': {'keyCode': 133, 'code': 'F22', 'key': 'F22'},
    'F23': {'keyCode': 134, 'code': 'F23', 'key': 'F23'},
    'F24': {'keyCode': 135, 'code': 'F24', 'key': 'F24'},
    'NumLock': {'keyCode': 144, 'code': 'NumLock', 'key': 'NumLock'},
    'ScrollLock': {'keyCode': 145, 'code': 'ScrollLock', 'key': 'ScrollLock'},
    'AudioVolumeMute': {'keyCode': 173, 'code': 'AudioVolumeMute', 'key': 'AudioVolumeMute'},
    'AudioVolumeDown': {'keyCode': 174, 'code': 'AudioVolumeDown', 'key': 'AudioVolumeDown'},
    'AudioVolumeUp': {'keyCode': 175, 'code': 'AudioVolumeUp', 'key': 'AudioVolumeUp'},
    'MediaTrackNext': {'keyCode': 176, 'code': 'MediaTrackNext', 'key': 'MediaTrackNext'},
    'MediaTrackPrevious': {'keyCode': 177, 'code': 'MediaTrackPrevious', 'key': 'MediaTrackPrevious'},
    'MediaStop': {'keyCode': 178, 'code': 'MediaStop', 'key': 'MediaStop'},
    'MediaPlayPause': {'keyCode': 179, 'code': 'MediaPlayPause', 'key': 'MediaPlayPause'},
    'Semicolon': {'keyCode': 186, 'code': 'Semicolon', 'shiftKey': ':', 'key': ';'},
    'Equal': {'keyCode': 187, 'code': 'Equal', 'shiftKey': '+', 'key': '='},
    'NumpadEqual': {'keyCode': 187, 'code': 'NumpadEqual', 'key': '=', 'location': 3},
    'Comma': {'keyCode': 188, 'code': 'Comma', 'shiftKey': '<', 'key': ','},
    'Minus': {'keyCode': 189, 'code': 'Minus', 'shiftKey': '_', 'key': '-'},
    'Period': {'keyCode': 190, 'code': 'Period', 'shiftKey': '>', 'key': '.'},
    'Slash': {'keyCode': 191, 'code': 'Slash', 'shiftKey': '?', 'key': '/'},
    'Backquote': {'keyCode': 192, 'code': 'Backquote', 'shiftKey': '~', 'key': '`'},
    'BracketLeft': {'keyCode': 219, 'code': 'BracketLeft', 'shiftKey': '{', 'key': '['},
    'Backslash': {'keyCode': 220, 'code': 'Backslash', 'shiftKey': '|', 'key': '\\'},
    'BracketRight': {'keyCode': 221, 'code': 'BracketRight', 'shiftKey': '}', 'key': ']'},
    'Quote': {'keyCode': 222, 'code': 'Quote', 'shiftKey': '"', 'key': '\''},
    'AltGraph': {'keyCode': 225, 'code': 'AltGraph', 'key': 'AltGraph'},
    'Props': {'keyCode': 247, 'code': 'Props', 'key': 'CrSel'},
    'Cancel': {'keyCode': 3, 'key': 'Cancel', 'code': 'Abort'},
    'Clear': {'keyCode': 12, 'key': 'Clear', 'code': 'Numpad5', 'location': 3},
    'Shift': {'keyCode': 16, 'key': 'Shift', 'code': 'ShiftLeft'},
    'Control': {'keyCode': 17, 'key': 'Control', 'code': 'ControlLeft'},
    'Alt': {'keyCode': 18, 'key': 'Alt', 'code': 'AltLeft'},
    'Accept': {'keyCode': 30, 'key': 'Accept'},
    'ModeChange': {'keyCode': 31, 'key': 'ModeChange'},
    ' ': {'keyCode': 32, 'key': ' ', 'code': 'Space'},
    'Print': {'keyCode': 42, 'key': 'Print'},
    'Execute': {'keyCode': 43, 'key': 'Execute', 'code': 'Open'},
    '\u0000': {'keyCode': 46, 'key': '\u0000', 'code': 'NumpadDecimal', 'location': 3},
    'a': {'keyCode': 65, 'key': 'a', 'code': 'KeyA'},
    'b': {'keyCode': 66, 'key': 'b', 'code': 'KeyB'},
    'c': {'keyCode': 67, 'key': 'c', 'code': 'KeyC'},
    'd': {'keyCode': 68, 'key': 'd', 'code': 'KeyD'},
    'e': {'keyCode': 69, 'key': 'e', 'code': 'KeyE'},
    'f': {'keyCode': 70, 'key': 'f', 'code': 'KeyF'},
    'g': {'keyCode': 71, 'key': 'g', 'code': 'KeyG'},
    'h': {'keyCode': 72, 'key': 'h', 'code': 'KeyH'},
    'i': {'keyCode': 73, 'key': 'i', 'code': 'KeyI'},
    'j': {'keyCode': 74, 'key': 'j', 'code': 'KeyJ'},
    'k': {'keyCode': 75, 'key': 'k', 'code': 'KeyK'},
    'l': {'keyCode': 76, 'key': 'l', 'code': 'KeyL'},
    'm': {'keyCode': 77, 'key': 'm', 'code': 'KeyM'},
    'n': {'keyCode': 78, 'key': 'n', 'code': 'KeyN'},
    'o': {'keyCode': 79, 'key': 'o', 'code': 'KeyO'},
    'p': {'keyCode': 80, 'key': 'p', 'code': 'KeyP'},
    'q': {'keyCode': 81, 'key': 'q', 'code': 'KeyQ'},
    'r': {'keyCode': 82, 'key': 'r', 'code': 'KeyR'},
    's': {'keyCode': 83, 'key': 's', 'code': 'KeyS'},
    't': {'keyCode': 84, 'key': 't', 'code': 'KeyT'},
    'u': {'keyCode': 85, 'key': 'u', 'code': 'KeyU'},
    'v': {'keyCode': 86, 'key': 'v', 'code': 'KeyV'},
    'w': {'keyCode': 87, 'key': 'w', 'code': 'KeyW'},
    'x': {'keyCode': 88, 'key': 'x', 'code': 'KeyX'},
    'y': {'keyCode': 89, 'key': 'y', 'code': 'KeyY'},
    'z': {'keyCode': 90, 'key': 'z', 'code': 'KeyZ'},
    'Meta': {'keyCode': 91, 'key': 'Meta', 'code': 'MetaLeft'},
    '*': {'keyCode': 106, 'key': '*', 'code': 'NumpadMultiply', 'location': 3},
    '+': {'keyCode': 107, 'key': '+', 'code': 'NumpadAdd', 'location': 3},
    '-': {'keyCode': 109, 'key': '-', 'code': 'NumpadSubtract', 'location': 3},
    '/': {'keyCode': 111, 'key': '/', 'code': 'NumpadDivide', 'location': 3},
    ';': {'keyCode': 186, 'key': ';', 'code': 'Semicolon'},
    '=': {'keyCode': 187, 'key': '=', 'code': 'Equal'},
    ',': {'keyCode': 188, 'key': ',', 'code': 'Comma'},
    '.': {'keyCode': 190, 'key': '.', 'code': 'Period'},
    '`': {'keyCode': 192, 'key': '`', 'code': 'Backquote'},
    '[': {'keyCode': 219, 'key': '[', 'code': 'BracketLeft'},
    '\\': {'keyCode': 220, 'key': '\\', 'code': 'Backslash'},
    ']': {'keyCode': 221, 'key': ']', 'code': 'BracketRight'},
    '\'': {'keyCode': 222, 'key': '\'', 'code': 'Quote'},
    'Attn': {'keyCode': 246, 'key': 'Attn'},
    'CrSel': {'keyCode': 247, 'key': 'CrSel', 'code': 'Props'},
    'ExSel': {'keyCode': 248, 'key': 'ExSel'},
    'EraseEof': {'keyCode': 249, 'key': 'EraseEof'},
    'Play': {'keyCode': 250, 'key': 'Play'},
    'ZoomOut': {'keyCode': 251, 'key': 'ZoomOut'},
    ')': {'keyCode': 48, 'key': ')', 'code': 'Digit0'},
    '!': {'keyCode': 49, 'key': '!', 'code': 'Digit1'},
    '@': {'keyCode': 50, 'key': '@', 'code': 'Digit2'},
    '#': {'keyCode': 51, 'key': '#', 'code': 'Digit3'},
    '$': {'keyCode': 52, 'key': '$', 'code': 'Digit4'},
    '%': {'keyCode': 53, 'key': '%', 'code': 'Digit5'},
    '^': {'keyCode': 54, 'key': '^', 'code': 'Digit6'},
    '&': {'keyCode': 55, 'key': '&', 'code': 'Digit7'},
    '(': {'keyCode': 57, 'key': '(', 'code': 'Digit9'},
    'A': {'keyCode': 65, 'key': 'A', 'code': 'KeyA'},
    'B': {'keyCode': 66, 'key': 'B', 'code': 'KeyB'},
    'C': {'keyCode': 67, 'key': 'C', 'code': 'KeyC'},
    'D': {'keyCode': 68, 'key': 'D', 'code': 'KeyD'},
    'E': {'keyCode': 69, 'key': 'E', 'code': 'KeyE'},
    'F': {'keyCode': 70, 'key': 'F', 'code': 'KeyF'},
    'G': {'keyCode': 71, 'key': 'G', 'code': 'KeyG'},
    'H': {'keyCode': 72, 'key': 'H', 'code': 'KeyH'},
    'I': {'keyCode': 73, 'key': 'I', 'code': 'KeyI'},
    'J': {'keyCode': 74, 'key': 'J', 'code': 'KeyJ'},
    'K': {'keyCode': 75, 'key': 'K', 'code': 'KeyK'},
    'L': {'keyCode': 76, 'key': 'L', 'code': 'KeyL'},
    'M': {'keyCode': 77, 'key': 'M', 'code': 'KeyM'},
    'N': {'keyCode': 78, 'key': 'N', 'code': 'KeyN'},
    'O': {'keyCode': 79, 'key': 'O', 'code': 'KeyO'},
    'P': {'keyCode': 80, 'key': 'P', 'code': 'KeyP'},
    'Q': {'keyCode': 81, 'key': 'Q', 'code': 'KeyQ'},
    'R': {'keyCode': 82, 'key': 'R', 'code': 'KeyR'},
    'S': {'keyCode': 83, 'key': 'S', 'code': 'KeyS'},
    'T': {'keyCode': 84, 'key': 'T', 'code': 'KeyT'},
    'U': {'keyCode': 85, 'key': 'U', 'code': 'KeyU'},
    'V': {'keyCode': 86, 'key': 'V', 'code': 'KeyV'},
    'W': {'keyCode': 87, 'key': 'W', 'code': 'KeyW'},
    'X': {'keyCode': 88, 'key': 'X', 'code': 'KeyX'},
    'Y': {'keyCode': 89, 'key': 'Y', 'code': 'KeyY'},
    'Z': {'keyCode': 90, 'key': 'Z', 'code': 'KeyZ'},
    ':': {'keyCode': 186, 'key': ':', 'code': 'Semicolon'},
    '<': {'keyCode': 188, 'key': '<', 'code': 'Comma'},
    '_': {'keyCode': 189, 'key': '_', 'code': 'Minus'},
    '>': {'keyCode': 190, 'key': '>', 'code': 'Period'},
    '?': {'keyCode': 191, 'key': '?', 'code': 'Slash'},
    '~': {'keyCode': 192, 'key': '~', 'code': 'Backquote'},
    '{': {'keyCode': 219, 'key': '{', 'code': 'BracketLeft'},
    '|': {'keyCode': 220, 'key': '|', 'code': 'Backslash'},
    '}': {'keyCode': 221, 'key': '}', 'code': 'BracketRight'},
    '"': {'keyCode': 222, 'key': '"', 'code': 'Quote'},
}

def merge_dict(dict1: Optional[Dict], dict2: Optional[Dict]) -> Dict:
    """Merge two dictionaries into new one."""
    new_dict = {}
    if dict1:
        new_dict.update(dict1)
    if dict2:
        new_dict.update(dict2)
    return new_dict
boludoz commented 1 month ago

There is still some work to be done.

boludoz commented 1 month ago

modify core\tab.py:

from __future__ import annotations
import asyncio
import json
import logging
import pathlib
import typing
import warnings
from typing import List, Union, Optional, Tuple

import nodriver.core.browser
from . import element
from . import util
from .util import merge_dict, key_definitions
from .config import PathLike
from .connection import Connection, ProtocolException
from .. import cdp

from typing import Any, Dict, TYPE_CHECKING
from typing import Dict, Optional
if TYPE_CHECKING:
    from typing import Set

logger = logging.getLogger(__name__)

class Tab(Connection):
    """
    :ref:`tab` is the controlling mechanism/connection to a 'target',
    for most of us 'target' can be read as 'tab'. however it could also
    be an iframe, serviceworker or background script for example,
    although there isn't much to control for those.

    if you open a new window by using :py:meth:`browser.get(..., new_window=True)`
    your url will open a new window. this window is a 'tab'.
    When you browse to another page, the tab will be the same (it is an browser view).

    So it's important to keep some reference to tab objects, in case you're
    done interacting with elements and want to operate on the page level again.

    Custom CDP commands
    ---------------------------
    Tab object provide many useful and often-used methods. It is also
    possible to utilize the included cdp classes to to something totally custom.

    the cdp package is a set of so-called "domains" with each having methods, events and types.
    to send a cdp method, for example :py:obj:`cdp.page.navigate`, you'll have to check
    whether the method accepts any parameters and whether they are required or not.

    you can use

    ```python
    await tab.send(cdp.page.navigate(url='https://yoururlhere'))
so tab.send() accepts a generator object, which is created by calling a cdp method.
this way you can build very detailed and customized commands.
(note: finding correct command combo's can be a time consuming task, luckily i added a whole bunch
of useful methods, preferably having the same api's or lookalikes, as in selenium)

some useful, often needed and simply required methods
===================================================================

:py:meth:`~find`  |  find(text)
----------------------------------------
find and returns a single element by text match. by default returns the first element found.
much more powerful is the best_match flag, although also much more expensive.
when no match is found, it will retry for <timeout> seconds (default: 10), so
this is also suitable to use as wait condition.

:py:meth:`~find` |  find(text, best_match=True) or find(text, True)
---------------------------------------------------------------------------------
Much more powerful (and expensive!!) than the above, is the use of the `find(text, best_match=True)` flag.
It will still return 1 element, but when multiple matches are found, picks the one having the
most similar text length.
How would that help?
For example, you search for "login", you'd probably want the "login" button element,
and not thousands of scripts,meta,headings which happens to contain a string of "login".

when no match is found, it will retry for <timeout> seconds (default: 10), so
this is also suitable to use as wait condition.

:py:meth:`~select` | select(selector)
----------------------------------------
find and returns a single element by css selector match.
when no match is found, it will retry for <timeout> seconds (default: 10), so
this is also suitable to use as wait condition.

:py:meth:`~select_all` | select_all(selector)
------------------------------------------------
find and returns all elements by css selector match.
when no match is found, it will retry for <timeout> seconds (default: 10), so
this is also suitable to use as wait condition.

await :py:obj:`Tab`
---------------------------
calling `await tab` will do a lot of stuff under the hood, and ensures all references
are up to date. also it allows for the script to "breathe", as it is oftentime faster than your browser or
webpage. So whenever you get stuck and things crashes or element could not be found, you should probably let
it "breathe"  by calling `await page`  and/or `await page.sleep()`

also, it's ensuring :py:obj:`~url` will be updated to the most recent one, which is quite important in some
other methods.

Using other and custom CDP commands
======================================================
using the included cdp module, you can easily craft commands, which will always return an generator object.
this generator object can be easily sent to the :py:meth:`~send`  method.

:py:meth:`~send`
---------------------------
this is probably THE most important method, although you won't ever call it, unless you want to
go really custom. the send method accepts a :py:obj:`cdp` command. Each of which can be found in the
cdp section.

when you import * from this package, cdp will be in your namespace, and contains all domains/actions/events
you can act upon.
"""

browser: nodriver.core.browser.Browser
_download_behavior: List[str] = None

def __init__(
    self,
    websocket_url: str,
    target: cdp.target.TargetInfo,
    browser: Optional["nodriver.Browser"] = None,
    **kwargs,
):
    super().__init__(websocket_url, target, browser, **kwargs)
    self.browser = browser
    self._dom = None
    self._window_id = None
    self.keyboard = self.Keyboard(self)
    self.touchscreen = self.Touchscreen(self)
    self.mouse = self.Mouse(self)

@property
def inspector_url(self):
    """
    get the inspector url. this url can be used in another browser to show you the devtools interface for
    current tab. useful for debugging (and headless)
    :return:
    :rtype:
    """
    return f"http://{self.browser.config.host}:{self.browser.config.port}/devtools/inspector.html?ws={self.websocket_url[5:]}"

def inspector_open(self):
    import webbrowser

    webbrowser.open(self.inspector_url, new=2)

async def open_external_inspector(self):
    """
    opens the system's browser containing the devtools inspector page
    for this tab. could be handy, especially to debug in headless mode.
    """
    import webbrowser

    webbrowser.open(self.inspector_url)

async def find(
    self,
    text: str,
    best_match: bool = True,
    return_enclosing_element=True,
    timeout: Union[int, float] = 10,
):
    """
    find single element by text
    can also be used to wait for such element to appear.

    :param text: text to search for. note: script contents are also considered text
    :type text: str
    :param best_match:  :param best_match:  when True (default), it will return the element which has the most
                                           comparable string length. this could help tremendously, when for example
                                           you search for "login", you'd probably want the login button element,
                                           and not thousands of scripts,meta,headings containing a string of "login".
                                           When False, it will return naively just the first match (but is way faster).
     :type best_match: bool
     :param return_enclosing_element:
             since we deal with nodes instead of elements, the find function most often returns
             so called text nodes, which is actually a element of plain text, which is
             the somehow imaginary "child" of a "span", "p", "script" or any other elements which have text between their opening
             and closing tags.
             most often when we search by text, we actually aim for the element containing the text instead of
             a lousy plain text node, so by default the containing element is returned.

             however, there are (why not) exceptions, for example elements that use the "placeholder=" property.
             this text is rendered, but is not a pure text node. in that case you can set this flag to False.
             since in this case we are probably interested in just that element, and not it's parent.

             # todo, automatically determine node type
             # ignore the return_enclosing_element flag if the found node is NOT a text node but a
             # regular element (one having a tag) in which case that is exactly what we need.
     :type return_enclosing_element: bool
    :param timeout: raise timeout exception when after this many seconds nothing is found.
    :type timeout: float,int
    """
    loop = asyncio.get_running_loop()
    start_time = loop.time()

    text = text.strip()

    item = await self.find_element_by_text(
        text, best_match, return_enclosing_element
    )
    while not item:
        await self
        item = await self.find_element_by_text(
            text, best_match, return_enclosing_element
        )
        if loop.time() - start_time > timeout:
            raise asyncio.TimeoutError(
                "time ran out while waiting for text: %s" % text
            )
        await self.sleep(0.5)
    return item

async def select(
    self,
    selector: str,
    timeout: Union[int, float] = 10,
) -> nodriver.Element:
    """
    find single element by css selector.
    can also be used to wait for such element to appear.

    :param selector: css selector, eg a[href], button[class*=close], a > img[src]
    :type selector: str

    :param timeout: raise timeout exception when after this many seconds nothing is found.
    :type timeout: float,int

    """
    loop = asyncio.get_running_loop()
    start_time = loop.time()

    selector = selector.strip()
    item = await self.query_selector(selector)

    while not item:
        await self
        item = await self.query_selector(selector)
        if loop.time() - start_time > timeout:
            raise asyncio.TimeoutError(
                "time ran out while waiting for %s" % selector
            )
        await self.sleep(0.5)
    return item

async def find_all(
    self,
    text: str,
    timeout: Union[int, float] = 10,
) -> List[nodriver.Element]:
    """
    find multiple elements by text
    can also be used to wait for such element to appear.

    :param text: text to search for. note: script contents are also considered text
    :type text: str

    :param timeout: raise timeout exception when after this many seconds nothing is found.
    :type timeout: float,int
    """
    loop = asyncio.get_running_loop()
    now = loop.time()

    text = text.strip()
    items = await self.find_elements_by_text(text)

    while not items:
        await self
        items = await self.find_elements_by_text(text)
        if loop.time() - now > timeout:
            raise asyncio.TimeoutError(
                "time ran out while waiting for text: %s" % text
            )
        await self.sleep(0.5)
    return items

async def select_all(
    self, selector: str, timeout: Union[int, float] = 10, include_frames=False
) -> List[nodriver.Element]:
    """
    find multiple elements by css selector.
    can also be used to wait for such element to appear.

    :param selector: css selector, eg a[href], button[class*=close], a > img[src]
    :type selector: str
    :param timeout: raise timeout exception when after this many seconds nothing is found.
    :type timeout: float,int
    :param include_frames: whether to include results in iframes.
    :type include_frames: bool
    """

    loop = asyncio.get_running_loop()
    now = loop.time()
    selector = selector.strip()
    items = []
    if include_frames:
        frames = await self.query_selector_all("iframe")
        # unfortunately, asyncio.gather here is not an option
        for fr in frames:
            items.extend(await fr.query_selector_all(selector))

    items.extend(await self.query_selector_all(selector))
    while not items:
        await self
        items = await self.query_selector_all(selector)
        if loop.time() - now > timeout:
            raise asyncio.TimeoutError(
                "time ran out while waiting for %s" % selector
            )
        await self.sleep(0.5)
    return items

async def get(
    self, url="chrome://welcome", new_tab: bool = False, new_window: bool = False
):
    """top level get. utilizes the first tab to retrieve given url.

    convenience function known from selenium.
    this function handles waits/sleeps and detects when DOM events fired, so it's the safest
    way of navigating.

    :param url: the url to navigate to
    :param new_tab: open new tab
    :param new_window:  open new window
    :return: Page
    """
    if not self.browser:
        raise AttributeError(
            "this page/tab has no browser attribute, so you can't use get()"
        )
    if new_window and not new_tab:
        new_tab = True

    if new_tab:
        return await self.browser.get(url, new_tab, new_window)
    else:
        frame_id, loader_id, *_ = await self.send(cdp.page.navigate(url))
        await self
        return self

async def query_selector_all(
    self,
    selector: str,
    _node: Optional[Union[cdp.dom.Node, "element.Element"]] = None,
):
    """
    equivalent of javascripts document.querySelectorAll.
    this is considered one of the main methods to use in this package.

    it returns all matching :py:obj:`nodriver.Element` objects.

    :param selector: css selector. (first time? => https://www.w3schools.com/cssref/css_selectors.php )
    :type selector: str
    :param _node: internal use
    :type _node:
    :return:
    :rtype:
    """

    if not _node:
        doc: cdp.dom.Node = await self.send(cdp.dom.get_document(-1, True))
    else:
        doc = _node
        if _node.node_name == "IFRAME":
            doc = _node.content_document
    node_ids = []

    try:
        node_ids = await self.send(
            cdp.dom.query_selector_all(doc.node_id, selector)
        )

    except ProtocolException as e:
        if _node is not None:
            if "could not find node" in e.message.lower():
                if getattr(_node, "__last", None):
                    del _node.__last
                    return []
                # if supplied node is not found, the dom has changed since acquiring the element
                # therefore we need to update our passed node and try again
                await _node.update()
                _node.__last = (
                    True  # make sure this isn't turned into infinite loop
                )
                return await self.query_selector_all(selector, _node)
        else:
            await self.send(cdp.dom.disable())
            raise
    if not node_ids:
        return []
    items = []

    for nid in node_ids:
        node = util.filter_recurse(doc, lambda n: n.node_id == nid)
        # we pass along the retrieved document tree,
        # to improve performance
        if not node:
            continue
        elem = element.create(node, self, doc)
        items.append(elem)

    return items

async def query_selector(
    self,
    selector: str,
    _node: Optional[Union[cdp.dom.Node, element.Element]] = None,
):
    """
    find single element based on css selector string

    :param selector: css selector(s)
    :type selector: str
    :return:
    :rtype:
    """
    selector = selector.strip()

    if not _node:
        doc: cdp.dom.Node = await self.send(cdp.dom.get_document(-1, True))
    else:
        doc = _node
        if _node.node_name == "IFRAME":
            doc = _node.content_document
    node_id = None

    try:
        node_id = await self.send(cdp.dom.query_selector(doc.node_id, selector))

    except ProtocolException as e:
        if _node is not None:
            if "could not find node" in e.message.lower():
                if getattr(_node, "__last", None):
                    del _node.__last
                    return []
                # if supplied node is not found, the dom has changed since acquiring the element
                # therefore we need to update our passed node and try again
                await _node.update()
                _node.__last = (
                    True  # make sure this isn't turned into infinite loop
                )
                return await self.query_selector(selector, _node)
        else:
            await self.send(cdp.dom.disable())
            raise
    if not node_id:
        return
    node = util.filter_recurse(doc, lambda n: n.node_id == node_id)
    if not node:
        return
    return element.create(node, self, doc)

async def find_elements_by_text(
    self,
    text: str,
    tag_hint: Optional[str] = None,
) -> List[element.Element]:
    """
    returns element which match the given text.
    please note: this may (or will) also return any other element (like inline scripts),
    which happen to contain that text.

    :param text:
    :type text:
    :param tag_hint: when provided, narrows down search to only elements which match given tag eg: a, div, script, span
    :type tag_hint: str
    :return:
    :rtype:
    """
    text = text.strip()
    doc = await self.send(cdp.dom.get_document(-1, True))
    search_id, nresult = await self.send(cdp.dom.perform_search(text, True))
    if nresult:
        node_ids = await self.send(
            cdp.dom.get_search_results(search_id, 0, nresult)
        )
    else:
        node_ids = []

    await self.send(cdp.dom.discard_search_results(search_id))

    items = []
    for nid in node_ids:
        node = util.filter_recurse(doc, lambda n: n.node_id == nid)
        if not node:
            node = await self.send(cdp.dom.resolve_node(node_id=nid))
            if not node:
                continue
            # remote_object = await self.send(cdp.dom.resolve_node(backend_node_id=node.backend_node_id))
            # node_id = await self.send(cdp.dom.request_node(object_id=remote_object.object_id))
        try:
            elem = element.create(node, self, doc)
        except:  # noqa
            continue
        if elem.node_type == 3:
            # if found element is a text node (which is plain text, and useless for our purpose),
            # we return the parent element of the node (which is often a tag which can have text between their
            # opening and closing tags (that is most tags, except for example "img" and "video", "br")

            if not elem.parent:
                # check if parent actually has a parent and update it to be absolutely sure
                await elem.update()

            items.append(
                elem.parent or elem
            )  # when it really has no parent, use the text node itself
            continue
        else:
            # just add the element itself
            items.append(elem)

    # since we already fetched the entire doc, including shadow and frames
    # let's also search through the iframes
    iframes = util.filter_recurse_all(doc, lambda node: node.node_name == "IFRAME")
    if iframes:
        iframes_elems = [
            element.create(iframe, self, iframe.content_document)
            for iframe in iframes
        ]
        for iframe_elem in iframes_elems:
            if iframe_elem.content_document:
                iframe_text_nodes = util.filter_recurse_all(
                    iframe_elem,
                    lambda node: node.node_type == 3  # noqa
                    and text.lower() in node.node_value.lower(),
                )
                if iframe_text_nodes:
                    iframe_text_elems = [
                        element.create(text_node, self, iframe_elem.tree)
                        for text_node in iframe_text_nodes
                    ]
                    items.extend(
                        text_node.parent for text_node in iframe_text_elems
                    )
    await self.send(cdp.dom.disable())
    return items or []

async def find_element_by_text(
    self,
    text: str,
    best_match: Optional[bool] = False,
    return_enclosing_element: Optional[bool] = True,
) -> Union[element.Element, None]:
    """
    finds and returns the first element containing <text>, or best match

    :param text:
    :type text:
    :param best_match:  when True, which is MUCH more expensive (thus much slower),
                        will find the closest match based on length.
                        this could help tremendously, when for example you search for "login", you'd probably want the login button element,
                        and not thousands of scripts,meta,headings containing a string of "login".

    :type best_match: bool
    :param return_enclosing_element:
    :type return_enclosing_element:
    :return:
    :rtype:
    """
    doc = await self.send(cdp.dom.get_document(-1, True))
    text = text.strip()
    search_id, nresult = await self.send(cdp.dom.perform_search(text, True))

    node_ids = await self.send(cdp.dom.get_search_results(search_id, 0, nresult))
    await self.send(cdp.dom.discard_search_results(search_id))

    if not node_ids:
        node_ids = []
    items = []
    for nid in node_ids:
        node = util.filter_recurse(doc, lambda n: n.node_id == nid)
        try:
            elem = element.create(node, self, doc)
        except:  # noqa
            continue
        if elem.node_type == 3:
            # if found element is a text node (which is plain text, and useless for our purpose),
            # we return the parent element of the node (which is often a tag which can have text between their
            # opening and closing tags (that is most tags, except for example "img" and "video", "br")

            if not elem.parent:
                # check if parent actually has a parent and update it to be absolutely sure
                await elem.update()

            items.append(
                elem.parent or elem
            )  # when it really has no parent, use the text node itself
            continue
        else:
            # just add the element itself
            items.append(elem)

    # since we already fetched the entire doc, including shadow and frames
    # let's also search through the iframes
    iframes = util.filter_recurse_all(doc, lambda node: node.node_name == "IFRAME")
    if iframes:
        iframes_elems = [
            element.create(iframe, self, iframe.content_document)
            for iframe in iframes
        ]
        for iframe_elem in iframes_elems:
            iframe_text_nodes = util.filter_recurse_all(
                iframe_elem,
                lambda node: node.node_type == 3  # noqa
                and text.lower() in node.node_value.lower(),
            )
            if iframe_text_nodes:
                iframe_text_elems = [
                    element.create(text_node, self, iframe_elem.tree)
                    for text_node in iframe_text_nodes
                ]
                items.extend(text_node.parent for text_node in iframe_text_elems)
    try:
        if not items:
            return
        if best_match:
            closest_by_length = min(
                items, key=lambda el: abs(len(text) - len(el.text_all))
            )
            elem = closest_by_length or items[0]

            return elem
        else:
            # naively just return the first result
            for elem in items:
                if elem:
                    return elem
    finally:
        await self.send(cdp.dom.disable())

async def back(self):
    """
    history back
    """
    await self.send(cdp.runtime.evaluate("window.history.back()"))

async def forward(self):
    """
    history forward
    """
    await self.send(cdp.runtime.evaluate("window.history.forward()"))

async def reload(
    self,
    ignore_cache: Optional[bool] = True,
    script_to_evaluate_on_load: Optional[str] = None,
):
    """
    Reloads the page

    :param ignore_cache: when set to True (default), it ignores cache, and re-downloads the items
    :type ignore_cache:
    :param script_to_evaluate_on_load: script to run on load. I actually haven't experimented with this one, so no guarantees.
    :type script_to_evaluate_on_load:
    :return:
    :rtype:
    """
    await self.send(
        cdp.page.reload(
            ignore_cache=ignore_cache,
            script_to_evaluate_on_load=script_to_evaluate_on_load,
        ),
    )

async def evaluate(
    self, expression: str, await_promise=False, return_by_value=True
):
    remote_object, errors = await self.send(
        cdp.runtime.evaluate(
            expression=expression,
            user_gesture=True,
            await_promise=await_promise,
            return_by_value=return_by_value,
            allow_unsafe_eval_blocked_by_csp=True,
        )
    )
    if errors:
        raise ProtocolException(errors)

    if remote_object:
        if return_by_value:
            if remote_object.value:
                return remote_object.value

        else:
            return remote_object, errors

async def js_dumps(
    self, obj_name: str, return_by_value: Optional[bool] = True
) -> typing.Union[
    typing.Dict,
    typing.Tuple[cdp.runtime.RemoteObject, cdp.runtime.ExceptionDetails],
]:
    """
    dump given js object with its properties and values as a dict

    note: complex objects might not be serializable, therefore this method is not a "source of thruth"

    :param obj_name: the js object to dump
    :type obj_name: str

    :param return_by_value: if you want an tuple of cdp objects (returnvalue, errors), set this to False
    :type return_by_value: bool

    example
    ------

    x = await self.js_dumps('window')
    print(x)
        '...{
        'pageYOffset': 0,
        'visualViewport': {},
        'screenX': 10,
        'screenY': 10,
        'outerWidth': 1050,
        'outerHeight': 832,
        'devicePixelRatio': 1,
        'screenLeft': 10,
        'screenTop': 10,
        'styleMedia': {},
        'onsearch': None,
        'isSecureContext': True,
        'trustedTypes': {},
        'performance': {'timeOrigin': 1707823094767.9,
        'timing': {'connectStart': 0,
        'navigationStart': 1707823094768,
        ]...
        '
    """
    js_code_a = (
        """
                       function ___dump(obj, _d = 0) {
                           let _typesA = ['object', 'function'];
                           let _typesB = ['number', 'string', 'boolean'];
                           if (_d == 2) {
                               console.log('maxdepth reached for ', obj);
                               return
                           }
                           let tmp = {}
                           for (let k in obj) {
                               if (obj[k] == window) continue;
                               let v;
                               try {
                                   if (obj[k] === null || obj[k] === undefined || obj[k] === NaN) {
                                       console.log('obj[k] is null or undefined or Nan', k, '=>', obj[k])
                                       tmp[k] = obj[k];
                                       continue
                                   }
                               } catch (e) {
                                   tmp[k] = null;
                                   continue
                               }

                               if (_typesB.includes(typeof obj[k])) {
                                   tmp[k] = obj[k]
                                   continue
                               }

                               try {
                                   if (typeof obj[k] === 'function') {
                                       tmp[k] = obj[k].toString()
                                       continue
                                   }

                                   if (typeof obj[k] === 'object') {
                                       tmp[k] = ___dump(obj[k], _d + 1);
                                       continue
                                   }

                               } catch (e) {}

                               try {
                                   tmp[k] = JSON.stringify(obj[k])
                                   continue
                               } catch (e) {

                               }
                               try {
                                   tmp[k] = obj[k].toString();
                                   continue
                               } catch (e) {}
                           }
                           return tmp
                       }

                       function ___dumpY(obj) {
                           var objKeys = (obj) => {
                               var [target, result] = [obj, []];
                               while (target !== null) {
                                   result = result.concat(Object.getOwnPropertyNames(target));
                                   target = Object.getPrototypeOf(target);
                               }
                               return result;
                           }
                           return Object.fromEntries(
                               objKeys(obj).map(_ => [_, ___dump(obj[_])]))

                       }
                       ___dumpY( %s )
               """
        % obj_name
    )
    js_code_b = (
        """
        ((obj, visited = new WeakSet()) => {
             if (visited.has(obj)) {
                 return {}
             }
             visited.add(obj)
             var result = {}, _tmp;
             for (var i in obj) {
                     try {
                         if (i === 'enabledPlugin' || typeof obj[i] === 'function') {
                             continue;
                         } else if (typeof obj[i] === 'object') {
                             _tmp = recurse(obj[i], visited);
                             if (Object.keys(_tmp).length) {
                                 result[i] = _tmp;
                             }
                         } else {
                             result[i] = obj[i];
                         }
                     } catch (error) {
                         // console.error('Error:', error);
                     }
                 }
            return result;
        })(%s)
    """
        % obj_name
    )

    # we're purposely not calling self.evaluate here to prevent infinite loop on certain expressions

    remote_object, exception_details = await self.send(
        cdp.runtime.evaluate(
            js_code_a,
            await_promise=True,
            return_by_value=return_by_value,
            allow_unsafe_eval_blocked_by_csp=True,
        )
    )
    if exception_details:

        # try second variant

        remote_object, exception_details = await self.send(
            cdp.runtime.evaluate(
                js_code_b,
                await_promise=True,
                return_by_value=return_by_value,
                allow_unsafe_eval_blocked_by_csp=True,
            )
        )

    if exception_details:
        raise ProtocolException(exception_details)
    if return_by_value:
        if remote_object.value:
            return remote_object.value
    else:
        return remote_object, exception_details

async def close(self):
    """
    close the current target (ie: tab,window,page)
    :return:
    :rtype:
    """
    if self.target and self.target.target_id:
        await self.send(cdp.target.close_target(target_id=self.target.target_id))

async def get_window(self) -> Tuple[cdp.browser.WindowID, cdp.browser.Bounds]:
    """
    get the window Bounds
    :return:
    :rtype:
    """
    window_id, bounds = await self.send(
        cdp.browser.get_window_for_target(self.target_id)
    )
    return window_id, bounds

async def get_content(self):
    """
    gets the current page source content (html)
    :return:
    :rtype:
    """
    doc: cdp.dom.Node = await self.send(cdp.dom.get_document(-1, True))
    return await self.send(
        cdp.dom.get_outer_html(backend_node_id=doc.backend_node_id)
    )

async def maximize(self):
    """
    maximize page/tab/window
    """
    return await self.set_window_state(state="maximize")

async def minimize(self):
    """
    minimize page/tab/window
    """
    return await self.set_window_state(state="minimize")

async def fullscreen(self):
    """
    minimize page/tab/window
    """
    return await self.set_window_state(state="fullscreen")

async def medimize(self):
    return await self.set_window_state(state="normal")

async def set_window_size(self, left=0, top=0, width=1280, height=1024):
    """
    set window size and position

    :param left: pixels from the left of the screen to the window top-left corner
    :type left:
    :param top: pixels from the top of the screen to the window top-left corner
    :type top:
    :param width: width of the window in pixels
    :type width:
    :param height: height of the window in pixels
    :type height:
    :return:
    :rtype:
    """
    return await self.set_window_state(left, top, width, height)

async def activate(self):
    """
    active this target (ie: tab,window,page)
    """
    await self.send(cdp.target.activate_target(self.target.target_id))

async def bring_to_front(self):
    """
    alias to self.activate
    """
    await self.activate()

async def set_window_state(
    self, left=0, top=0, width=1280, height=720, state="normal"
):
    """
    sets the window size or state.

    for state you can provide the full name like minimized, maximized, normal, fullscreen, or
    something which leads to either of those, like min, mini, mi,  max, ma, maxi, full, fu, no, nor
    in case state is set other than "normal", the left, top, width, and height are ignored.

    :param left:
        desired offset from left, in pixels
    :type left: int

    :param top:
        desired offset from the top, in pixels
    :type top: int

    :param width:
        desired width in pixels
    :type width: int

    :param height:
        desired height in pixels
    :type height: int

    :param state:
        can be one of the following strings:
            - normal
            - fullscreen
            - maximized
            - minimized

    :type state: str

    """
    available_states = ["minimized", "maximized", "fullscreen", "normal"]
    window_id: cdp.browser.WindowID
    bounds: cdp.browser.Bounds
    (window_id, bounds) = await self.get_window()

    for state_name in available_states:
        if all(x in state_name for x in state.lower()):
            break
    else:
        raise NameError(
            "could not determine any of %s from input '%s'"
            % (",".join(available_states), state)
        )
    window_state = getattr(
        cdp.browser.WindowState, state_name.upper(), cdp.browser.WindowState.NORMAL
    )
    if window_state == cdp.browser.WindowState.NORMAL:
        bounds = cdp.browser.Bounds(left, top, width, height, window_state)
    else:
        # min, max, full can only be used when current state == NORMAL
        # therefore we first switch to NORMAL
        await self.set_window_state(state="normal")
        bounds = cdp.browser.Bounds(window_state=window_state)

    await self.send(cdp.browser.set_window_bounds(window_id, bounds=bounds))

class Keyboard:
    def __init__(self, tab: 'Tab') -> None:
        self.tab = tab
        self._modifiers = 0
        self._pressed_keys: Set[str] = set()

    async def down(self, key: str, options: dict = None, **kwargs: Any) -> None:
        """Dispatch a ``keydown`` event with ``key``."""
        options = merge_dict(options, kwargs)

        description = self._key_description_for_string(key)
        autoRepeat = description['code'] in self._pressed_keys
        self._pressed_keys.add(description['code'])
        self._modifiers |= self._modifier_bit(description['key'])

        text = options.get('text')
        if text is None:
            text = description['text']

        await self.tab.send(cdp.input_.dispatch_key_event(
            type_ = "keyDown" if text else "rawKeyDown",
            modifiers = self._modifiers,
            windows_virtual_key_code = description['keyCode'],
            code = description['code'],
            key = description['key'],
            text = text,
            unmodified_text = text,
            auto_repeat = autoRepeat,
            location = description['location'],
            is_keypad = description['location'] == 3,        
        ))

    def _modifier_bit(self, key: str) -> int:
        if key == 'Alt':
            return 1
        if key == 'Control':
            return 2
        if key == 'Meta':
            return 4
        if key == 'Shift':
            return 8
        return 0

    def _key_description_for_string(self, keyString: str) -> Dict:  # noqa: C901
        shift = self._modifiers & 8
        description = {
            'key': '',
            'keyCode': 0,
            'code': '',
            'text': '',
            'location': 0,
        }

        definition: Dict = key_definitions.get(keyString)  # type: ignore
        if not definition:
            raise Exception(f'Unknown key: {keyString}')

        if 'key' in definition:
            description['key'] = definition['key']
        if shift and definition.get('shiftKey'):
            description['key'] = definition['shiftKey']

        if 'keyCode' in definition:
            description['keyCode'] = definition['keyCode']
        if shift and definition.get('shiftKeyCode'):
            description['keyCode'] = definition['shiftKeyCode']

        if 'code' in definition:
            description['code'] = definition['code']

        if 'location' in definition:
            description['location'] = definition['location']

        if len(description['key']) == 1:  # type: ignore
            description['text'] = description['key']

        if 'text' in definition:
            description['text'] = definition['text']
        if shift and definition.get('shiftText'):
            description['text'] = definition['shiftText']

        if self._modifiers & ~8:
            description['text'] = ''

        return description

    async def up(self, key: str) -> None:
        """Dispatch a ``keyup`` event of the ``key``."""
        description = self._key_description_for_string(key)

        self._modifiers &= ~self._modifier_bit(description['key'])
        if description['code'] in self._pressed_keys:
            self._pressed_keys.remove(description['code'])
        await self.tab.send(cdp.input_.dispatch_key_event(
            type_ = "keyUp",
            modifiers = self._modifiers,
            key = description['key'],
            windows_virtual_key_code = description['keyCode'],
            code = description['code'],
            location = description['location'],       
        ))

    async def send_character(self, char: str) -> None:
        """Send character into the page."""
        await self.tab.send(cdp.input_.insert_text(char))

    async def type(self, text: str, options: Dict = None, **kwargs: Any) -> None:
        """Type characters into a focused element."""
        options = merge_dict(options, kwargs)
        delay = options.get('delay', 0)
        for char in text:
            if char in key_definitions:
                await self.press(char, {'delay': delay})
            else:
                await self.send_character(char)
            if delay:
                await asyncio.sleep(delay / 1000)

    async def press(self, key: str, options: Dict = None, **kwargs: Any) -> None:
        """Press ``key``."""
        options = merge_dict(options, kwargs)

        await self.down(key, options)
        if 'delay' in options:
            await asyncio.sleep(options['delay'] / 1000)
        await self.up(key)

class Mouse:
    """Mouse class.

    The :class:`Mouse` operates in main-frame CSS pixels relative to the
    top-left corner of the viewport.
    """

    def __init__(self, tab: 'Tab') -> None:
        self.tab = tab
        self._x = 0.0
        self._y = 0.0
        self._button = 'none'

    async def move(self, x: float, y: float, options: dict = None,
                **kwargs: Any) -> None:
        """Move mouse cursor (dispatches a ``mousemove`` event).

        Options can accepts ``steps`` (int) field. If this ``steps`` option
        specified, Sends intermediate ``mousemove`` events. Defaults to 1.
        """
        options = merge_dict(options, kwargs)
        from_x = self._x
        from_y = self._y
        self._x = x
        self._y = y
        steps = options.get('steps', 1)
        for i in range(1, steps + 1):
            x = round(from_x + (self._x - from_x) * (i / steps))
            y = round(from_y + (self._y - from_y) * (i / steps))
            await self.tab.send(cdp.input_.dispatch_mouse_event(
                type_ = 'mouseMoved',
                x = x,
                y = y,
                modifiers = self.tab.keyboard._modifiers,
                button = self._button,
            ))

    async def click(self, x: float, y: float, options: dict = None,
                    **kwargs: Any) -> None:
        """Click button at (``x``, ``y``).

        Shortcut to :meth:`move`, :meth:`down`, and :meth:`up`.

        This method accepts the following options:

        * ``button`` (str): ``left``, ``right``, or ``middle``, defaults to
        ``left``.
        * ``clickCount`` (int): defaults to 1.
        * ``delay`` (int|float): Time to wait between ``mousedown`` and
        ``mouseup`` in milliseconds. Defaults to 0.
        """
        options = merge_dict(options, kwargs)
        await self.move(x, y)
        await self.down(options)
        if options and options.get('delay'):
            await asyncio.sleep(options.get('delay', 0) / 1000)
        await self.up(options)

    async def down(self, options: dict = None, **kwargs: Any) -> None:
        """Press down button (dispatches ``mousedown`` event).

        This method accepts the following options:

        * ``button`` (str): ``left``, ``right``, or ``middle``, defaults to
        ``left``.
        * ``clickCount`` (int): defaults to 1.
        """
        options = merge_dict(options, kwargs)
        self._button = options.get('button', 'left')
        await self.tab.send(cdp.input_.dispatch_mouse_event(
            type_ = 'mousePressed',
            button = self._button,
            x = self._x,
            y = self._y,
            modifiers = self.tab.keyboard._modifiers,
            click_count = options.get('clickCount') or 1,
        ))

    async def up(self, options: dict = None, **kwargs: Any) -> None:
        """Release pressed button (dispatches ``mouseup`` event).

        This method accepts the following options:

        * ``button`` (str): ``left``, ``right``, or ``middle``, defaults to
        ``left``.
        * ``clickCount`` (int): defaults to 1.
        """
        options = merge_dict(options, kwargs)
        self._button = 'none'
        await self.tab.send(cdp.input_.dispatch_mouse_event(
            type_ = 'mouseReleased',
            button = options.get('button', 'left'),
            x = self._x,
            y = self._y,
            modifiers = self.tab.keyboard._modifiers,
            click_count = options.get('clickCount') or 1,
        ))

class Touchscreen:
    """Touchscreen class."""

    def __init__(self, tab: 'Tab') -> None:
        """Make new touchscreen object."""
        self.tab = tab

    async def tap(self, x: float, y: float) -> None:
        """Tap (``x``, ``y``).

        Dispatches a ``touchstart`` and ``touchend`` event.
        """
        touch_points = [{'x': round(x), 'y': round(y)}]
        await self.tab.send(cdp.input_.dispatch_touch_event(
            type_ = 'touchStart',
            touch_points = touch_points,
            modifiers = self.tab.keyboard._modifiers,
        ))
        await self.tab.send(cdp.input_.dispatch_touch_event(
            type_ = 'touchEnd',
            touch_points = [],
            modifiers = self.tab.keyboard._modifiers,
        ))

async def scroll_down(self, amount=25):
    """
    scrolls down maybe

    :param amount: number in percentage. 25 is a quarter of page, 50 half, and 1000 is 10x the page
    :type amount: int
    :return:
    :rtype:
    """
    window_id: cdp.browser.WindowID
    bounds: cdp.browser.Bounds
    (window_id, bounds) = await self.get_window()

    await self.send(
        cdp.input_.synthesize_scroll_gesture(
            x=0,
            y=0,
            y_distance=-(bounds.height * (amount / 100)),
            y_overscroll=0,
            x_overscroll=0,
            prevent_fling=True,
            repeat_delay_ms=0,
            speed=7777,
        )
    )

async def scroll_up(self, amount=25):
    """
    scrolls up maybe

    :param amount: number in percentage. 25 is a quarter of page, 50 half, and 1000 is 10x the page
    :type amount: int

    :return:
    :rtype:
    """
    window_id: cdp.browser.WindowID
    bounds: cdp.browser.Bounds
    (window_id, bounds) = await self.get_window()

    await self.send(
        cdp.input_.synthesize_scroll_gesture(
            x=0,
            y=0,
            y_distance=(bounds.height * (amount / 100)),
            x_overscroll=0,
            prevent_fling=True,
            repeat_delay_ms=0,
            speed=7777,
        )
    )

async def wait_for(
    self,
    selector: Optional[str] = "",
    text: Optional[str] = "",
    timeout: Optional[Union[int, float]] = 10,
) -> element.Element:
    """
    variant on query_selector_all and find_elements_by_text
    this variant takes either selector or text, and will block until
    the requested element(s) are found.

    it will block for a maximum of <timeout> seconds, after which
    an TimeoutError will be raised

    :param selector: css selector
    :type selector:
    :param text: text
    :type text:
    :param timeout:
    :type timeout:
    :return:
    :rtype: Element
    :raises: asyncio.TimeoutError
    """
    loop = asyncio.get_running_loop()
    now = loop.time()
    if selector:
        item = await self.query_selector(selector)
        while not item:
            item = await self.query_selector(selector)
            if loop.time() - now > timeout:
                raise asyncio.TimeoutError(
                    "time ran out while waiting for %s" % selector
                )
            await self.sleep(0.5)
            # await self.sleep(0.5)
        return item
    if text:
        item = await self.find_element_by_text(text)
        while not item:
            item = await self.find_element_by_text(text)
            if loop.time() - now > timeout:
                raise asyncio.TimeoutError(
                    "time ran out while waiting for text: %s" % text
                )
            await self.sleep(0.5)
        return item

async def download_file(self, url: str, filename: Optional[PathLike] = None):
    """
    downloads file by given url.

    :param url: url of the file
    :param filename: the name for the file. if not specified the name is composed from the url file name
    """
    if not self._download_behavior:
        directory_path = pathlib.Path.cwd() / "downloads"
        directory_path.mkdir(exist_ok=True)
        await self.set_download_path(directory_path)

        warnings.warn(
            f"no download path set, so creating and using a default of"
            f"{directory_path}"
        )
    if not filename:
        filename = url.rsplit("/")[-1]
        filename = filename.split("?")[0]

    code = """
     (elem) => {
        async function _downloadFile(
          imageSrc,
          nameOfDownload,
        ) {
          const response = await fetch(imageSrc);
          const blobImage = await response.blob();
          const href = URL.createObjectURL(blobImage);

          const anchorElement = document.createElement('a');
          anchorElement.href = href;
          anchorElement.download = nameOfDownload;

          document.body.appendChild(anchorElement);
          anchorElement.click();

          setTimeout(() => {
            document.body.removeChild(anchorElement);
            window.URL.revokeObjectURL(href);
            }, 500);
        }
        _downloadFile('%s', '%s')
        }
        """ % (
        url,
        filename,
    )

    body = (await self.query_selector_all("body"))[0]
    await body.update()
    await self.send(
        cdp.runtime.call_function_on(
            code,
            object_id=body.object_id,
            arguments=[cdp.runtime.CallArgument(object_id=body.object_id)],
        )
    )

async def save_screenshot(
    self,
    filename: Optional[PathLike] = "auto",
    format: Optional[str] = "jpeg",
    full_page: Optional[bool] = False,
) -> str:
    """
    Saves a screenshot of the page.
    This is not the same as :py:obj:`Element.save_screenshot`, which saves a screenshot of a single element only

    :param filename: uses this as the save path
    :type filename: PathLike
    :param format: jpeg or png (defaults to jpeg)
    :type format: str
    :param full_page: when False (default) it captures the current viewport. when True, it captures the entire page
    :type full_page: bool
    :return: the path/filename of saved screenshot
    :rtype: str
    """
    # noqa
    import urllib.parse
    import datetime

    await self.sleep()  # update the target's url
    path = None

    if format.lower() in ["jpg", "jpeg"]:
        ext = ".jpg"
        format = "jpeg"

    elif format.lower() in ["png"]:
        ext = ".png"
        format = "png"

    if not filename or filename == "auto":
        parsed = urllib.parse.urlparse(self.target.url)
        parts = parsed.path.split("/")
        last_part = parts[-1]
        last_part = last_part.rsplit("?", 1)[0]
        dt_str = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        candidate = f"{parsed.hostname}__{last_part}_{dt_str}"
        path = pathlib.Path(candidate + ext)  # noqa
    else:
        path = pathlib.Path(filename)
    path.parent.mkdir(parents=True, exist_ok=True)
    data = await self.send(
        cdp.page.capture_screenshot(
            format_=format, capture_beyond_viewport=full_page
        )
    )
    if not data:
        raise ProtocolException(
            "could not take screenshot. most possible cause is the page has not finished loading yet."
        )
    import base64

    data_bytes = base64.b64decode(data)
    if not path:
        raise RuntimeError("invalid filename or path: '%s'" % filename)
    path.write_bytes(data_bytes)
    return str(path)

async def set_download_path(self, path: PathLike):
    """
    sets the download path and allows downloads
    this is required for any download function to work (well not entirely, since when unset we set a default folder)

    :param path:
    :type path:
    :return:
    :rtype:
    """
    await self.send(
        cdp.browser.set_download_behavior(
            behavior="allow", download_path=str(path.resolve())
        )
    )
    self._download_behavior = ["allow", str(path.resolve())]

async def get_all_linked_sources(self) -> List["nodriver.Element"]:
    """
    get all elements of tag: link, a, img, scripts meta, video, audio

    :return:
    """
    all_assets = await self.query_selector_all(selector="a,link,img,script,meta")
    return [element.create(asset, self) for asset in all_assets]

async def get_all_urls(self, absolute=True) -> List[str]:
    """
    convenience function, which returns all links (a,link,img,script,meta)

    :param absolute: try to build all the links in absolute form instead of "as is", often relative
    :return: list of urls
    """

    import urllib.parse

    res = []
    all_assets = await self.query_selector_all(selector="a,link,img,script,meta")
    for asset in all_assets:
        if not absolute:
            res.append(asset.src or asset.href)
        else:
            for k, v in asset.attrs.items():
                if k in ("src", "href"):
                    if "#" in v:
                        continue
                    if not any([_ in v for _ in ("http", "//", "/")]):
                        continue
                    abs_url = urllib.parse.urljoin(
                        "/".join(self.url.rsplit("/")[:3]), v
                    )
                    if not abs_url.startswith(("http", "//", "ws")):
                        continue
                    res.append(abs_url)
    return res

async def verify_cf(self):
    """an attempt.."""
    checkbox = None
    checkbox_sibling = await self.wait_for(text="verify you are human")
    if checkbox_sibling:
        parent = checkbox_sibling.parent
        while parent:
            checkbox = await parent.query_selector("input[type=checkbox]")
            if checkbox:
                break
            parent = parent.parent
    await checkbox.mouse_move()
    await checkbox.mouse_click()

async def get_local_storage(self):
    """
    get local storage items as dict of strings (careful!, proper deserialization needs to be done if needed)

    :return:
    :rtype:
    """
    if not self.target.url:
        await self

    # there must be a better way...
    origin = "/".join(self.url.split("/", 3)[:-1])

    items = await self.send(
        cdp.dom_storage.get_dom_storage_items(
            cdp.dom_storage.StorageId(is_local_storage=True, security_origin=origin)
        )
    )
    retval = {}
    for item in items:
        retval[item[0]] = item[1]
    return retval

async def set_local_storage(self, items: dict):
    """
    set local storage.
    dict items must be strings. simple types will be converted to strings automatically.

    :param items: dict containing {key:str, value:str}
    :type items: dict[str,str]
    :return:
    :rtype:
    """
    if not self.target.url:
        await self
    # there must be a better way...
    origin = "/".join(self.url.split("/", 3)[:-1])

    await asyncio.gather(
        *[
            self.send(
                cdp.dom_storage.set_dom_storage_item(
                    storage_id=cdp.dom_storage.StorageId(
                        is_local_storage=True, security_origin=origin
                    ),
                    key=str(key),
                    value=str(val),
                )
            )
            for key, val in items.items()
        ]
    )

def __call__(
    self,
    text: Optional[str] = "",
    selector: Optional[str] = "",
    timeout: Optional[Union[int, float]] = 10,
):
    """
    alias to query_selector_all or find_elements_by_text, depending
    on whether text= is set or selector= is set

    :param selector: css selector string
    :type selector: str
    :return:
    :rtype:
    """
    return self.wait_for(text, selector, timeout)

def __eq__(self, other: Tab):
    try:
        return other.target == self.target
    except (AttributeError, TypeError):
        return False

def __getattr__(self, item):
    try:
        return getattr(self._target, item)
    except AttributeError:
        raise AttributeError(
            f'"{self.__class__.__name__}" has no attribute "%s"' % item
        )

def __repr__(self):
    extra = ""
    if self.target.url:
        extra = f"[url: {self.target.url}]"
    s = f"<{type(self).__name__} [{self.target_id}] [{self.type_}] {extra}>"
    return s
boludoz commented 1 month ago
import asyncio
import nodriver as uc

async def main():
    browser = await uc.start()
    tab_instance = await browser.get('https://www.google.com/')
    await asyncio.sleep(1)
    keyboard_instance = tab_instance.keyboard
    await (await tab_instance.find('//input[1]')).click()
    await keyboard_instance.type('Hello World!')
    await keyboard_instance.down('Enter')
    await asyncio.sleep(5)

if __name__ == '__main__':

    # since asyncio.run never worked (for me)
    uc.loop().run_until_complete(main())
boludoz commented 1 month ago

Personally, this method disgusts me, I only did it for fun.

boludoz commented 1 month ago

Modified tab.py file

from __future__ import annotations
import asyncio
import json
import logging
import pathlib
import typing
import warnings
from typing import List, Union, Optional, Tuple
from typing import Any, Dict, Optional, TYPE_CHECKING
if TYPE_CHECKING:
    from typing import Set

import nodriver.core.browser
from . import element
from . import util
from .util import merge_dict, key_definitions
from .config import PathLike
from .connection import Connection, ProtocolException
from .. import cdp
    def __init__(
        self,
        websocket_url: str,
        target: cdp.target.TargetInfo,
        browser: Optional["nodriver.Browser"] = None,
        **kwargs,
    ):
        super().__init__(websocket_url, target, browser, **kwargs)
        self.browser = browser
        self._dom = None
        self._window_id = None
        self.keyboard = self.Keyboard(self)
        self.touchscreen = self.Touchscreen(self)
        self.mouse = self.Mouse(self)
    class Keyboard:
        def __init__(self, tab: 'Tab') -> None:
            self.tab = tab
            self._modifiers = 0
            self._pressed_keys: Set[str] = set()

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            pass

        async def down(self, key: str, options: dict = None, **kwargs: Any) -> None:
            """Dispatch a ``keydown`` event with ``key``."""
            options = merge_dict(options, kwargs)

            description = self._key_description_for_string(key)
            autoRepeat = description['code'] in self._pressed_keys
            self._pressed_keys.add(description['code'])
            self._modifiers |= self._modifier_bit(description['key'])

            text = options.get('text')
            if text is None:
                text = description['text']

            await self.tab.send(cdp.input_.dispatch_key_event(
                type_ = "keyDown" if text else "rawKeyDown",
                modifiers = self._modifiers,
                windows_virtual_key_code = description['keyCode'],
                code = description['code'],
                key = description['key'],
                text = text,
                unmodified_text = text,
                auto_repeat = autoRepeat,
                location = description['location'],
                is_keypad = description['location'] == 3,        
            ))

        def _modifier_bit(self, key: str) -> int:
            if key == 'Alt':
                return 1
            if key == 'Control':
                return 2
            if key == 'Meta':
                return 4
            if key == 'Shift':
                return 8
            return 0

        def _key_description_for_string(self, keyString: str) -> Dict:  # noqa: C901
            shift = self._modifiers & 8
            description = {
                'key': '',
                'keyCode': 0,
                'code': '',
                'text': '',
                'location': 0,
            }

            definition: Dict = key_definitions.get(keyString)  # type: ignore
            if not definition:
                raise Exception(f'Unknown key: {keyString}')

            if 'key' in definition:
                description['key'] = definition['key']
            if shift and definition.get('shiftKey'):
                description['key'] = definition['shiftKey']

            if 'keyCode' in definition:
                description['keyCode'] = definition['keyCode']
            if shift and definition.get('shiftKeyCode'):
                description['keyCode'] = definition['shiftKeyCode']

            if 'code' in definition:
                description['code'] = definition['code']

            if 'location' in definition:
                description['location'] = definition['location']

            if len(description['key']) == 1:  # type: ignore
                description['text'] = description['key']

            if 'text' in definition:
                description['text'] = definition['text']
            if shift and definition.get('shiftText'):
                description['text'] = definition['shiftText']

            if self._modifiers & ~8:
                description['text'] = ''

            return description

        async def up(self, key: str) -> None:
            """Dispatch a ``keyup`` event of the ``key``."""
            description = self._key_description_for_string(key)

            self._modifiers &= ~self._modifier_bit(description['key'])
            if description['code'] in self._pressed_keys:
                self._pressed_keys.remove(description['code'])
            await self.tab.send(cdp.input_.dispatch_key_event(
                type_ = "keyUp",
                modifiers = self._modifiers,
                key = description['key'],
                windows_virtual_key_code = description['keyCode'],
                code = description['code'],
                location = description['location'],       
            ))

        async def send_character(self, char: str) -> None:
            """Send character into the page."""
            await self.tab.send(cdp.input_.insert_text(char))

        async def type(self, text: str, options: Dict = None, **kwargs: Any) -> None:
            """Type characters into a focused element."""
            options = merge_dict(options, kwargs)
            delay = options.get('delay', 0)
            for char in text:
                if char in key_definitions:
                    await self.press(char, {'delay': delay})
                else:
                    await self.send_character(char)
                if delay:
                    await asyncio.sleep(delay / 1000)

        async def press(self, key: str, options: Dict = None, **kwargs: Any) -> None:
            """Press ``key``."""
            options = merge_dict(options, kwargs)

            await self.down(key, options)
            if 'delay' in options:
                await asyncio.sleep(options['delay'] / 1000)
            await self.up(key)

    class Mouse:
        """Mouse class.

        The :class:`Mouse` operates in main-frame CSS pixels relative to the
        top-left corner of the viewport.
        """

        def __init__(self, tab: 'Tab') -> None:
            self.tab = tab
            self._x = 0.0
            self._y = 0.0
            self._button = 'none'

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            pass

        async def move(self, x: float, y: float, options: dict = None,
                    **kwargs: Any) -> None:
            """Move mouse cursor (dispatches a ``mousemove`` event).

            Options can accepts ``steps`` (int) field. If this ``steps`` option
            specified, Sends intermediate ``mousemove`` events. Defaults to 1.
            """
            options = merge_dict(options, kwargs)
            from_x = self._x
            from_y = self._y
            self._x = x
            self._y = y
            steps = options.get('steps', 1)
            for i in range(1, steps + 1):
                x = round(from_x + (self._x - from_x) * (i / steps))
                y = round(from_y + (self._y - from_y) * (i / steps))
                await self.tab.send(cdp.input_.dispatch_mouse_event(
                    type_ = 'mouseMoved',
                    x = x,
                    y = y,
                    modifiers = self.tab.keyboard._modifiers,
                    button = self._button,
                ))

        async def click(self, x: float, y: float, options: dict = None,
                        **kwargs: Any) -> None:
            """Click button at (``x``, ``y``).

            Shortcut to :meth:`move`, :meth:`down`, and :meth:`up`.

            This method accepts the following options:

            * ``button`` (str): ``left``, ``right``, or ``middle``, defaults to
            ``left``.
            * ``clickCount`` (int): defaults to 1.
            * ``delay`` (int|float): Time to wait between ``mousedown`` and
            ``mouseup`` in milliseconds. Defaults to 0.
            """
            options = merge_dict(options, kwargs)
            await self.move(x, y)
            await self.down(options)
            if options and options.get('delay'):
                await asyncio.sleep(options.get('delay', 0) / 1000)
            await self.up(options)

        async def down(self, options: dict = None, **kwargs: Any) -> None:
            """Press down button (dispatches ``mousedown`` event).

            This method accepts the following options:

            * ``button`` (str): ``left``, ``right``, or ``middle``, defaults to
            ``left``.
            * ``clickCount`` (int): defaults to 1.
            """
            options = merge_dict(options, kwargs)
            self._button = options.get('button', 'left')
            await self.tab.send(cdp.input_.dispatch_mouse_event(
                type_ = 'mousePressed',
                button = self._button,
                x = self._x,
                y = self._y,
                modifiers = self.tab.keyboard._modifiers,
                click_count = options.get('clickCount') or 1,
            ))

        async def up(self, options: dict = None, **kwargs: Any) -> None:
            """Release pressed button (dispatches ``mouseup`` event).

            This method accepts the following options:

            * ``button`` (str): ``left``, ``right``, or ``middle``, defaults to
            ``left``.
            * ``clickCount`` (int): defaults to 1.
            """
            options = merge_dict(options, kwargs)
            self._button = 'none'
            await self.tab.send(cdp.input_.dispatch_mouse_event(
                type_ = 'mouseReleased',
                button = options.get('button', 'left'),
                x = self._x,
                y = self._y,
                modifiers = self.tab.keyboard._modifiers,
                click_count = options.get('clickCount') or 1,
            ))

    class Touchscreen:
        """Touchscreen class."""

        def __init__(self, tab: 'Tab') -> None:
            """Make new touchscreen object."""
            self.tab = tab

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            pass

        async def tap(self, x: float, y: float) -> None:
            """Tap (``x``, ``y``).

            Dispatches a ``touchstart`` and ``touchend`` event.
            """
            touch_points = [{'x': round(x), 'y': round(y)}]
            await self.tab.send(cdp.input_.dispatch_touch_event(
                type_ = 'touchStart',
                touch_points = touch_points,
                modifiers = self.tab.keyboard._modifiers,
            ))
            await self.tab.send(cdp.input_.dispatch_touch_event(
                type_ = 'touchEnd',
                touch_points = [],
                modifiers = self.tab.keyboard._modifiers,
            ))

usage:

import asyncio
import nodriver as uc

async def main():
    browser = await uc.start()
    tab_instance = await browser.get('https://www.google.com/')
    await asyncio.sleep(1)
    with tab_instance.keyboard as keyboard_instance:
        await (await tab_instance.find('//input[1]')).click()
        await keyboard_instance.type('Hello World!')
        await keyboard_instance.down('Enter')
        await asyncio.sleep(5)

if __name__ == '__main__':

    # since asyncio.run never worked (for me)
    uc.loop().run_until_complete(main())
boludoz commented 1 month ago

nodriver.zip

artcrespo commented 6 days ago

Hi @boludoz is this in pull-request?