Compare commits

...

20 Commits
0.7.0 ... 0.7.2

Author SHA1 Message Date
Daniel Pavel
e0fd21b429 version 0.7.2 2012-11-08 12:01:24 +02:00
Daniel Pavel
ec6a2d892a improved hidconsole in tty and batch mode 2012-11-08 12:01:00 +02:00
Daniel Pavel
a59ad221a1 improved support for HID 1.0 devices 2012-11-08 11:41:09 +02:00
Daniel Pavel
c9843e4408 properly implemented pinging a device 2012-11-08 11:39:56 +02:00
Daniel Pavel
ab6f17cc6b better support for devices that don't support the features call 2012-11-08 09:38:35 +02:00
Daniel Pavel
6602d7ee13 fixed query of name/serial for devices 2012-11-08 00:29:07 +02:00
Daniel Pavel
b2db0706c6 handle better devices which do not support feature calls 2012-11-08 00:28:40 +02:00
Daniel Pavel
8db1ce97a8 improved read of device serial 2012-11-07 22:34:43 +02:00
Daniel Pavel
8ae8d4502d fixed some log messages 2012-11-07 22:32:55 +02:00
Daniel Pavel
e5332500ea testing improved detection of device status 2012-11-07 22:13:16 +02:00
Daniel Pavel
5f6fd5a333 added codename for Performance MX mouse 2012-11-07 21:51:07 +02:00
Daniel Pavel
89205f0bbc better matching of task replies 2012-11-07 21:29:06 +02:00
Daniel Pavel
f172ceebd1 when input is not a tty, wait a second before exiting 2012-11-07 21:28:42 +02:00
Daniel Pavel
d5dec5f7ec logging fix 2012-11-07 21:14:50 +02:00
Daniel Pavel
f8d0beb471 don't forget the 6th device on scan 2012-11-07 20:41:14 +02:00
Daniel Pavel
7e44ec5ebf added requirements check in main application 2012-11-07 14:59:43 +02:00
Daniel Pavel
6e939e9485 small documentation updates 2012-11-06 18:44:24 +02:00
Daniel Pavel
40eacb0741 LD_LIBRARY_PATH no longer needed 2012-11-06 17:38:49 +02:00
Daniel Pavel
4cb9b30466 use the first python version found when starting the scripts 2012-11-05 20:52:51 +02:00
Daniel Pavel
5c38c33374 notify in case of permissions problem on the hidraw device 2012-11-04 15:30:39 +02:00
17 changed files with 285 additions and 187 deletions

15
README
View File

@@ -10,18 +10,19 @@ Currently the K750 solar keyboard is also queried for its solar charge status.
Support for other devices could be added in the future, but the K750 keyboard is
the only device I have and can test on.
Pairing and un-pairing of devices is planned, but not implemented at this time.
Requirements
------------
- Python (2.7 or 3.2).
- Python (2.7 or 3.2). Either version should work well.
- Gtk 3; Gtk 2 should partially work with some problems.
- Python GI (GObject Introspection), for Gtk bindings.
- pyudev for enumerating udev devices.
- Optional libnotify GI bindings, for desktop notifications.
The necessary packages for Debian/Ubuntu are `python-pyudev`/`python3-pyudev`,
`python-gi`/`python3-gi`, `gir1.2-gtk-3.0`, and optionally `gir1.2-notify-0.7`.
Installation
------------
@@ -33,13 +34,17 @@ In rules.d/ you'll find a udev rule file, to be copied in /etc/udev/rules.d/ (as
root).
In its current form it makes the UR device available for r/w by all users
belonging to the 'plugdev' system group (standard Debian group for pluggable
devices). It may need changes, specific to your particular system's
belonging to the 'plugdev' system group (standard Debian/Ubuntu group for
pluggable devices). It may need changes, specific to your particular system's
configuration.
If in doubt, replacing GROUP="plugdev" with GROUP="<your username>" should just
work.
After you copy the file to /etc/udev/rules.d (and possibly modify it for your
system), run 'udevadm control --reload-rules' as root for it to apply. Then
physically remove the Unifying Receiver, wait 30 seconds and re-insert it.
Thanks
------

View File

@@ -11,7 +11,7 @@ from logitech.unifying_receiver import api as _api
from logitech.unifying_receiver.listener import EventsListener as _EventsListener
from logitech.unifying_receiver.common import FallbackDict as _FallbackDict
from logitech import devices as _devices
from logitech.devices.constants import (STATUS, STATUS_NAME, PROPS, NAMES)
from logitech.devices.constants import (STATUS, STATUS_NAME, PROPS)
#
#
@@ -37,7 +37,7 @@ class _FeaturesArray(object):
except _api._FeatureNotSupported:
self.supported = False
else:
count = _base.request(handle, self.device.number, _pack('!BB', index, 0x00))
count = None if index is None else _base.request(handle, self.device.number, _pack('!BB', index, 0x00))
if count is None:
self.supported = False
else:
@@ -105,20 +105,16 @@ class _FeaturesArray(object):
class DeviceInfo(_api.PairedDevice):
"""A device attached to the receiver.
"""
def __init__(self, listener, number, pair_code, status=STATUS.UNKNOWN):
def __init__(self, listener, number, status=STATUS.UNKNOWN):
super(DeviceInfo, self).__init__(listener.handle, number)
self._features = _FeaturesArray(self)
self.LOG = _Logger("Device[%d]" % number)
self._listener = listener
self._pair_code = pair_code
self._serial = None
self._codename = None
self._status = status
self.props = {}
self.features = _FeaturesArray(self)
# read them now, otherwise it it temporarily hang the UI
# if status >= STATUS.CONNECTED:
# n, k, s, f = self.name, self.kind, self.serial, self.firmware
@@ -156,54 +152,6 @@ class DeviceInfo(_api.PairedDevice):
t.append('Light: %d lux' % self.props[PROPS.LIGHT_LEVEL])
return ', '.join(t) if t else STATUS_NAME[STATUS.CONNECTED]
@property
def name(self):
if self._name is None:
if self._status < STATUS.CONNECTED:
codename = self.codename
if codename in NAMES:
self._name, self._kind = NAMES[codename]
else:
self._name = _api.get_device_name(self.handle, self.number, self.features)
return self._name or self.codename
@property
def kind(self):
if self._kind is None:
if self._status < STATUS.CONNECTED:
codename = self.codename
if codename in NAMES:
self._name, self._kind = NAMES[codename]
else:
self._kind = _api.get_device_kind(self.handle, self.number, self.features)
return self._kind or '?'
@property
def serial(self):
if self._serial is None:
# dodgy
b = bytearray(self._pair_code)
b[0] -= 0x10
serial = _base.request(self.handle, 0xFF, b'\x83\xB5', bytes(b))
if serial:
self._serial = _base._hex(serial[1:5])
return self._serial or '?'
@property
def codename(self):
if self._codename is None:
codename = _base.request(self.handle, 0xFF, b'\x83\xB5', self._pair_code)
if codename:
self._codename = codename[2:].rstrip(b'\x00').decode('ascii')
return self._codename or '?'
@property
def firmware(self):
if self._firmware is None:
if self._status >= STATUS.CONNECTED:
self._firmware = _api.get_device_firmware(self.handle, self.number, self.features)
return self._firmware or ()
def process_event(self, code, data):
if code == 0x10 and data[:1] == b'\x8F':
self.status = STATUS.UNAVAILABLE
@@ -231,7 +179,7 @@ class DeviceInfo(_api.PairedDevice):
return False
def __str__(self):
return 'DeviceInfo(%d,%s,%d)' % (self.number, self._name or '?', self._status)
return '<DeviceInfo(%d,%s,%d)>' % (self.number, self._name or '?', self._status)
#
#
@@ -292,12 +240,12 @@ class ReceiverListener(_EventsListener):
self.status_changed_callback(self.receiver, device, urgent)
def _device_status_from(self, event):
state_code = ord(event.data[2:3]) & 0xF0
state = STATUS.UNAVAILABLE if state_code == 0x60 else \
STATUS.CONNECTED if state_code == 0xA0 else \
STATUS.CONNECTED if state_code == 0x20 else \
STATUS.UNKNOWN
if state == STATUS.UNKNOWN:
state_code = ord(event.data[2:3]) & 0xC0
state = STATUS.UNAVAILABLE if state_code == 0x40 else \
STATUS.CONNECTED if state_code == 0x80 else \
STATUS.CONNECTED if state_code == 0x00 else \
None
if state is None:
self.LOG.warn("don't know how to handle state code 0x%02X: %s", state_code, event)
return state
@@ -309,7 +257,7 @@ class ReceiverListener(_EventsListener):
if event.devnumber in self.receiver.devices:
status = self._device_status_from(event)
if status > STATUS.UNKNOWN:
if status is not None:
self.receiver.devices[event.devnumber].status = status
else:
dev = self.make_device(event)
@@ -343,11 +291,13 @@ class ReceiverListener(_EventsListener):
return None
status = self._device_status_from(event)
if status is not None:
dev = DeviceInfo(self, event.devnumber, status)
self.LOG.info("new device %s", dev)
self.status_changed(dev, True)
return dev
dev = DeviceInfo(self, event.devnumber, event.data[4:5], status)
self.LOG.info("new device %s", dev)
self.status_changed(dev, True)
return dev
self.LOG.error("failed to identify status of device %d from %s", event.devnumber, event)
def unpair_device(self, device):
try:

View File

@@ -1,8 +1,9 @@
#!/usr/bin/env python
APPNAME = 'Solaar'
NAME = 'Solaar'
VERSION = '0.7.2'
__author__ = "Daniel Pavel <daniel.pavel@gmail.com>"
__version__ = '0.7'
__version__ = VERSION
__license__ = "GPL"
#
@@ -11,7 +12,7 @@ __license__ = "GPL"
def _parse_arguments():
import argparse
arg_parser = argparse.ArgumentParser(prog=APPNAME.lower())
arg_parser = argparse.ArgumentParser(prog=NAME.lower())
arg_parser.add_argument('-v', '--verbose',
action='count', default=0,
help='increase the logger verbosity (may be repeated)')
@@ -36,25 +37,41 @@ def _parse_arguments():
return args
def _check_requirements():
try:
import pyudev
except ImportError:
return 'python-pyudev'
try:
import gi.repository
except ImportError:
return 'python-gi'
try:
from gi.repository import Gtk
except ImportError:
return 'gir1.2-gtk-3.0'
if __name__ == '__main__':
args = _parse_arguments()
req_fail = _check_requirements()
if req_fail:
raise ImportError('missing required package: %s' % req_fail)
import ui
# check if the notifications are available and enabled
args.notifications &= args.systray
if ui.notify.available and ui.notify.init(APPNAME):
if ui.notify.available and ui.notify.init(NAME):
ui.action.toggle_notifications.set_active(args.notifications)
else:
ui.action.toggle_notifications = None
from receiver import (ReceiverListener, DUMMY)
window = ui.main_window.create(APPNAME,
DUMMY.name,
DUMMY.max_devices,
args.systray)
from receiver import DUMMY
window = ui.main_window.create(NAME, DUMMY.name, DUMMY.max_devices, args.systray)
if args.systray:
menu_actions = (ui.action.toggle_notifications,
ui.action.about)
@@ -72,18 +89,23 @@ if __name__ == '__main__':
def status_changed(receiver, device=None, urgent=False):
ui.update(receiver, icon, window, device)
if ui.notify.available and urgent:
ui.notify.show(device or receiver)
GObject.idle_add(ui.notify.show, device or receiver)
global listener
if not listener:
GObject.timeout_add(5000, check_for_listener)
listener = None
from receiver import ReceiverListener
def check_for_listener(retry=True):
global listener, notify_missing
if listener is None:
listener = ReceiverListener.open(status_changed)
try:
listener = ReceiverListener.open(status_changed)
except OSError:
ui.show_permissions_warning(window)
if listener is None:
pairing.state = None
if notify_missing:

View File

@@ -1,18 +1,17 @@
# pass
APPNAME = 'Solaar'
APPVERSION = '0.7'
from . import (notify, status_icon, main_window, pair_window, action)
from gi.repository import (GObject, Gtk)
GObject.threads_init()
from solaar import NAME as _NAME
_APP_ICONS = (_NAME + '-fail', _NAME + '-init', _NAME)
def appicon(receiver_status):
return (APPNAME + '-fail' if receiver_status < 0 else
APPNAME + '-init' if receiver_status < 1 else
APPNAME)
return (_APP_ICONS[0] if receiver_status < 0 else
_APP_ICONS[1] if receiver_status < 1 else
_APP_ICONS[2])
_ICON_THEME = Gtk.IconTheme.get_default()
@@ -26,6 +25,16 @@ def icon_file(name):
return None
def show_permissions_warning(window):
text = ('Found a possible Unifying Receiver device,\n'
'but did not have permission to open it.')
m = Gtk.MessageDialog(window, Gtk.DialogFlags.MODAL, Gtk.MessageType.ERROR, Gtk.ButtonsType.CLOSE, text)
m.set_title('Permissions error')
m.run()
m.destroy()
def find_children(container, *child_names):
assert container is not None

View File

@@ -2,9 +2,13 @@
#
#
# from sys import version as PYTTHON_VERSION
from gi.repository import Gtk
import ui
import ui.notify
import ui.pair_window
from solaar import NAME as _NAME
from solaar import VERSION as _VERSION
def _action(name, label, function, *args):
@@ -27,7 +31,7 @@ def _toggle_action(name, label, function, *args):
def _toggle_notifications(action):
if action.get_active():
ui.notify.init(ui.APPNAME)
ui.notify.init(_NAME)
else:
ui.notify.uninit()
action.set_sensitive(ui.notify.available)
@@ -36,16 +40,18 @@ toggle_notifications = _toggle_action('notifications', 'Notifications', _toggle_
def _show_about_window(action):
about = Gtk.AboutDialog()
about.set_icon_name(ui.APPNAME)
about.set_program_name(ui.APPNAME)
about.set_logo_icon_name(ui.APPNAME)
about.set_version(ui.APPVERSION)
about.set_icon_name(_NAME)
about.set_program_name(_NAME)
about.set_logo_icon_name(_NAME)
about.set_version(_VERSION)
about.set_license_type(Gtk.License.GPL_2_0)
about.set_authors(('Daniel Pavel http://github.com/pwr', ))
about.set_website('http://github.com/pwr/Solaar/wiki')
about.set_website_label('Solaar Wiki')
# about.set_comments('Using Python %s\n' % PYTTHON_VERSION.split(' ')[0])
about.run()
about.destroy()
about = _action('help-about', 'About ' + ui.APPNAME, _show_about_window)
about = _action('help-about', 'About ' + _NAME, _show_about_window)
quit = _action('exit', 'Quit', Gtk.main_quit)

View File

@@ -22,8 +22,9 @@ def _info_text(dev):
(f.kind, f.name, ' ' if f.name else '', f.version) for f in dev.firmware])
return ('<small>'
'Serial \t\t<tt>%s</tt>\n'
'HID protocol\t<tt>%1.1f</tt>\n'
'%s'
'</small>' % (dev.serial, fw_text))
'</small>' % (dev.serial, dev.protocol, fw_text))
def _toggle_info(action, label_widget, box_widget, frame):
if action.get_active():

View File

@@ -1,7 +1,9 @@
#!/bin/sh
LIB=`dirname "$0"`/../lib
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB/native/`arch`
Z=`readlink -f "$0"`
LIB=`readlink -f $(dirname "$Z")/../lib`
#export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB/native/`arch`
export PYTHONPATH=$LIB
exec python -OOu -m hidapi.hidconsole "$@"
PYTHON=`which python python2 python3 | head -n 1`
exec $PYTHON -OOu -m hidapi.hidconsole "$@"

View File

@@ -1,7 +1,9 @@
#!/bin/sh
LIB=`dirname "$0"`/../lib
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB/native/`arch`
Z=`readlink -f "$0"`
LIB=`readlink -f $(dirname "$Z")/../lib`
#export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB/native/`arch`
export PYTHONPATH=$LIB
exec python -OOu -m logitech.scanner "$@"
PYTHON=`which python python2 python3 | head -n 1`
exec $PYTHON -OOu -m logitech.scanner "$@"

View File

@@ -5,9 +5,9 @@ APP=`readlink -f $(dirname "$Z")/../app`
LIB=`readlink -f $(dirname "$Z")/../lib`
SHARE=`readlink -f $(dirname "$Z")/../share`
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB/native/`arch`
#export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB/native/`arch`
export PYTHONPATH=$APP:$LIB
export XDG_DATA_DIRS=$SHARE:$XDG_DATA_DIRS
exec python -OOu -m solaar "$@"
#exec python -OOu -m profile -o $TMPDIR/profile.log app/solaar.py "$@"
PYTHON=`which python python2 python3 | head -n 1`
exec $PYTHON -OOu -m solaar "$@"

View File

@@ -1,11 +1,14 @@
#!/usr/bin/env python
import os
import sys
from select import select as _select
import time
from binascii import hexlify, unhexlify
_hex = lambda d: hexlify(d).decode('ascii').upper()
interactive = os.isatty(0)
start_time = 0
try:
read_packet = raw_input
@@ -14,19 +17,18 @@ except:
def _print(marker, data, scroll=False):
hexs = _hex(data)
t = time.time() - start_time
s = '%s (% 8.3f) [%s %s %s %s] %s' % (marker, t, hexs[0:2], hexs[2:4], hexs[4:8], hexs[8:], repr(data))
if scroll:
if interactive and scroll:
sys.stdout.write('\033[s')
sys.stdout.write('\033[S') # scroll up
sys.stdout.write('\033[A\033[L\033[G') # insert new line above the current one, position on first column
hexs = _hex(data)
s = '%s (% 8.3f) [%s %s %s %s] %s' % (marker, t, hexs[0:2], hexs[2:4], hexs[4:8], hexs[8:], repr(data))
sys.stdout.write(s)
if scroll:
if interactive and scroll:
sys.stdout.write('\033[u')
else:
sys.stdout.write('\n')
@@ -57,17 +59,18 @@ if __name__ == '__main__':
repr(hidapi.get_manufacturer(handle)),
repr(hidapi.get_product(handle)),
repr(hidapi.get_serial(handle))))
print (".. Press ^C/^D to exit, or type hex bytes to write to the device.")
if interactive:
print (".. Press ^C/^D to exit, or type hex bytes to write to the device.")
import readline
if args.history is None:
import os.path
args.history = os.path.join(os.path.expanduser("~"), ".hidconsole-history")
try:
readline.read_history_file(args.history)
except:
# file may not exist yet
pass
import readline
if args.history is None:
import os.path
args.history = os.path.join(os.path.expanduser("~"), ".hidconsole-history")
try:
readline.read_history_file(args.history)
except:
# file may not exist yet
pass
start_time = time.time()
@@ -77,8 +80,10 @@ if __name__ == '__main__':
t.daemon = True
t.start()
prompt = '?? Input: ' if interactive else ''
while t.is_alive():
line = read_packet('?? Input: ').strip().replace(' ', '')
line = read_packet(prompt).strip().replace(' ', '')
if line:
try:
data = unhexlify(line.encode('ascii'))
@@ -87,11 +92,18 @@ if __name__ == '__main__':
else:
_print('<<', data)
hidapi.write(handle, data)
except Exception as e:
# wait for some kind of reply
if not interactive:
rlist, wlist, xlist = _select([handle], [], [], 1)
time.sleep(0.1)
except EOFError:
pass
except Exception as e:
print ('%s: %s' % (type(e).__name__, e))
print (".. Closing handle %X" % handle)
hidapi.close(handle)
readline.write_history_file(args.history)
if interactive:
readline.write_history_file(args.history)
else:
print ("!! Failed to open %s, aborting" % args.device)

View File

@@ -1,7 +1,11 @@
#
# Partial Python implementation of the native hidapi.
# Requires pyudev
#
"""Generic Human Interface Device API.
It is currently a partial pure-Python implementation of the native HID API
implemented by signal11 (https://github.com/signal11/hidapi), and requires
``pyudev``.
The docstrings are mostly copied from the hidapi API header, with changes where
necessary.
"""
import os as _os
from select import select as _select
@@ -120,10 +124,7 @@ def open_path(device_path):
:returns: an opaque device handle, or ``None``.
"""
try:
return _os.open(device_path, _os.O_RDWR | _os.O_SYNC)
except:
pass
return _os.open(device_path, _os.O_RDWR | _os.O_SYNC)
def close(device_handle):

View File

@@ -43,4 +43,5 @@ NAMES = {
'K750': ('Wireless Solar Keyboard K750', 'keyboard'),
'K800': ('Wireless Illuminated Keyboard K800', 'keyboard'),
'T650': ('Wireless Rechargeable Touchpad T650', 'touchpad'),
'Performance MX': ('Performance Mouse MX', 'mouse'),
}

View File

@@ -8,18 +8,26 @@ def print_receiver(receiver):
for f in receiver.firmware:
print (" %-10s: %s" % (f.kind, f.version))
print ("--------")
def scan_devices(receiver):
for dev in receiver:
print ("--------")
print (str(dev))
print ("Name: %s" % dev.name)
print ("Kind: %s" % dev.kind)
print ("Name : %s" % dev.name)
print ("Kind : %s" % dev.kind)
print ("Serial number: %s" % dev.serial)
if not dev.protocol:
print ("HID protocol : UNKNOWN")
continue
print ("HID protocol : HID %01.1f" % dev.protocol)
if dev.protocol < 2.0:
print ("Features query not supported by this device")
continue
firmware = dev.firmware
for fw in firmware:
print (" %-10s: %s %s" % (fw.kind, fw.name, fw.version))
print (" %-10s: %s %s" % (fw.kind, fw.name, fw.version))
all_features = api.get_device_features(dev.handle, dev.number)
for index in range(0, len(all_features)):

View File

@@ -4,6 +4,7 @@
from struct import pack as _pack
from struct import unpack as _unpack
import errno as _errno
from . import base as _base
@@ -31,32 +32,74 @@ class PairedDevice(object):
self.handle = handle
self.number = number
self._protocol = None
self._features = None
self._codename = None
self._name = None
self._kind = None
self._serial = None
self._firmware = None
self.features = [FEATURE.ROOT]
@property
def protocol(self):
if self._protocol is None:
self._protocol = _base.ping(self.handle, self.number)
return 0 if self._protocol is None else self._protocol
@property
def features(self):
if self._features is None:
if self.protocol >= 2.0:
self._features = [FEATURE.ROOT]
return self._features
@property
def codename(self):
if self._codename is None:
codename = _base.request(self.handle, 0xFF, b'\x83\xB5', 0x40 + self.number - 1)
if codename:
self._codename = codename[2:].rstrip(b'\x00').decode('ascii')
return self._codename or '?'
@property
def name(self):
if self._name is None:
self._name = get_device_name(self.handle, self.number, self.features)
return self._name or '?'
if self.protocol < 2.0:
from ..devices.constants import NAMES as _DEVICE_NAMES
if self.codename in _DEVICE_NAMES:
self._name, self._kind = _DEVICE_NAMES[self._codename]
else:
self._name = get_device_name(self.handle, self.number, self.features)
return self._name or self.codename
@property
def kind(self):
if self._kind is None:
self._kind = get_device_kind(self.handle, self.number, self.features)
if self.protocol < 2.0:
from ..devices.constants import NAMES as _DEVICE_NAMES
if self.codename in _DEVICE_NAMES:
self._name, self._kind = _DEVICE_NAMES[self._codename]
else:
self._kind = get_device_kind(self.handle, self.number, self.features)
return self._kind or '?'
@property
def firmware(self):
if self._firmware is None:
if self._firmware is None and self.protocol >= 2.0:
self._firmware = get_device_firmware(self.handle, self.number, self.features)
return self._firmware or ()
@property
def serial(self):
if self._serial is None:
prefix = _base.request(self.handle, 0xFF, b'\x83\xB5', 0x20 + self.number - 1)
serial = _base.request(self.handle, 0xFF, b'\x83\xB5', 0x30 + self.number - 1)
if prefix and serial:
self._serial = _base._hex(prefix[3:5]) + '-' + _base._hex(serial[1:5])
return self._serial or '?'
def ping(self):
reply = _base.request(self.handle, self.number, b'\x00\x10', b'\x00\x00\xAA')
return reply is not None and reply[2:3] == b'\xAA'
return _base.ping(self.handle, self.number) is not None
def __str__(self):
return '<PairedDevice(%X,%d,%s)>' % (self.handle, self.number, self._name or '?')
@@ -113,7 +156,7 @@ class Receiver(object):
if self.handle == 0:
return
for number in range(1, MAX_ATTACHED_DEVICES):
for number in range(1, 1 + MAX_ATTACHED_DEVICES):
dev = get_device(self.handle, number)
if dev is not None:
yield dev
@@ -147,8 +190,8 @@ class Receiver(object):
if self.handle == 0:
return False
if type(dev) == int:
return (dev < 1 or dev > MAX_ATTACHED_DEVICES) and ping(self.handle, dev)
return ping(self.handle, dev.number)
return dev > 0 and dev <= MAX_ATTACHED_DEVICES and _base.ping(self.handle, dev) is not None
return dev.ping()
def __str__(self):
return '<Receiver(%X,%s)>' % (self.handle, self.path)
@@ -164,10 +207,22 @@ class Receiver(object):
:returns: An open file handle for the found receiver, or ``None``.
"""
exception = None
for rawdevice in _base.list_receiver_devices():
handle = _base.try_open(rawdevice.path)
if handle:
return Receiver(handle, rawdevice.path)
exception = None
try:
handle = _base.try_open(rawdevice.path)
if handle:
return Receiver(handle, rawdevice.path)
except OSError as e:
_log.exception("open %s", rawdevice.path)
if e.errno == _errno.EACCES:
exception = e
if exception:
# only keep the last exception
raise exception
#
#
@@ -215,20 +270,12 @@ def request(handle, devnumber, feature, function=b'\x00', params=b'', features=N
return _base.request(handle, devnumber, feature_index + function, params)
def ping(handle, devnumber):
"""
:returns: True if the device is connected to the UR.
"""
reply = _base.request(handle, devnumber, b'\x00\x10', b'\x00\x00\xAA')
return reply is not None and reply[2:3] == b'\xAA'
def get_device(handle, devnumber, features=None):
"""Gets the complete info for a device (type, features).
:returns: a PairedDevice or ``None``.
"""
if ping(handle, devnumber):
if _base.ping(handle, devnumber):
devinfo = PairedDevice(handle, devnumber)
# _log.debug("found device %s", devinfo)
return devinfo
@@ -278,6 +325,7 @@ def _get_feature_index(handle, devnumber, feature, features=None):
if len(features) <= index:
features += [None] * (index + 1 - len(features))
features[index] = feature
# _log.debug("%s: found feature %s at %d", features, _base._hex(feature), index)
return index

View File

@@ -4,6 +4,7 @@
#
from struct import pack as _pack
from struct import unpack as _unpack
from binascii import hexlify as _hexlify
_hex = lambda d: _hexlify(d).decode('ascii').upper()
@@ -268,12 +269,12 @@ def request(handle, devnumber, feature_index_function, params=b'', features=None
if reply_code == 0x10 and reply_data[:1] == b'\x8F' and reply_data[1:3] == feature_index_function:
# device not present
_log.debug("device %d request ping failed on {%s} call: [%s]", devnumber, _hex(feature_index_function), _hex(reply_data))
_log.debug("device %d request failed on {%s} call: [%s]", devnumber, _hex(feature_index_function), _hex(reply_data))
return None
if reply_code == 0x10 and reply_data[:1] == b'\x8F':
# device not present
_log.debug("request ping failed: [%s]", devnumber, _hex(reply_data))
_log.debug("device %d request failed: [%s]", devnumber, _hex(reply_data))
return None
if reply_code == 0x11 and reply_data[0] == b'\xFF' and reply_data[1:3] == feature_index_function:
@@ -298,3 +299,50 @@ def request(handle, devnumber, feature_index_function, params=b'', features=None
# _log.debug("device %d unmatched reply {%s} (expected {%s})", devnumber, _hex(reply_data[:2]), _hex(feature_index_function))
if _unhandled:
_unhandled(reply_code, reply_devnumber, reply_data)
def ping(handle, devnumber):
"""Check if a device is connected to the UR.
:returns: The HID protocol supported by the device, as a floating point number, if the device is active.
"""
if request_context is None or handle != request_context.handle:
context = _DEFAULT_REQUEST_CONTEXT
_unhandled = unhandled_hook
else:
context = request_context
_unhandled = getattr(context, 'unhandled_hook')
context.write(handle, devnumber, b'\x00\x10\x00\x00\xAA')
read_times = _MAX_READ_TIMES
while read_times > 0:
divisor = (1 + _MAX_READ_TIMES - read_times)
reply = context.read(handle, int(DEFAULT_TIMEOUT * (divisor + 1) / 2 / divisor))
read_times -= 1
if not reply:
# keep waiting...
continue
reply_code, reply_devnumber, reply_data = reply
if reply_devnumber != devnumber:
# this message not for the device we're interested in
# _l.log(_LOG_LEVEL, "device %d request got reply for unexpected device %d: [%s]", devnumber, reply_devnumber, _hex(reply_data))
# worst case scenario, this is a reply for a concurrent request
# on this receiver
if _unhandled:
_unhandled(reply_code, reply_devnumber, reply_data)
continue
if reply_code == 0x11 and reply_data[:2] == b'\x00\x10' and reply_data[4:5] == b'\xAA':
major, minor = _unpack('!BB', reply_data[2:4])
return major + minor / 10.0
if reply_code == 0x10 and reply_data == b'\x8F\x00\x10\x01\x00':
return 1.0
if reply_code == 0x10 and reply_data[:3] == b'\x8F\x00\x10':
return None
_log.warn("don't know how to interpret ping reply %s", reply)

View File

@@ -83,14 +83,10 @@ class EventsListener(_Thread):
matched = False
task = None if self._tasks.empty() else self._tasks.queue[0]
if task and task[-1] is None:
devnumber, data = task[:2]
if event[1] == devnumber:
# _log.debug("matching %s to %d, %s", event, devnumber, repr(data))
if event[0] == 0x11 or (event[0] == 0x10 and devnumber == 0xFF):
matched = (event[2][:2] == data[:2]) or (event[2][:1] == b'\xFF' and event[2][1:3] == data[:2])
elif event[0] == 0x10:
if event[2][:1] == b'\x8F' and event[2][1:3] == data[:2]:
matched = True
task_dev, task_data = task[:2]
if event[1] == task_dev:
# _log.debug("matching %s to (%d, %s)", event, task_dev, repr(task_data))
matched = event[2][:2] == task_data[:2] or (event[2][:1] in b'\x8F\xFF' and event[2][1:3] == task_data[:2])
if matched:
# _log.debug("request reply %s", event)
@@ -140,6 +136,6 @@ class EventsListener(_Thread):
_log.info("queueing unhandled event %s", event)
self._events.put(event)
def __nonzero__(self):
def __bool__(self):
return bool(self._active and self._handle)
__bool__ = __nonzero__
__nonzero__ = __bool__

View File

@@ -6,19 +6,6 @@
# Make sure the plugdev group exists on your system and your user is a member
# before applying these rules.
# If you are using the libusb implementation of hidapi (hid-libusb.c), then
# use something like the following line, substituting the VID and PID with
# those of your device. Note that for kernels before 2.6.24, you will need
# to substitute "usb" with "usb_device". It shouldn't hurt to use two lines
# (one each way) for compatibility with older systems.
# HIDAPI/libusb
ACTION=="add", SUBSYSTEM=="usb", ATTR{idVendor}=="046d", ATTR{idProduct}=="c52b", GROUP="plugdev", MODE="0660"
# If you are using the hidraw implementation, then do something like the
# following, substituting the VID and PID with your device.
# HIDAPI/hidraw
ACTION=="add", KERNEL=="hidraw*", ATTRS{idVendor}=="046d", ATTRS{idProduct}=="c52b", GROUP="plugdev", MODE="0660"