Compare commits
20 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e0fd21b429 | ||
|
|
ec6a2d892a | ||
|
|
a59ad221a1 | ||
|
|
c9843e4408 | ||
|
|
ab6f17cc6b | ||
|
|
6602d7ee13 | ||
|
|
b2db0706c6 | ||
|
|
8db1ce97a8 | ||
|
|
8ae8d4502d | ||
|
|
e5332500ea | ||
|
|
5f6fd5a333 | ||
|
|
89205f0bbc | ||
|
|
f172ceebd1 | ||
|
|
d5dec5f7ec | ||
|
|
f8d0beb471 | ||
|
|
7e44ec5ebf | ||
|
|
6e939e9485 | ||
|
|
40eacb0741 | ||
|
|
4cb9b30466 | ||
|
|
5c38c33374 |
15
README
15
README
@@ -10,18 +10,19 @@ Currently the K750 solar keyboard is also queried for its solar charge status.
|
||||
Support for other devices could be added in the future, but the K750 keyboard is
|
||||
the only device I have and can test on.
|
||||
|
||||
Pairing and un-pairing of devices is planned, but not implemented at this time.
|
||||
|
||||
|
||||
Requirements
|
||||
------------
|
||||
|
||||
- Python (2.7 or 3.2).
|
||||
- Python (2.7 or 3.2). Either version should work well.
|
||||
- Gtk 3; Gtk 2 should partially work with some problems.
|
||||
- Python GI (GObject Introspection), for Gtk bindings.
|
||||
- pyudev for enumerating udev devices.
|
||||
- Optional libnotify GI bindings, for desktop notifications.
|
||||
|
||||
The necessary packages for Debian/Ubuntu are `python-pyudev`/`python3-pyudev`,
|
||||
`python-gi`/`python3-gi`, `gir1.2-gtk-3.0`, and optionally `gir1.2-notify-0.7`.
|
||||
|
||||
|
||||
Installation
|
||||
------------
|
||||
@@ -33,13 +34,17 @@ In rules.d/ you'll find a udev rule file, to be copied in /etc/udev/rules.d/ (as
|
||||
root).
|
||||
|
||||
In its current form it makes the UR device available for r/w by all users
|
||||
belonging to the 'plugdev' system group (standard Debian group for pluggable
|
||||
devices). It may need changes, specific to your particular system's
|
||||
belonging to the 'plugdev' system group (standard Debian/Ubuntu group for
|
||||
pluggable devices). It may need changes, specific to your particular system's
|
||||
configuration.
|
||||
|
||||
If in doubt, replacing GROUP="plugdev" with GROUP="<your username>" should just
|
||||
work.
|
||||
|
||||
After you copy the file to /etc/udev/rules.d (and possibly modify it for your
|
||||
system), run 'udevadm control --reload-rules' as root for it to apply. Then
|
||||
physically remove the Unifying Receiver, wait 30 seconds and re-insert it.
|
||||
|
||||
|
||||
Thanks
|
||||
------
|
||||
|
||||
@@ -11,7 +11,7 @@ from logitech.unifying_receiver import api as _api
|
||||
from logitech.unifying_receiver.listener import EventsListener as _EventsListener
|
||||
from logitech.unifying_receiver.common import FallbackDict as _FallbackDict
|
||||
from logitech import devices as _devices
|
||||
from logitech.devices.constants import (STATUS, STATUS_NAME, PROPS, NAMES)
|
||||
from logitech.devices.constants import (STATUS, STATUS_NAME, PROPS)
|
||||
|
||||
#
|
||||
#
|
||||
@@ -37,7 +37,7 @@ class _FeaturesArray(object):
|
||||
except _api._FeatureNotSupported:
|
||||
self.supported = False
|
||||
else:
|
||||
count = _base.request(handle, self.device.number, _pack('!BB', index, 0x00))
|
||||
count = None if index is None else _base.request(handle, self.device.number, _pack('!BB', index, 0x00))
|
||||
if count is None:
|
||||
self.supported = False
|
||||
else:
|
||||
@@ -105,20 +105,16 @@ class _FeaturesArray(object):
|
||||
class DeviceInfo(_api.PairedDevice):
|
||||
"""A device attached to the receiver.
|
||||
"""
|
||||
def __init__(self, listener, number, pair_code, status=STATUS.UNKNOWN):
|
||||
def __init__(self, listener, number, status=STATUS.UNKNOWN):
|
||||
super(DeviceInfo, self).__init__(listener.handle, number)
|
||||
self._features = _FeaturesArray(self)
|
||||
|
||||
self.LOG = _Logger("Device[%d]" % number)
|
||||
self._listener = listener
|
||||
self._pair_code = pair_code
|
||||
self._serial = None
|
||||
self._codename = None
|
||||
|
||||
self._status = status
|
||||
self.props = {}
|
||||
|
||||
self.features = _FeaturesArray(self)
|
||||
|
||||
# read them now, otherwise it it temporarily hang the UI
|
||||
# if status >= STATUS.CONNECTED:
|
||||
# n, k, s, f = self.name, self.kind, self.serial, self.firmware
|
||||
@@ -156,54 +152,6 @@ class DeviceInfo(_api.PairedDevice):
|
||||
t.append('Light: %d lux' % self.props[PROPS.LIGHT_LEVEL])
|
||||
return ', '.join(t) if t else STATUS_NAME[STATUS.CONNECTED]
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
if self._name is None:
|
||||
if self._status < STATUS.CONNECTED:
|
||||
codename = self.codename
|
||||
if codename in NAMES:
|
||||
self._name, self._kind = NAMES[codename]
|
||||
else:
|
||||
self._name = _api.get_device_name(self.handle, self.number, self.features)
|
||||
return self._name or self.codename
|
||||
|
||||
@property
|
||||
def kind(self):
|
||||
if self._kind is None:
|
||||
if self._status < STATUS.CONNECTED:
|
||||
codename = self.codename
|
||||
if codename in NAMES:
|
||||
self._name, self._kind = NAMES[codename]
|
||||
else:
|
||||
self._kind = _api.get_device_kind(self.handle, self.number, self.features)
|
||||
return self._kind or '?'
|
||||
|
||||
@property
|
||||
def serial(self):
|
||||
if self._serial is None:
|
||||
# dodgy
|
||||
b = bytearray(self._pair_code)
|
||||
b[0] -= 0x10
|
||||
serial = _base.request(self.handle, 0xFF, b'\x83\xB5', bytes(b))
|
||||
if serial:
|
||||
self._serial = _base._hex(serial[1:5])
|
||||
return self._serial or '?'
|
||||
|
||||
@property
|
||||
def codename(self):
|
||||
if self._codename is None:
|
||||
codename = _base.request(self.handle, 0xFF, b'\x83\xB5', self._pair_code)
|
||||
if codename:
|
||||
self._codename = codename[2:].rstrip(b'\x00').decode('ascii')
|
||||
return self._codename or '?'
|
||||
|
||||
@property
|
||||
def firmware(self):
|
||||
if self._firmware is None:
|
||||
if self._status >= STATUS.CONNECTED:
|
||||
self._firmware = _api.get_device_firmware(self.handle, self.number, self.features)
|
||||
return self._firmware or ()
|
||||
|
||||
def process_event(self, code, data):
|
||||
if code == 0x10 and data[:1] == b'\x8F':
|
||||
self.status = STATUS.UNAVAILABLE
|
||||
@@ -231,7 +179,7 @@ class DeviceInfo(_api.PairedDevice):
|
||||
return False
|
||||
|
||||
def __str__(self):
|
||||
return 'DeviceInfo(%d,%s,%d)' % (self.number, self._name or '?', self._status)
|
||||
return '<DeviceInfo(%d,%s,%d)>' % (self.number, self._name or '?', self._status)
|
||||
|
||||
#
|
||||
#
|
||||
@@ -292,12 +240,12 @@ class ReceiverListener(_EventsListener):
|
||||
self.status_changed_callback(self.receiver, device, urgent)
|
||||
|
||||
def _device_status_from(self, event):
|
||||
state_code = ord(event.data[2:3]) & 0xF0
|
||||
state = STATUS.UNAVAILABLE if state_code == 0x60 else \
|
||||
STATUS.CONNECTED if state_code == 0xA0 else \
|
||||
STATUS.CONNECTED if state_code == 0x20 else \
|
||||
STATUS.UNKNOWN
|
||||
if state == STATUS.UNKNOWN:
|
||||
state_code = ord(event.data[2:3]) & 0xC0
|
||||
state = STATUS.UNAVAILABLE if state_code == 0x40 else \
|
||||
STATUS.CONNECTED if state_code == 0x80 else \
|
||||
STATUS.CONNECTED if state_code == 0x00 else \
|
||||
None
|
||||
if state is None:
|
||||
self.LOG.warn("don't know how to handle state code 0x%02X: %s", state_code, event)
|
||||
return state
|
||||
|
||||
@@ -309,7 +257,7 @@ class ReceiverListener(_EventsListener):
|
||||
|
||||
if event.devnumber in self.receiver.devices:
|
||||
status = self._device_status_from(event)
|
||||
if status > STATUS.UNKNOWN:
|
||||
if status is not None:
|
||||
self.receiver.devices[event.devnumber].status = status
|
||||
else:
|
||||
dev = self.make_device(event)
|
||||
@@ -343,11 +291,13 @@ class ReceiverListener(_EventsListener):
|
||||
return None
|
||||
|
||||
status = self._device_status_from(event)
|
||||
if status is not None:
|
||||
dev = DeviceInfo(self, event.devnumber, status)
|
||||
self.LOG.info("new device %s", dev)
|
||||
self.status_changed(dev, True)
|
||||
return dev
|
||||
|
||||
dev = DeviceInfo(self, event.devnumber, event.data[4:5], status)
|
||||
self.LOG.info("new device %s", dev)
|
||||
self.status_changed(dev, True)
|
||||
return dev
|
||||
self.LOG.error("failed to identify status of device %d from %s", event.devnumber, event)
|
||||
|
||||
def unpair_device(self, device):
|
||||
try:
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
APPNAME = 'Solaar'
|
||||
NAME = 'Solaar'
|
||||
VERSION = '0.7.2'
|
||||
__author__ = "Daniel Pavel <daniel.pavel@gmail.com>"
|
||||
__version__ = '0.7'
|
||||
__version__ = VERSION
|
||||
__license__ = "GPL"
|
||||
|
||||
#
|
||||
@@ -11,7 +12,7 @@ __license__ = "GPL"
|
||||
|
||||
def _parse_arguments():
|
||||
import argparse
|
||||
arg_parser = argparse.ArgumentParser(prog=APPNAME.lower())
|
||||
arg_parser = argparse.ArgumentParser(prog=NAME.lower())
|
||||
arg_parser.add_argument('-v', '--verbose',
|
||||
action='count', default=0,
|
||||
help='increase the logger verbosity (may be repeated)')
|
||||
@@ -36,25 +37,41 @@ def _parse_arguments():
|
||||
return args
|
||||
|
||||
|
||||
def _check_requirements():
|
||||
try:
|
||||
import pyudev
|
||||
except ImportError:
|
||||
return 'python-pyudev'
|
||||
|
||||
try:
|
||||
import gi.repository
|
||||
except ImportError:
|
||||
return 'python-gi'
|
||||
|
||||
try:
|
||||
from gi.repository import Gtk
|
||||
except ImportError:
|
||||
return 'gir1.2-gtk-3.0'
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
args = _parse_arguments()
|
||||
|
||||
req_fail = _check_requirements()
|
||||
if req_fail:
|
||||
raise ImportError('missing required package: %s' % req_fail)
|
||||
|
||||
import ui
|
||||
|
||||
# check if the notifications are available and enabled
|
||||
args.notifications &= args.systray
|
||||
if ui.notify.available and ui.notify.init(APPNAME):
|
||||
if ui.notify.available and ui.notify.init(NAME):
|
||||
ui.action.toggle_notifications.set_active(args.notifications)
|
||||
else:
|
||||
ui.action.toggle_notifications = None
|
||||
|
||||
from receiver import (ReceiverListener, DUMMY)
|
||||
|
||||
window = ui.main_window.create(APPNAME,
|
||||
DUMMY.name,
|
||||
DUMMY.max_devices,
|
||||
args.systray)
|
||||
|
||||
from receiver import DUMMY
|
||||
window = ui.main_window.create(NAME, DUMMY.name, DUMMY.max_devices, args.systray)
|
||||
if args.systray:
|
||||
menu_actions = (ui.action.toggle_notifications,
|
||||
ui.action.about)
|
||||
@@ -72,18 +89,23 @@ if __name__ == '__main__':
|
||||
def status_changed(receiver, device=None, urgent=False):
|
||||
ui.update(receiver, icon, window, device)
|
||||
if ui.notify.available and urgent:
|
||||
ui.notify.show(device or receiver)
|
||||
GObject.idle_add(ui.notify.show, device or receiver)
|
||||
|
||||
global listener
|
||||
if not listener:
|
||||
GObject.timeout_add(5000, check_for_listener)
|
||||
listener = None
|
||||
|
||||
from receiver import ReceiverListener
|
||||
def check_for_listener(retry=True):
|
||||
global listener, notify_missing
|
||||
|
||||
if listener is None:
|
||||
listener = ReceiverListener.open(status_changed)
|
||||
try:
|
||||
listener = ReceiverListener.open(status_changed)
|
||||
except OSError:
|
||||
ui.show_permissions_warning(window)
|
||||
|
||||
if listener is None:
|
||||
pairing.state = None
|
||||
if notify_missing:
|
||||
|
||||
@@ -1,18 +1,17 @@
|
||||
# pass
|
||||
|
||||
APPNAME = 'Solaar'
|
||||
APPVERSION = '0.7'
|
||||
|
||||
from . import (notify, status_icon, main_window, pair_window, action)
|
||||
|
||||
from gi.repository import (GObject, Gtk)
|
||||
GObject.threads_init()
|
||||
|
||||
|
||||
from solaar import NAME as _NAME
|
||||
_APP_ICONS = (_NAME + '-fail', _NAME + '-init', _NAME)
|
||||
def appicon(receiver_status):
|
||||
return (APPNAME + '-fail' if receiver_status < 0 else
|
||||
APPNAME + '-init' if receiver_status < 1 else
|
||||
APPNAME)
|
||||
return (_APP_ICONS[0] if receiver_status < 0 else
|
||||
_APP_ICONS[1] if receiver_status < 1 else
|
||||
_APP_ICONS[2])
|
||||
|
||||
|
||||
_ICON_THEME = Gtk.IconTheme.get_default()
|
||||
@@ -26,6 +25,16 @@ def icon_file(name):
|
||||
return None
|
||||
|
||||
|
||||
def show_permissions_warning(window):
|
||||
text = ('Found a possible Unifying Receiver device,\n'
|
||||
'but did not have permission to open it.')
|
||||
|
||||
m = Gtk.MessageDialog(window, Gtk.DialogFlags.MODAL, Gtk.MessageType.ERROR, Gtk.ButtonsType.CLOSE, text)
|
||||
m.set_title('Permissions error')
|
||||
m.run()
|
||||
m.destroy()
|
||||
|
||||
|
||||
def find_children(container, *child_names):
|
||||
assert container is not None
|
||||
|
||||
|
||||
@@ -2,9 +2,13 @@
|
||||
#
|
||||
#
|
||||
|
||||
# from sys import version as PYTTHON_VERSION
|
||||
from gi.repository import Gtk
|
||||
|
||||
import ui
|
||||
import ui.notify
|
||||
import ui.pair_window
|
||||
from solaar import NAME as _NAME
|
||||
from solaar import VERSION as _VERSION
|
||||
|
||||
|
||||
def _action(name, label, function, *args):
|
||||
@@ -27,7 +31,7 @@ def _toggle_action(name, label, function, *args):
|
||||
|
||||
def _toggle_notifications(action):
|
||||
if action.get_active():
|
||||
ui.notify.init(ui.APPNAME)
|
||||
ui.notify.init(_NAME)
|
||||
else:
|
||||
ui.notify.uninit()
|
||||
action.set_sensitive(ui.notify.available)
|
||||
@@ -36,16 +40,18 @@ toggle_notifications = _toggle_action('notifications', 'Notifications', _toggle_
|
||||
|
||||
def _show_about_window(action):
|
||||
about = Gtk.AboutDialog()
|
||||
about.set_icon_name(ui.APPNAME)
|
||||
about.set_program_name(ui.APPNAME)
|
||||
about.set_logo_icon_name(ui.APPNAME)
|
||||
about.set_version(ui.APPVERSION)
|
||||
about.set_icon_name(_NAME)
|
||||
about.set_program_name(_NAME)
|
||||
about.set_logo_icon_name(_NAME)
|
||||
about.set_version(_VERSION)
|
||||
about.set_license_type(Gtk.License.GPL_2_0)
|
||||
about.set_authors(('Daniel Pavel http://github.com/pwr', ))
|
||||
about.set_website('http://github.com/pwr/Solaar/wiki')
|
||||
about.set_website_label('Solaar Wiki')
|
||||
# about.set_comments('Using Python %s\n' % PYTTHON_VERSION.split(' ')[0])
|
||||
about.run()
|
||||
about.destroy()
|
||||
about = _action('help-about', 'About ' + ui.APPNAME, _show_about_window)
|
||||
about = _action('help-about', 'About ' + _NAME, _show_about_window)
|
||||
|
||||
quit = _action('exit', 'Quit', Gtk.main_quit)
|
||||
|
||||
|
||||
@@ -22,8 +22,9 @@ def _info_text(dev):
|
||||
(f.kind, f.name, ' ' if f.name else '', f.version) for f in dev.firmware])
|
||||
return ('<small>'
|
||||
'Serial \t\t<tt>%s</tt>\n'
|
||||
'HID protocol\t<tt>%1.1f</tt>\n'
|
||||
'%s'
|
||||
'</small>' % (dev.serial, fw_text))
|
||||
'</small>' % (dev.serial, dev.protocol, fw_text))
|
||||
|
||||
def _toggle_info(action, label_widget, box_widget, frame):
|
||||
if action.get_active():
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
#!/bin/sh
|
||||
|
||||
LIB=`dirname "$0"`/../lib
|
||||
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB/native/`arch`
|
||||
Z=`readlink -f "$0"`
|
||||
LIB=`readlink -f $(dirname "$Z")/../lib`
|
||||
#export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB/native/`arch`
|
||||
export PYTHONPATH=$LIB
|
||||
|
||||
exec python -OOu -m hidapi.hidconsole "$@"
|
||||
PYTHON=`which python python2 python3 | head -n 1`
|
||||
exec $PYTHON -OOu -m hidapi.hidconsole "$@"
|
||||
|
||||
8
bin/scan
8
bin/scan
@@ -1,7 +1,9 @@
|
||||
#!/bin/sh
|
||||
|
||||
LIB=`dirname "$0"`/../lib
|
||||
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB/native/`arch`
|
||||
Z=`readlink -f "$0"`
|
||||
LIB=`readlink -f $(dirname "$Z")/../lib`
|
||||
#export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB/native/`arch`
|
||||
export PYTHONPATH=$LIB
|
||||
|
||||
exec python -OOu -m logitech.scanner "$@"
|
||||
PYTHON=`which python python2 python3 | head -n 1`
|
||||
exec $PYTHON -OOu -m logitech.scanner "$@"
|
||||
|
||||
@@ -5,9 +5,9 @@ APP=`readlink -f $(dirname "$Z")/../app`
|
||||
LIB=`readlink -f $(dirname "$Z")/../lib`
|
||||
SHARE=`readlink -f $(dirname "$Z")/../share`
|
||||
|
||||
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB/native/`arch`
|
||||
#export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB/native/`arch`
|
||||
export PYTHONPATH=$APP:$LIB
|
||||
export XDG_DATA_DIRS=$SHARE:$XDG_DATA_DIRS
|
||||
|
||||
exec python -OOu -m solaar "$@"
|
||||
#exec python -OOu -m profile -o $TMPDIR/profile.log app/solaar.py "$@"
|
||||
PYTHON=`which python python2 python3 | head -n 1`
|
||||
exec $PYTHON -OOu -m solaar "$@"
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import os
|
||||
import sys
|
||||
from select import select as _select
|
||||
import time
|
||||
from binascii import hexlify, unhexlify
|
||||
_hex = lambda d: hexlify(d).decode('ascii').upper()
|
||||
|
||||
|
||||
interactive = os.isatty(0)
|
||||
start_time = 0
|
||||
try:
|
||||
read_packet = raw_input
|
||||
@@ -14,19 +17,18 @@ except:
|
||||
|
||||
|
||||
def _print(marker, data, scroll=False):
|
||||
hexs = _hex(data)
|
||||
|
||||
t = time.time() - start_time
|
||||
s = '%s (% 8.3f) [%s %s %s %s] %s' % (marker, t, hexs[0:2], hexs[2:4], hexs[4:8], hexs[8:], repr(data))
|
||||
|
||||
if scroll:
|
||||
if interactive and scroll:
|
||||
sys.stdout.write('\033[s')
|
||||
sys.stdout.write('\033[S') # scroll up
|
||||
sys.stdout.write('\033[A\033[L\033[G') # insert new line above the current one, position on first column
|
||||
|
||||
hexs = _hex(data)
|
||||
s = '%s (% 8.3f) [%s %s %s %s] %s' % (marker, t, hexs[0:2], hexs[2:4], hexs[4:8], hexs[8:], repr(data))
|
||||
sys.stdout.write(s)
|
||||
|
||||
if scroll:
|
||||
if interactive and scroll:
|
||||
sys.stdout.write('\033[u')
|
||||
else:
|
||||
sys.stdout.write('\n')
|
||||
@@ -57,17 +59,18 @@ if __name__ == '__main__':
|
||||
repr(hidapi.get_manufacturer(handle)),
|
||||
repr(hidapi.get_product(handle)),
|
||||
repr(hidapi.get_serial(handle))))
|
||||
print (".. Press ^C/^D to exit, or type hex bytes to write to the device.")
|
||||
if interactive:
|
||||
print (".. Press ^C/^D to exit, or type hex bytes to write to the device.")
|
||||
|
||||
import readline
|
||||
if args.history is None:
|
||||
import os.path
|
||||
args.history = os.path.join(os.path.expanduser("~"), ".hidconsole-history")
|
||||
try:
|
||||
readline.read_history_file(args.history)
|
||||
except:
|
||||
# file may not exist yet
|
||||
pass
|
||||
import readline
|
||||
if args.history is None:
|
||||
import os.path
|
||||
args.history = os.path.join(os.path.expanduser("~"), ".hidconsole-history")
|
||||
try:
|
||||
readline.read_history_file(args.history)
|
||||
except:
|
||||
# file may not exist yet
|
||||
pass
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
@@ -77,8 +80,10 @@ if __name__ == '__main__':
|
||||
t.daemon = True
|
||||
t.start()
|
||||
|
||||
prompt = '?? Input: ' if interactive else ''
|
||||
|
||||
while t.is_alive():
|
||||
line = read_packet('?? Input: ').strip().replace(' ', '')
|
||||
line = read_packet(prompt).strip().replace(' ', '')
|
||||
if line:
|
||||
try:
|
||||
data = unhexlify(line.encode('ascii'))
|
||||
@@ -87,11 +92,18 @@ if __name__ == '__main__':
|
||||
else:
|
||||
_print('<<', data)
|
||||
hidapi.write(handle, data)
|
||||
except Exception as e:
|
||||
# wait for some kind of reply
|
||||
if not interactive:
|
||||
rlist, wlist, xlist = _select([handle], [], [], 1)
|
||||
time.sleep(0.1)
|
||||
except EOFError:
|
||||
pass
|
||||
except Exception as e:
|
||||
print ('%s: %s' % (type(e).__name__, e))
|
||||
|
||||
print (".. Closing handle %X" % handle)
|
||||
hidapi.close(handle)
|
||||
readline.write_history_file(args.history)
|
||||
if interactive:
|
||||
readline.write_history_file(args.history)
|
||||
else:
|
||||
print ("!! Failed to open %s, aborting" % args.device)
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
#
|
||||
# Partial Python implementation of the native hidapi.
|
||||
# Requires pyudev
|
||||
#
|
||||
"""Generic Human Interface Device API.
|
||||
|
||||
It is currently a partial pure-Python implementation of the native HID API
|
||||
implemented by signal11 (https://github.com/signal11/hidapi), and requires
|
||||
``pyudev``.
|
||||
The docstrings are mostly copied from the hidapi API header, with changes where
|
||||
necessary.
|
||||
"""
|
||||
|
||||
import os as _os
|
||||
from select import select as _select
|
||||
@@ -120,10 +124,7 @@ def open_path(device_path):
|
||||
|
||||
:returns: an opaque device handle, or ``None``.
|
||||
"""
|
||||
try:
|
||||
return _os.open(device_path, _os.O_RDWR | _os.O_SYNC)
|
||||
except:
|
||||
pass
|
||||
return _os.open(device_path, _os.O_RDWR | _os.O_SYNC)
|
||||
|
||||
|
||||
def close(device_handle):
|
||||
|
||||
@@ -43,4 +43,5 @@ NAMES = {
|
||||
'K750': ('Wireless Solar Keyboard K750', 'keyboard'),
|
||||
'K800': ('Wireless Illuminated Keyboard K800', 'keyboard'),
|
||||
'T650': ('Wireless Rechargeable Touchpad T650', 'touchpad'),
|
||||
'Performance MX': ('Performance Mouse MX', 'mouse'),
|
||||
}
|
||||
|
||||
@@ -8,18 +8,26 @@ def print_receiver(receiver):
|
||||
for f in receiver.firmware:
|
||||
print (" %-10s: %s" % (f.kind, f.version))
|
||||
|
||||
print ("--------")
|
||||
|
||||
|
||||
def scan_devices(receiver):
|
||||
for dev in receiver:
|
||||
print ("--------")
|
||||
print (str(dev))
|
||||
print ("Name: %s" % dev.name)
|
||||
print ("Kind: %s" % dev.kind)
|
||||
print ("Name : %s" % dev.name)
|
||||
print ("Kind : %s" % dev.kind)
|
||||
print ("Serial number: %s" % dev.serial)
|
||||
if not dev.protocol:
|
||||
print ("HID protocol : UNKNOWN")
|
||||
continue
|
||||
|
||||
print ("HID protocol : HID %01.1f" % dev.protocol)
|
||||
if dev.protocol < 2.0:
|
||||
print ("Features query not supported by this device")
|
||||
continue
|
||||
|
||||
firmware = dev.firmware
|
||||
for fw in firmware:
|
||||
print (" %-10s: %s %s" % (fw.kind, fw.name, fw.version))
|
||||
print (" %-10s: %s %s" % (fw.kind, fw.name, fw.version))
|
||||
|
||||
all_features = api.get_device_features(dev.handle, dev.number)
|
||||
for index in range(0, len(all_features)):
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
from struct import pack as _pack
|
||||
from struct import unpack as _unpack
|
||||
import errno as _errno
|
||||
|
||||
|
||||
from . import base as _base
|
||||
@@ -31,32 +32,74 @@ class PairedDevice(object):
|
||||
self.handle = handle
|
||||
self.number = number
|
||||
|
||||
self._protocol = None
|
||||
self._features = None
|
||||
self._codename = None
|
||||
self._name = None
|
||||
self._kind = None
|
||||
self._serial = None
|
||||
self._firmware = None
|
||||
self.features = [FEATURE.ROOT]
|
||||
|
||||
@property
|
||||
def protocol(self):
|
||||
if self._protocol is None:
|
||||
self._protocol = _base.ping(self.handle, self.number)
|
||||
return 0 if self._protocol is None else self._protocol
|
||||
|
||||
@property
|
||||
def features(self):
|
||||
if self._features is None:
|
||||
if self.protocol >= 2.0:
|
||||
self._features = [FEATURE.ROOT]
|
||||
return self._features
|
||||
|
||||
@property
|
||||
def codename(self):
|
||||
if self._codename is None:
|
||||
codename = _base.request(self.handle, 0xFF, b'\x83\xB5', 0x40 + self.number - 1)
|
||||
if codename:
|
||||
self._codename = codename[2:].rstrip(b'\x00').decode('ascii')
|
||||
return self._codename or '?'
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
if self._name is None:
|
||||
self._name = get_device_name(self.handle, self.number, self.features)
|
||||
return self._name or '?'
|
||||
if self.protocol < 2.0:
|
||||
from ..devices.constants import NAMES as _DEVICE_NAMES
|
||||
if self.codename in _DEVICE_NAMES:
|
||||
self._name, self._kind = _DEVICE_NAMES[self._codename]
|
||||
else:
|
||||
self._name = get_device_name(self.handle, self.number, self.features)
|
||||
return self._name or self.codename
|
||||
|
||||
@property
|
||||
def kind(self):
|
||||
if self._kind is None:
|
||||
self._kind = get_device_kind(self.handle, self.number, self.features)
|
||||
if self.protocol < 2.0:
|
||||
from ..devices.constants import NAMES as _DEVICE_NAMES
|
||||
if self.codename in _DEVICE_NAMES:
|
||||
self._name, self._kind = _DEVICE_NAMES[self._codename]
|
||||
else:
|
||||
self._kind = get_device_kind(self.handle, self.number, self.features)
|
||||
return self._kind or '?'
|
||||
|
||||
@property
|
||||
def firmware(self):
|
||||
if self._firmware is None:
|
||||
if self._firmware is None and self.protocol >= 2.0:
|
||||
self._firmware = get_device_firmware(self.handle, self.number, self.features)
|
||||
return self._firmware or ()
|
||||
|
||||
@property
|
||||
def serial(self):
|
||||
if self._serial is None:
|
||||
prefix = _base.request(self.handle, 0xFF, b'\x83\xB5', 0x20 + self.number - 1)
|
||||
serial = _base.request(self.handle, 0xFF, b'\x83\xB5', 0x30 + self.number - 1)
|
||||
if prefix and serial:
|
||||
self._serial = _base._hex(prefix[3:5]) + '-' + _base._hex(serial[1:5])
|
||||
return self._serial or '?'
|
||||
|
||||
def ping(self):
|
||||
reply = _base.request(self.handle, self.number, b'\x00\x10', b'\x00\x00\xAA')
|
||||
return reply is not None and reply[2:3] == b'\xAA'
|
||||
return _base.ping(self.handle, self.number) is not None
|
||||
|
||||
def __str__(self):
|
||||
return '<PairedDevice(%X,%d,%s)>' % (self.handle, self.number, self._name or '?')
|
||||
@@ -113,7 +156,7 @@ class Receiver(object):
|
||||
if self.handle == 0:
|
||||
return
|
||||
|
||||
for number in range(1, MAX_ATTACHED_DEVICES):
|
||||
for number in range(1, 1 + MAX_ATTACHED_DEVICES):
|
||||
dev = get_device(self.handle, number)
|
||||
if dev is not None:
|
||||
yield dev
|
||||
@@ -147,8 +190,8 @@ class Receiver(object):
|
||||
if self.handle == 0:
|
||||
return False
|
||||
if type(dev) == int:
|
||||
return (dev < 1 or dev > MAX_ATTACHED_DEVICES) and ping(self.handle, dev)
|
||||
return ping(self.handle, dev.number)
|
||||
return dev > 0 and dev <= MAX_ATTACHED_DEVICES and _base.ping(self.handle, dev) is not None
|
||||
return dev.ping()
|
||||
|
||||
def __str__(self):
|
||||
return '<Receiver(%X,%s)>' % (self.handle, self.path)
|
||||
@@ -164,10 +207,22 @@ class Receiver(object):
|
||||
|
||||
:returns: An open file handle for the found receiver, or ``None``.
|
||||
"""
|
||||
exception = None
|
||||
|
||||
for rawdevice in _base.list_receiver_devices():
|
||||
handle = _base.try_open(rawdevice.path)
|
||||
if handle:
|
||||
return Receiver(handle, rawdevice.path)
|
||||
exception = None
|
||||
try:
|
||||
handle = _base.try_open(rawdevice.path)
|
||||
if handle:
|
||||
return Receiver(handle, rawdevice.path)
|
||||
except OSError as e:
|
||||
_log.exception("open %s", rawdevice.path)
|
||||
if e.errno == _errno.EACCES:
|
||||
exception = e
|
||||
|
||||
if exception:
|
||||
# only keep the last exception
|
||||
raise exception
|
||||
|
||||
#
|
||||
#
|
||||
@@ -215,20 +270,12 @@ def request(handle, devnumber, feature, function=b'\x00', params=b'', features=N
|
||||
return _base.request(handle, devnumber, feature_index + function, params)
|
||||
|
||||
|
||||
def ping(handle, devnumber):
|
||||
"""
|
||||
:returns: True if the device is connected to the UR.
|
||||
"""
|
||||
reply = _base.request(handle, devnumber, b'\x00\x10', b'\x00\x00\xAA')
|
||||
return reply is not None and reply[2:3] == b'\xAA'
|
||||
|
||||
|
||||
def get_device(handle, devnumber, features=None):
|
||||
"""Gets the complete info for a device (type, features).
|
||||
|
||||
:returns: a PairedDevice or ``None``.
|
||||
"""
|
||||
if ping(handle, devnumber):
|
||||
if _base.ping(handle, devnumber):
|
||||
devinfo = PairedDevice(handle, devnumber)
|
||||
# _log.debug("found device %s", devinfo)
|
||||
return devinfo
|
||||
@@ -278,6 +325,7 @@ def _get_feature_index(handle, devnumber, feature, features=None):
|
||||
if len(features) <= index:
|
||||
features += [None] * (index + 1 - len(features))
|
||||
features[index] = feature
|
||||
# _log.debug("%s: found feature %s at %d", features, _base._hex(feature), index)
|
||||
return index
|
||||
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
#
|
||||
|
||||
from struct import pack as _pack
|
||||
from struct import unpack as _unpack
|
||||
from binascii import hexlify as _hexlify
|
||||
_hex = lambda d: _hexlify(d).decode('ascii').upper()
|
||||
|
||||
@@ -268,12 +269,12 @@ def request(handle, devnumber, feature_index_function, params=b'', features=None
|
||||
|
||||
if reply_code == 0x10 and reply_data[:1] == b'\x8F' and reply_data[1:3] == feature_index_function:
|
||||
# device not present
|
||||
_log.debug("device %d request ping failed on {%s} call: [%s]", devnumber, _hex(feature_index_function), _hex(reply_data))
|
||||
_log.debug("device %d request failed on {%s} call: [%s]", devnumber, _hex(feature_index_function), _hex(reply_data))
|
||||
return None
|
||||
|
||||
if reply_code == 0x10 and reply_data[:1] == b'\x8F':
|
||||
# device not present
|
||||
_log.debug("request ping failed: [%s]", devnumber, _hex(reply_data))
|
||||
_log.debug("device %d request failed: [%s]", devnumber, _hex(reply_data))
|
||||
return None
|
||||
|
||||
if reply_code == 0x11 and reply_data[0] == b'\xFF' and reply_data[1:3] == feature_index_function:
|
||||
@@ -298,3 +299,50 @@ def request(handle, devnumber, feature_index_function, params=b'', features=None
|
||||
# _log.debug("device %d unmatched reply {%s} (expected {%s})", devnumber, _hex(reply_data[:2]), _hex(feature_index_function))
|
||||
if _unhandled:
|
||||
_unhandled(reply_code, reply_devnumber, reply_data)
|
||||
|
||||
|
||||
def ping(handle, devnumber):
|
||||
"""Check if a device is connected to the UR.
|
||||
|
||||
:returns: The HID protocol supported by the device, as a floating point number, if the device is active.
|
||||
"""
|
||||
if request_context is None or handle != request_context.handle:
|
||||
context = _DEFAULT_REQUEST_CONTEXT
|
||||
_unhandled = unhandled_hook
|
||||
else:
|
||||
context = request_context
|
||||
_unhandled = getattr(context, 'unhandled_hook')
|
||||
|
||||
context.write(handle, devnumber, b'\x00\x10\x00\x00\xAA')
|
||||
read_times = _MAX_READ_TIMES
|
||||
while read_times > 0:
|
||||
divisor = (1 + _MAX_READ_TIMES - read_times)
|
||||
reply = context.read(handle, int(DEFAULT_TIMEOUT * (divisor + 1) / 2 / divisor))
|
||||
read_times -= 1
|
||||
|
||||
if not reply:
|
||||
# keep waiting...
|
||||
continue
|
||||
|
||||
reply_code, reply_devnumber, reply_data = reply
|
||||
|
||||
if reply_devnumber != devnumber:
|
||||
# this message not for the device we're interested in
|
||||
# _l.log(_LOG_LEVEL, "device %d request got reply for unexpected device %d: [%s]", devnumber, reply_devnumber, _hex(reply_data))
|
||||
# worst case scenario, this is a reply for a concurrent request
|
||||
# on this receiver
|
||||
if _unhandled:
|
||||
_unhandled(reply_code, reply_devnumber, reply_data)
|
||||
continue
|
||||
|
||||
if reply_code == 0x11 and reply_data[:2] == b'\x00\x10' and reply_data[4:5] == b'\xAA':
|
||||
major, minor = _unpack('!BB', reply_data[2:4])
|
||||
return major + minor / 10.0
|
||||
|
||||
if reply_code == 0x10 and reply_data == b'\x8F\x00\x10\x01\x00':
|
||||
return 1.0
|
||||
|
||||
if reply_code == 0x10 and reply_data[:3] == b'\x8F\x00\x10':
|
||||
return None
|
||||
|
||||
_log.warn("don't know how to interpret ping reply %s", reply)
|
||||
|
||||
@@ -83,14 +83,10 @@ class EventsListener(_Thread):
|
||||
matched = False
|
||||
task = None if self._tasks.empty() else self._tasks.queue[0]
|
||||
if task and task[-1] is None:
|
||||
devnumber, data = task[:2]
|
||||
if event[1] == devnumber:
|
||||
# _log.debug("matching %s to %d, %s", event, devnumber, repr(data))
|
||||
if event[0] == 0x11 or (event[0] == 0x10 and devnumber == 0xFF):
|
||||
matched = (event[2][:2] == data[:2]) or (event[2][:1] == b'\xFF' and event[2][1:3] == data[:2])
|
||||
elif event[0] == 0x10:
|
||||
if event[2][:1] == b'\x8F' and event[2][1:3] == data[:2]:
|
||||
matched = True
|
||||
task_dev, task_data = task[:2]
|
||||
if event[1] == task_dev:
|
||||
# _log.debug("matching %s to (%d, %s)", event, task_dev, repr(task_data))
|
||||
matched = event[2][:2] == task_data[:2] or (event[2][:1] in b'\x8F\xFF' and event[2][1:3] == task_data[:2])
|
||||
|
||||
if matched:
|
||||
# _log.debug("request reply %s", event)
|
||||
@@ -140,6 +136,6 @@ class EventsListener(_Thread):
|
||||
_log.info("queueing unhandled event %s", event)
|
||||
self._events.put(event)
|
||||
|
||||
def __nonzero__(self):
|
||||
def __bool__(self):
|
||||
return bool(self._active and self._handle)
|
||||
__bool__ = __nonzero__
|
||||
__nonzero__ = __bool__
|
||||
|
||||
@@ -6,19 +6,6 @@
|
||||
# Make sure the plugdev group exists on your system and your user is a member
|
||||
# before applying these rules.
|
||||
|
||||
|
||||
# If you are using the libusb implementation of hidapi (hid-libusb.c), then
|
||||
# use something like the following line, substituting the VID and PID with
|
||||
# those of your device. Note that for kernels before 2.6.24, you will need
|
||||
# to substitute "usb" with "usb_device". It shouldn't hurt to use two lines
|
||||
# (one each way) for compatibility with older systems.
|
||||
|
||||
# HIDAPI/libusb
|
||||
ACTION=="add", SUBSYSTEM=="usb", ATTR{idVendor}=="046d", ATTR{idProduct}=="c52b", GROUP="plugdev", MODE="0660"
|
||||
|
||||
# If you are using the hidraw implementation, then do something like the
|
||||
# following, substituting the VID and PID with your device.
|
||||
|
||||
# HIDAPI/hidraw
|
||||
ACTION=="add", KERNEL=="hidraw*", ATTRS{idVendor}=="046d", ATTRS{idProduct}=="c52b", GROUP="plugdev", MODE="0660"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user