I'm using the great "evdev" library to listen to a USB barcode reader input and I need to detect if the device suddenly gets unplugged/unresponsive because otherwise the python script reading the loop goes to 100% cpu usage on a single thread and slowly starts to eat all available memory which leads to the entire system crashing after a bit.
The idea is to detect when the device is unplugged and kill the current script leading to supervisor trying to restart it until the device is plugged back in/becomes responsive.
The code I'm using to read the input is as follows:
devices = map(InputDevice, list_devices())
keys = {
2: 1,
3: 2,
4: 3,
5: 4,
6: 5,
7: 6,
8: 7,
9: 8,
10: 9,
11: 0,
}
dev = None
for d in devices:
if d.name == 'Symbol Technologies, Inc, 2008 Symbol Bar Code Scanner':
print('%-20s %-32s %s' % (d.fn, d.name, d.phys))
dev = InputDevice(d.fn)
break
if dev is not None:
code = []
for event in dev.read_loop():
if event.type == ecodes.EV_KEY:
if event.value == 00:
if event.code != 96:
try:
code.append(keys[event.code])
except:
code.append('-')
else:
card = "".join(map(str, code))
print card
code = []
card = ""
So how would I go about doing this the proper way?
A way I though that might work would be a second script that's run from cron every 1-5 min that checks if said device is still available, if it's now, grab process id from some file and kill the process that way but the problem with this method is that if the device is unplugged and then plugged back between the checks the "checker" script thinks everything is okay while the main script is slowly crashing - it doesn't re-activate after an "unplugging"
python-evdev author here. It is such a great feeling to know that one's work is useful to someone else. Thank you for that!
You should definitely look into linux's device manager - udev. The linux kernel emits events whenever a device is added or removed. To listen for these events in a Python program, you can use pyudev, which is an excellent, ctypes based binding to libudev (see the section on monitoring).
Here's an example of using evdev
along with pyudev
:
import functools
import pyudev
from evdev import InputDevice
from select import select
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by(subsystem='input')
monitor.start()
fds = {monitor.fileno(): monitor}
finalizers = []
while True:
r, w, x = select(fds, [], [])
if monitor.fileno() in r:
r.remove(monitor.fileno())
for udev in iter(functools.partial(monitor.poll, 0), None):
# we're only interested in devices that have a device node
# (e.g. /dev/input/eventX)
if not udev.device_node:
break
# find the device we're interested in and add it to fds
for name in (i['NAME'] for i in udev.ancestors if 'NAME' in i):
# I used a virtual input device for this test - you
# should adapt this to your needs
if u'py-evdev-uinput' in name:
if udev.action == u'add':
print('Device added: %s' % udev)
fds[dev.fd] = InputDevice(udev.device_node)
break
if udev.action == u'remove':
print('Device removed: %s' % udev)
def helper():
global fds
fds = {monitor.fileno(): monitor}
finalizers.append(helper)
break
for fd in r:
dev = fds[fd]
for event in dev.read():
print(event)
for i in range(len(finalizers)):
finalizers.pop()()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With