Open Mr0Inka opened 5 years ago
I also have this issue. I sometimes connect and disconnect external devices, and I would like to be able to read from them continuously. I have also tried importlib.reload, to no avail.
Also note that I can "hack" around it by doing the following (on Unix systems):
import keyboard
from keyboard._nixkeyboard import build_device
keyboard._nixkeyboard.device = None
build_device()
But this is less than ideal.
[This applies to *nix only]
I'm sorry to add more noise to this thread, but I'd like to make it clear that the above solution ends up not working in the majority of cases (because keyboard spawns a thread that we cannot easily control from the outside, among other things).
The solution we implemented ended up being much more complex than it should need to be. We basically run keyboard in its own script, whose sole purpose is to capture keyboard events and output them to the main program. If that script crashes (which happens often when input devices are being swapped around), we just restart it, and if we detect that input devices have changed, we also force the keyboard listener to restart.
As for detecting that input devices have changed, there are several possible solutions, but in our case simply polling /dev/input/by-id/ for changes was sufficient.
I hope this helps other people with the same problem get around this "limitation".
Hi! Thanks for your insight! Would you mind sharing the part of your script that handles the separate thread for keyboard? It's killing me :D
That code is part of a much bigger proprietary project, and at the moment I can only share subsets of it, but I'll do my best.
What we do is not run "keyboard" on its own thread, because that would not work. We run an entirely new instance of Python, which runs a simple keyboard echo program. Then, in the main program, we monitor it, capturing its output through a pipe and restarting it whenever we detect that the input devices have changed.
Part of this code might not work 100% on your end because I have had to remove some proprietary pieces, but it'll allow you to get the gist of it:
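import logging
import os
import re
import sys
import threading
from select import select
from subprocess import Popen, PIPE, DEVNULL
from typing import Callable

from keyboard import KeyboardEvent

logger = logging.getLogger(__name__)

# NOTE: WaitableEvent (used below) is a select()-compatible event class that
# is not included in this excerpt.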


class KeyboardEchoProgram:
    """
    This is the program that should run whenever we are called to do standalone
    stuff. All it does is listen to keyboard events using the keyboard library
    and print them to standard output, flushing it afterwards.
    It waits for a CTRL+C to be killed.
    """

    def __init__(self) -> None:
        super().__init__()
        import keyboard
        keyboard.hook(self.handle_presses)
        # Block forever; the hook callback runs on keyboard's own thread.
        threading.Event().wait()

    def handle_presses(self, event):
        s = '(%s ; %s ; %s ; %s)' % (event.name, event.device, event.event_type, event.scan_code)
        print(s)
        sys.stdout.flush()


class RehookableKeyboardListener(threading.Thread):
    """
    This class acts as a kind of "safe wrapper" around some of keyboard's
    methods, allowing for automatic rehooks (on failure) and user-requested
    rehooks.
    You pass in the callback and it will be passed down to keyboard.hook
    (which will actually be running in a separate Python process).
    The problem is that keyboard runs a thread in the package's __init__.py, so
    if it dies we cannot do anything, nor can we easily rehook stuff (there is
    no method for that). Therefore, our fix is to always run the hook in an
    independent Python script and communicate with that script, restarting it
    when necessary.
    You can call rehook() to require that we rehook the keyboard. This is useful
    when used together with the InputDeviceEventListener.
    We chose this solution instead of something like an evdev-based
    implementation because the keyboard library already does a lot of work for
    us.
    """

    def __init__(self, callback: Callable[[KeyboardEvent], None]) -> None:
        super().__init__()
        self.callback = callback
        self.should_rehook = WaitableEvent()
        self.should_stop = WaitableEvent()

    def rehook(self) -> None:
        """
        Require that the keyboard is rehooked. There will be a small interval
        of time during which events will not be captured.
        This effectively kills a Python program and spawns another one.
        :return:
        """
        self.should_rehook.set()

    def stop(self) -> None:
        """
        Ask the thread to die off and stop listening.
        :return:
        """
        self.should_stop.set()

    def _run_script(self) -> Popen:
        """
        Run the script:
        - Unbuffered
        - With stdin and stdout piped
        - Ignoring stderr (set to /dev/null)
        - With the "work" parameter so it launches KeyboardEchoProgram
        :return:
        """
        logger.debug('Launching python keyboard listening script.')
        return Popen(['python', '-u', os.path.abspath(__file__), 'work'],
                     stdin=PIPE, stdout=PIPE, stderr=DEVNULL, bufsize=1)

    def run(self) -> bool:
        # Start the background script
        p1 = self._run_script()
        # This will capture the keys sent from the script (which in effect
        # is just KeyboardEchoProgram())
        capturing_regex = re.compile(r'\((.*) ; (.*) ; (.*) ; (.*)\)$')
        # Loop forever reading keys, waiting to be stopped, or rehooked.
        while True:
            # Wait for one of (i) key read; (ii) stopping; (iii) rehooking
            r = select([p1.stdout, self.should_stop, self.should_rehook], [], [])[0]
            # If we have to stop, kill the child and die off.
            if self.should_stop.is_set():
                p1.kill()
                break
            # If we have to rehook, clear the flag for further rehooking, kill
            # the child and start a new one. Then go back to waiting.
            if self.should_rehook.is_set():
                self.should_rehook.clear()
                p1.kill()
                p1 = self._run_script()
                continue
            # If the child has died, restart it.
            if p1.poll() is not None:
                p1 = self._run_script()
                continue
            # In theory, r should always be ready to read if we get here, but
            # check just in case.
            if r:
                # We know that the output is always line-based and
                # line-buffered, so this will never block.
                line = p1.stdout.readline()
                # All of this replace and strip nonsense is here just in case
                # (it comes from old code)
                line = line.decode().replace('\n', '').strip()
                # Catch the regex and "fabricate" a KeyboardEvent so that
                # it seems like this just came out of "keyboard.hook". We could
                # have properly serialized it with pickle or something, but
                # this is good enough for now.
                match = capturing_regex.match(line)
                if match:
                    if self.callback:
                        self.callback(KeyboardEvent(name=match.group(1), device=match.group(2),
                                                    event_type=match.group(3), scan_code=int(match.group(4))))
                else:
                    logger.warning('The keyboard capture program gave us input which does not match the regex! '
                                   '(Filtered) Input: %s', line)
        return True


def main():
    # When we're called with an argument it means we should go into echo mode!
    if len(sys.argv) >= 2:
        KeyboardEchoProgram()


if __name__ == "__main__":
    main()
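The comment in run() mentions that the events could have been properly serialized instead of being parsed with a regex. Here is a minimal sketch of that idea using one JSON object per line; it would also survive key names that happen to contain " ; " or a closing parenthesis. The helper names encode_event and decode_event are hypothetical and not part of the original code or of the keyboard library.

```python
import json
from typing import Optional

# Hypothetical helpers (not from the original code): one JSON object per line.
REQUIRED_FIELDS = {'name', 'device', 'event_type', 'scan_code'}


def encode_event(name, device, event_type, scan_code) -> str:
    """Serialize one key event as a single JSON line (no trailing newline)."""
    return json.dumps({
        'name': name,
        'device': device,
        'event_type': event_type,
        'scan_code': scan_code,
    })


def decode_event(line: str) -> Optional[dict]:
    """Parse one line from the child; return None if it is not a valid event."""
    try:
        data = json.loads(line)
    except ValueError:
        # json.JSONDecodeError is a subclass of ValueError.
        return None
    if not isinstance(data, dict) or not REQUIRED_FIELDS.issubset(data):
        return None
    return data
```

In the echo program, handle_presses would print encode_event(event.name, event.device, event.event_type, event.scan_code); in run(), the regex match would be replaced by decode_event(line), with the resulting fields passed to KeyboardEvent as before.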
We use another class to detect whether we have to rehook (that class triggers the call to rehook), and it basically boils down to a method such as this one:
def _directory_polling_approach(self) -> None:
    """
    A much simpler polling approach which basically runs
    `ls /dev/input/by-id/` in a loop, waiting for changes.
    :return:
    """
    devices = self._list_devices()
    while not self.should_stop.wait(self.polling_interval):
        new_devices = self._list_devices()
        if new_devices != devices:
            self.on_device_change()
            devices = new_devices
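The method above relies on pieces that are not shown (_list_devices, should_stop, polling_interval, on_device_change). A self-contained sketch of the same polling idea might look like the following. The function names and the use of os.listdir are assumptions made for illustration, not the original implementation.

```python
import os
import threading
from typing import Callable, FrozenSet


def list_devices(path: str = '/dev/input/by-id/') -> FrozenSet[str]:
    """Return the entry names under `path`, or an empty set if it cannot be read."""
    try:
        return frozenset(os.listdir(path))
    except OSError:
        # Assumption: treat a missing or unreadable directory as "no devices".
        return frozenset()


def poll_for_changes(on_device_change: Callable[[], None],
                     should_stop: threading.Event,
                     polling_interval: float = 1.0,
                     path: str = '/dev/input/by-id/') -> None:
    """Call `on_device_change` whenever the directory listing changes."""
    devices = list_devices(path)
    # Event.wait() returns False on timeout, so the loop body runs once per
    # interval until should_stop is set.
    while not should_stop.wait(polling_interval):
        new_devices = list_devices(path)
        if new_devices != devices:
            on_device_change()
            devices = new_devices
```

It could be run on its own thread, for example with threading.Thread(target=poll_for_changes, args=(listener.rehook, stop_event)), where listener is a RehookableKeyboardListener and stop_event is a threading.Event.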
Originally, we tried hooking into udev and monitoring through it, but it became an incredibly buggy, garbled mess. This polling trick is good enough for our use case.
Also, please note that the WaitableEvent class basically acts as an Event which we can pass down to select (it is similar to https://code.activestate.com/recipes/498191-waitable-cross-process-threadingevent-class/).
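Since WaitableEvent is not included in the code above, here is a minimal sketch of what such a class could look like: an event flag backed by a pipe, so that select() can wait on it alongside the child's stdout. This is an assumption about its shape based on the methods used above (set, clear, is_set), not the original class, and it only works on Unix-like systems, where select() accepts pipe descriptors.

```python
import os
import select


class WaitableEvent:
    """An Event-like flag backed by a pipe, so select() can wait on it."""

    def __init__(self) -> None:
        self._read_fd, self._write_fd = os.pipe()

    def fileno(self) -> int:
        # select() accepts any object that has a fileno() method.
        return self._read_fd

    def is_set(self) -> bool:
        # The flag is set while the pipe has data waiting; poll with timeout 0.
        readable, _, _ = select.select([self._read_fd], [], [], 0)
        return bool(readable)

    def set(self) -> None:
        if not self.is_set():
            os.write(self._write_fd, b'1')

    def clear(self) -> None:
        if self.is_set():
            # Drain whatever is pending so the descriptor stops being readable.
            os.read(self._read_fd, 4096)

    def wait(self, timeout=None) -> bool:
        # Blocks until set (or until the timeout expires); returns the flag state.
        readable, _, _ = select.select([self._read_fd], [], [], timeout)
        return bool(readable)

    def close(self) -> None:
        os.close(self._read_fd)
        os.close(self._write_fd)
```

Because the object exposes fileno(), it can be placed directly in the list passed to select(), as run() does with should_stop and should_rehook.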
I hope this helps. It's not perfect code, far from it.
Hi, I have run into the same problem and found your conversation on this issue.
Thanks for your comments; they gave me the ideas for the solution I now use.
Sorry for my low level of English.
In my case, the solution has been to reinitialize the functions that the keyboard library uses on Linux. I also reset the hook.
These are the lines I use for it:
keyboard.unhook_all()
keyboard._nixkeyboard.device = None
keyboard._nixkeyboard.build_device()
keyboard._nixkeyboard.build_tables()
keyboard._listener = keyboard._KeyboardListener()
keyboard.hook(self.callback)
I have a keystroke-counting application where I use it like this, and it seems to work correctly on several computers.
Of the previous lines, I think this one alone would be enough:
keyboard._listener = keyboard._KeyboardListener()
So far it has worked for me with 7 different keyboards (connecting and reconnecting them), over USB and on laptops.
The comments are in Spanish, but I will leave the link anyway in case it is useful to anyone else who reaches this issue.
https://github.com/fryntiz/python-keycounter/blob/master/Models/Keylogger.py
Python is not my specialty, so the code may contain strange things; it is also intended only for Linux.
Any update on this?
I created the following workaround:
import keyboard
import requests
from threading import Thread
import subprocess
from time import sleep
import sys

SERVER_URL = 'http://sads'
ID = 'afasfe'


def buttonPressed(keyboardEvent: keyboard.KeyboardEvent):
    button = keyboardEvent.name
    if (button == '1'):
        print('do1')
        sendCommand('do1')
    elif (button == '2'):
        print('do2')
        sendCommand('do2')
    elif (button == '3'):
        print('do3')
        sendCommand('do3')


def sendCommand(command: str):
    url = f"{SERVER_URL}/{command}/{ID}"
    try:
        result = requests.post(url).json()
        if ('error' in result):
            print(f"ERROR: {result['error']}")
    except Exception as e:
        print(f"ERROR: {e}")


def getDevices():
    return subprocess.getoutput('ls /dev/input/by-id/')


def startNewDeviceListener():
    # Poll the device list once per second and exit when it changes.
    while True:
        newDevices = getDevices()
        if (devices != newDevices):
            print('Device changes detected, exiting')
            sys.exit(1)
        sleep(1)


keyboard.on_press(buttonPressed, True)
devices = getDevices()
t = Thread(target=startNewDeviceListener)
t.start()
I'm running this Python program under Docker, so that the Docker daemon restarts the script after it exits.
Hey,
Detecting my keyboard and key presses works absolutely fine. When I disconnect my keyboard (both USB and Bluetooth), I get a "no such device" error. That makes perfect sense, since I just unplugged it.
Is there any way to revive the connection after disconnecting and reconnecting a keyboard? I tried calling keyboard.unhook_all and then keyboard.on_press after reconnecting the keyboard, without success.
Is there any way to reload the library/connection without closing the whole Python script?
Thanks! :)