diff --git a/bin/solaar b/bin/solaar index 99ee81bc..17cecc26 100755 --- a/bin/solaar +++ b/bin/solaar @@ -25,6 +25,7 @@ def init_paths(): import sys # Python 3 might have problems converting back to UTF-8 in case of Unicode surrogates + decoded_path = None try: decoded_path = sys.path[0] sys.path[0].encode(sys.getfilesystemencoding()) diff --git a/lib/hid_parser/__init__.py b/lib/hid_parser/__init__.py index d09e5eaa..459b809e 100644 --- a/lib/hid_parser/__init__.py +++ b/lib/hid_parser/__init__.py @@ -560,7 +560,8 @@ class ArrayItem(MainItem): ) continue - if usage in self._usages and all(usage_type not in self._INCOMPATIBLE_TYPES for usage_type in usage.usage_types): + not_incompatible_type = all(usage_type not in self._INCOMPATIBLE_TYPES for usage_type in usage.usage_types) + if usage in self._usages and not_incompatible_type: usage_values[usage] = UsageValue(self, True) return usage_values @@ -820,14 +821,28 @@ class ReportDescriptor: if data is None: raise InvalidReportDescriptor("Invalid output item") self._append_items( - offset_output, self._output, report_id, report_count, report_size, usages, data, {**glob, **local} + offset_output, + self._output, + report_id, + report_count, + report_size, + usages, + data, + {**glob, **local}, ) elif tag == TagMain.FEATURE: if data is None: raise InvalidReportDescriptor("Invalid feature item") self._append_items( - offset_feature, self._feature, report_id, report_count, report_size, usages, data, {**glob, **local} + offset_feature, + self._feature, + report_id, + report_count, + report_size, + usages, + data, + {**glob, **local}, ) # clear local diff --git a/lib/hidapi/common.py b/lib/hidapi/common.py index 5f20e32e..8a5d1b6c 100644 --- a/lib/hidapi/common.py +++ b/lib/hidapi/common.py @@ -1,18 +1,20 @@ +from __future__ import annotations + import dataclasses @dataclasses.dataclass class DeviceInfo: path: str - bus_id: str + bus_id: str | None vendor_id: str product_id: str - interface: str - driver: str - manufacturer: str - product: str - serial: str - release: str + interface: str | None + driver: str | None + manufacturer: str | None + product: str | None + serial: str | None + release: str | None isDevice: bool - hidpp_short: str - hidpp_long: str + hidpp_short: str | None + hidpp_long: str | None diff --git a/lib/hidapi/hidapi_impl.py b/lib/hidapi/hidapi_impl.py index 6764d519..74e7bb33 100644 --- a/lib/hidapi/hidapi_impl.py +++ b/lib/hidapi/hidapi_impl.py @@ -33,6 +33,7 @@ import typing from threading import Thread from time import sleep +from typing import Callable from hidapi.common import DeviceInfo @@ -203,6 +204,7 @@ class _DeviceMonitor(Thread): def __init__(self, device_callback, polling_delay=5.0): self.device_callback = device_callback self.polling_delay = polling_delay + self.prev_devices = None # daemon threads are automatically killed when main thread exits super().__init__(daemon=True) @@ -259,7 +261,12 @@ def _match(action, device, filterfn): if logger.isEnabledFor(logging.INFO): logger.info( - "Found device BID %s VID %04X PID %04X HID++ %s %s", bus_id, vid, pid, device["hidpp_short"], device["hidpp_long"] + "Found device BID %s VID %04X PID %04X HID++ %s %s", + bus_id, + vid, + pid, + device["hidpp_short"], + device["hidpp_long"], ) if not device["hidpp_short"] and not device["hidpp_long"]: @@ -317,7 +324,7 @@ def find_paired_node_wpid(receiver_path: str, index: int): return None -def monitor_glib(glib: GLib, callback, filterfn): +def monitor_glib(glib: GLib, callback: Callable, filterfn: Callable): """Monitor GLib. Parameters @@ -452,7 +459,6 @@ def read(device_handle, bytes_count, timeout_ms=None): if bytes_read < 0: raise HIDError(_hidapi.hid_error(device_handle)) - return None return data.raw[:bytes_read] diff --git a/lib/hidapi/udev_impl.py b/lib/hidapi/udev_impl.py index c89b1f52..8f3d412c 100644 --- a/lib/hidapi/udev_impl.py +++ b/lib/hidapi/udev_impl.py @@ -36,6 +36,7 @@ import warnings from select import select from time import sleep from time import time +from typing import Callable import pyudev @@ -114,7 +115,12 @@ def _match(action, device, filter_func: typing.Callable[[int, int, int, bool, bo hidpp_short = None hidpp_long = None logger.info( - "Report Descriptor not processed for DEVICE %s BID %s VID %s PID %s: %s", device.device_node, bid, vid, pid, e + "Report Descriptor not processed for DEVICE %s BID %s VID %s PID %s: %s", + device.device_node, + bid, + vid, + pid, + e, ) filtered_result = filter_func(int(bid, 16), int(vid, 16), int(pid, 16), hidpp_short, hidpp_long) @@ -222,7 +228,7 @@ def find_paired_node_wpid(receiver_path, index): return None -def monitor_glib(glib: GLib, callback, filterfn): +def monitor_glib(glib: GLib, callback: Callable, filterfn: typing.Callable): """Monitor GLib. Parameters @@ -453,7 +459,7 @@ def get_indexed_string(device_handle, index): assert device_handle stat = os.fstat(device_handle) try: - dev = pyudev.Device.from_device_number(pyudev.Context(), "char", stat.st_rdev) + dev = pyudev.Devices.from_device_number(pyudev.Context(), "char", stat.st_rdev) except (pyudev.DeviceNotFoundError, ValueError): return None diff --git a/lib/logitech_receiver/base.py b/lib/logitech_receiver/base.py index da3ac43b..326a3b5a 100644 --- a/lib/logitech_receiver/base.py +++ b/lib/logitech_receiver/base.py @@ -29,6 +29,7 @@ from contextlib import contextmanager from random import getrandbits from time import time from typing import Any +from typing import Callable import gi @@ -53,7 +54,7 @@ else: logger = logging.getLogger(__name__) -_SHORT_MESSAGE_SIZE = 7 +SHORT_MESSAGE_SIZE = 7 _LONG_MESSAGE_SIZE = 20 _MEDIUM_MESSAGE_SIZE = 15 _MAX_READ_SIZE = 32 @@ -138,7 +139,7 @@ def _match(record: dict[str, Any], bus_id: int, vendor_id: int, product_id: int) def filter_receivers( - bus_id: int, vendor_id: int, product_id: int, hidpp_short: bool = False, hidpp_long: bool = False + bus_id: int, vendor_id: int, product_id: int, _hidpp_short: bool = False, _hidpp_long: bool = False ) -> dict[str, Any]: """Check that this product is a Logitech receiver. @@ -184,7 +185,7 @@ def receivers_and_devices(): yield from hidapi.enumerate(filter_products_of_interest) -def notify_on_receivers_glib(glib: GLib, callback): +def notify_on_receivers_glib(glib: GLib, callback: Callable): """Watch for matching devices and notifies the callback on the GLib thread. Parameters @@ -254,7 +255,7 @@ def write(handle, devnumber, data, long_message=False): assert data is not None assert isinstance(data, bytes), (repr(data), type(data)) - if long_message or len(data) > _SHORT_MESSAGE_SIZE - 2 or data[:1] == b"\x82": + if long_message or len(data) > SHORT_MESSAGE_SIZE - 2 or data[:1] == b"\x82": wdata = struct.pack("!BB18s", HIDPP_LONG_MESSAGE_ID, devnumber, data) else: wdata = struct.pack("!BB5s", HIDPP_SHORT_MESSAGE_ID, devnumber, data) @@ -303,7 +304,7 @@ def is_relevant_message(data: bytes) -> bool: # mapping from report_id to message length report_lengths = { - HIDPP_SHORT_MESSAGE_ID: _SHORT_MESSAGE_SIZE, + HIDPP_SHORT_MESSAGE_ID: SHORT_MESSAGE_SIZE, HIDPP_LONG_MESSAGE_ID: _LONG_MESSAGE_SIZE, DJ_MESSAGE_ID: _MEDIUM_MESSAGE_SIZE, 0x21: _MAX_READ_SIZE, @@ -344,7 +345,12 @@ def _read(handle, timeout): report_id != DJ_MESSAGE_ID or ord(data[2:3]) > 0x10 ): # ignore DJ input messages logger.debug( - "(%s) => r[%02X %02X %s %s]", handle, report_id, devnumber, common.strhex(data[2:4]), common.strhex(data[4:]) + "(%s) => r[%02X %02X %s %s]", + handle, + report_id, + devnumber, + common.strhex(data[2:4]), + common.strhex(data[4:]), ) return report_id, devnumber, data[2:] @@ -554,7 +560,12 @@ def request( error, hidpp20_constants.ERROR[error], ) - raise exceptions.FeatureCallError(number=devnumber, request=request_id, error=error, params=params) + raise exceptions.FeatureCallError( + number=devnumber, + request=request_id, + error=error, + params=params, + ) if reply_data[:2] == request_data[:2]: if devnumber == 0xFF: @@ -628,7 +639,7 @@ def ping(handle, devnumber, long_message: bool = False): and reply_data[1:3] == request_data[:2] ): # error response error = ord(reply_data[3:4]) - if error == hidpp10_constants.ERROR.invalid_SubID__command: # a valid reply from a HID++ 1.0 device + if error == hidpp10_constants.ERROR.invalid_SubID__command: # valid reply from HID++ 1.0 device return 1.0 if ( error == hidpp10_constants.ERROR.resource_error diff --git a/lib/logitech_receiver/common.py b/lib/logitech_receiver/common.py index 7905f497..14dfb2e5 100644 --- a/lib/logitech_receiver/common.py +++ b/lib/logitech_receiver/common.py @@ -370,12 +370,12 @@ class NamedInts: __slots__ = ("__dict__", "_values", "_indexed", "_fallback", "_is_sorted") - def __init__(self, dict=None, **kwargs): + def __init__(self, dict_=None, **kwargs): def _readable_name(n): return n.replace("__", "/").replace("_", " ") # print (repr(kwargs)) - elements = dict if dict else kwargs + elements = dict_ if dict_ else kwargs values = {k: NamedInt(v, _readable_name(k)) for (k, v) in elements.items()} self.__dict__ = values self._is_sorted = False @@ -499,7 +499,7 @@ class NamedInts: return NamedInts(**self.__dict__, **other.__dict__) def __eq__(self, other): - return type(self) == type(other) and self._values == other._values + return isinstance(other, self.__class__) and self._values == other._values class UnsortedNamedInts(NamedInts): @@ -548,7 +548,7 @@ class FirmwareInfo: kind: str name: str version: str - extras: str + extras: str | None class BatteryStatus(IntEnum): diff --git a/lib/logitech_receiver/descriptors.py b/lib/logitech_receiver/descriptors.py index de4a49c5..03c06e74 100644 --- a/lib/logitech_receiver/descriptors.py +++ b/lib/logitech_receiver/descriptors.py @@ -225,7 +225,13 @@ _D("Craft Advanced Keyboard", codename="Craft", protocol=4.5, wpid="4066", btid= _D("Wireless Illuminated Keyboard K800 new", codename="K800 new", protocol=4.5, wpid="406E") _D("Wireless Keyboard K470", codename="K470", protocol=4.5, wpid="4075") _D("MX Keys Keyboard", codename="MX Keys", protocol=4.5, wpid="408A", btid=0xB35B) -_D("G915 TKL LIGHTSPEED Wireless RGB Mechanical Gaming Keyboard", codename="G915 TKL", protocol=4.2, wpid="408E", usbid=0xC343) +_D( + "G915 TKL LIGHTSPEED Wireless RGB Mechanical Gaming Keyboard", + codename="G915 TKL", + protocol=4.2, + wpid="408E", + usbid=0xC343, +) _D("Illuminated Keyboard", codename="Illuminated", protocol=1.0, usbid=0xC318, interface=1) _D("G213 Prodigy Gaming Keyboard", codename="G213", usbid=0xC336, interface=1) _D("G512 RGB Mechanical Gaming Keyboard", codename="G512", usbid=0xC33C, interface=1) @@ -253,7 +259,14 @@ _D( wpid=("1006", "100D", "0612"), registers=(Reg.BATTERY_CHARGE,), ) -_D("MX Air", codename="MX Air", protocol=1.0, kind=DEVICE_KIND.mouse, wpid=("1007", "100E"), registers=(Reg.BATTERY_CHARGE)) +_D( + "MX Air", + codename="MX Air", + protocol=1.0, + kind=DEVICE_KIND.mouse, + wpid=("1007", "100E"), + registers=Reg.BATTERY_CHARGE, +) _D( "MX Revolution", codename="MX Revolution", @@ -262,10 +275,34 @@ _D( wpid=("1008", "100C"), registers=(Reg.BATTERY_CHARGE,), ) -_D("MX620 Laser Cordless Mouse", codename="MX620", protocol=1.0, wpid=("100A", "1016"), registers=(Reg.BATTERY_CHARGE,)) -_D("VX Nano Cordless Laser Mouse", codename="VX Nano", protocol=1.0, wpid=("100B", "100F"), registers=(Reg.BATTERY_CHARGE,)) -_D("V450 Nano Cordless Laser Mouse", codename="V450 Nano", protocol=1.0, wpid="1011", registers=(Reg.BATTERY_CHARGE,)) -_D("V550 Nano Cordless Laser Mouse", codename="V550 Nano", protocol=1.0, wpid="1013", registers=(Reg.BATTERY_CHARGE,)) +_D( + "MX620 Laser Cordless Mouse", + codename="MX620", + protocol=1.0, + wpid=("100A", "1016"), + registers=(Reg.BATTERY_CHARGE,), +) +_D( + "VX Nano Cordless Laser Mouse", + codename="VX Nano", + protocol=1.0, + wpid=("100B", "100F"), + registers=(Reg.BATTERY_CHARGE,), +) +_D( + "V450 Nano Cordless Laser Mouse", + codename="V450 Nano", + protocol=1.0, + wpid="1011", + registers=(Reg.BATTERY_CHARGE,), +) +_D( + "V550 Nano Cordless Laser Mouse", + codename="V550 Nano", + protocol=1.0, + wpid="1013", + registers=(Reg.BATTERY_CHARGE,), +) _D( "MX 1100 Cordless Laser Mouse", codename="MX 1100", @@ -282,11 +319,40 @@ _D( wpid="101A", registers=(Reg.BATTERY_STATUS, Reg.THREE_LEDS), ) -_D("Marathon Mouse M705 (M-R0009)", codename="M705 (M-R0009)", protocol=1.0, wpid="101B", registers=(Reg.BATTERY_CHARGE,)) -_D("Wireless Mouse M350", codename="M350", protocol=1.0, wpid="101C", registers=(Reg.BATTERY_CHARGE,)) -_D("Wireless Mouse M505", codename="M505/B605", protocol=1.0, wpid="101D", registers=(Reg.BATTERY_CHARGE,)) -_D("Wireless Mouse M305", codename="M305", protocol=1.0, wpid="101F", registers=(Reg.BATTERY_STATUS,)) -_D("Wireless Mouse M215", codename="M215", protocol=1.0, wpid="1020") +_D( + "Marathon Mouse M705 (M-R0009)", + codename="M705 (M-R0009)", + protocol=1.0, + wpid="101B", + registers=(Reg.BATTERY_CHARGE,), +) +_D( + "Wireless Mouse M350", + codename="M350", + protocol=1.0, + wpid="101C", + registers=(Reg.BATTERY_CHARGE,), +) +_D( + "Wireless Mouse M505", + codename="M505/B605", + protocol=1.0, + wpid="101D", + registers=(Reg.BATTERY_CHARGE,), +) +_D( + "Wireless Mouse M305", + codename="M305", + protocol=1.0, + wpid="101F", + registers=(Reg.BATTERY_STATUS,), +) +_D( + "Wireless Mouse M215", + codename="M215", + protocol=1.0, + wpid="1020", +) _D( "G700 Gaming Mouse", codename="G700", @@ -382,5 +448,19 @@ _D("G533 Gaming Headset", codename="G533 Headset", protocol=2.0, interface=3, ki _D("G535 Gaming Headset", codename="G535 Headset", protocol=2.0, interface=3, kind=DEVICE_KIND.headset, usbid=0x0AC4) _D("G935 Gaming Headset", codename="G935 Headset", protocol=2.0, interface=3, kind=DEVICE_KIND.headset, usbid=0x0A87) _D("G733 Gaming Headset", codename="G733 Headset", protocol=2.0, interface=3, kind=DEVICE_KIND.headset, usbid=0x0AB5) -_D("G733 Gaming Headset", codename="G733 Headset New", protocol=2.0, interface=3, kind=DEVICE_KIND.headset, usbid=0x0AFE) -_D("PRO X Wireless Gaming Headset", codename="PRO Headset", protocol=2.0, interface=3, kind=DEVICE_KIND.headset, usbid=0x0ABA) +_D( + "G733 Gaming Headset", + codename="G733 Headset New", + protocol=2.0, + interface=3, + kind=DEVICE_KIND.headset, + usbid=0x0AFE, +) +_D( + "PRO X Wireless Gaming Headset", + codename="PRO Headset", + protocol=2.0, + interface=3, + kind=DEVICE_KIND.headset, + usbid=0x0ABA, +) diff --git a/lib/logitech_receiver/device.py b/lib/logitech_receiver/device.py index 0e8f1396..fdd9622f 100644 --- a/lib/logitech_receiver/device.py +++ b/lib/logitech_receiver/device.py @@ -176,7 +176,11 @@ class Device: descriptors.get_btid(self.product_id) if self.bluetooth else descriptors.get_usbid(self.product_id) ) if self.number is None: # for direct-connected devices get 'number' from descriptor protocol else use 0xFF - self.number = 0x00 if self.descriptor and self.descriptor.protocol and self.descriptor.protocol < 2.0 else 0xFF + if self.descriptor and self.descriptor.protocol and self.descriptor.protocol < 2.0: + number = 0x00 + else: + number = 0xFF + self.number = number self.ping() # determine whether a direct-connected device is online if self.descriptor: diff --git a/lib/logitech_receiver/diversion.py b/lib/logitech_receiver/diversion.py index 2adebdc6..411ae7ed 100644 --- a/lib/logitech_receiver/diversion.py +++ b/lib/logitech_receiver/diversion.py @@ -26,6 +26,7 @@ import subprocess import sys import time +from typing import Any from typing import Dict from typing import Tuple @@ -113,9 +114,17 @@ except Exception: # Globals xtest_available = True # Xtest might be available xdisplay = None + + Xkbdisplay = None # xkb might be available +X11Lib = None + modifier_keycodes = [] XkbUseCoreKbd = 0x100 +NET_ACTIVE_WINDOW = None +NET_WM_PID = None +WM_CLASS = None + udevice = None @@ -187,7 +196,10 @@ def gnome_dbus_interface_setup(): remote_object = bus.get_object("org.gnome.Shell", "/io/github/pwr_solaar/solaar") _dbus_interface = dbus.Interface(remote_object, "io.github.pwr_solaar.solaar") except dbus.exceptions.DBusException: - logger.warning("Solaar Gnome extension not installed - some rule capabilities inoperable", exc_info=sys.exc_info()) + logger.warning( + "Solaar Gnome extension not installed - some rule capabilities inoperable", + exc_info=sys.exc_info(), + ) _dbus_interface = False return _dbus_interface @@ -228,7 +240,10 @@ if evdev: for _, evcode in buttons.values(): if evcode: key_events.append(evcode) - devicecap = {evdev.ecodes.EV_KEY: key_events, evdev.ecodes.EV_REL: [evdev.ecodes.REL_WHEEL, evdev.ecodes.REL_HWHEEL]} + devicecap = { + evdev.ecodes.EV_KEY: key_events, + evdev.ecodes.EV_REL: [evdev.ecodes.REL_WHEEL, evdev.ecodes.REL_HWHEEL], + } else: # Just mock these since they won't be useful without evdev anyway buttons = {} @@ -283,9 +298,9 @@ def xy_direction(_x, _y): y = round(_y / m) if x < 0 and y < 0: return "Mouse Up-left" - elif x > 0 and y < 0: + elif x > 0 > y: return "Mouse Up-right" - elif x < 0 and y > 0: + elif x < 0 < y: return "Mouse Down-left" elif x > 0 and y > 0: return "Mouse Down-right" @@ -456,7 +471,7 @@ TESTS = { "crown_tap": [lambda f, r, d, a: f == FEATURE.CROWN and r == 0 and d[5] == 0x01 and d[5], False], "crown_start_press": [lambda f, r, d, a: f == FEATURE.CROWN and r == 0 and d[6] == 0x01 and d[6], False], "crown_end_press": [lambda f, r, d, a: f == FEATURE.CROWN and r == 0 and d[6] == 0x05 and d[6], False], - "crown_pressed": [lambda f, r, d, a: f == FEATURE.CROWN and r == 0 and d[6] >= 0x01 and d[6] <= 0x04 and d[6], False], + "crown_pressed": [lambda f, r, d, a: f == FEATURE.CROWN and r == 0 and 0x01 <= d[6] <= 0x04 and d[6], False], "thumb_wheel_up": [thumb_wheel_up, True], "thumb_wheel_down": [thumb_wheel_down, True], "lowres_wheel_up": [ @@ -488,7 +503,7 @@ MOUSE_GESTURE_TESTS = { "mouse-noop": [], } -COMPONENTS = {} +# COMPONENTS = {} class RuleComponent: @@ -503,6 +518,17 @@ class RuleComponent: return Condition() +def _evaluate(components, feature, notification, device, result) -> Any: + res = True + for component in components: + res = component.evaluate(feature, notification, device, result) + if not isinstance(component, Action) and res is None: + return None + if isinstance(component, Condition) and not res: + return res + return res + + class Rule(RuleComponent): def __init__(self, args, source=None, warn=True): self.components = [self.compile(a) for a in args] @@ -515,14 +541,7 @@ class Rule(RuleComponent): def evaluate(self, feature, notification, device, last_result): if logger.isEnabledFor(logging.DEBUG): logger.debug("evaluate rule: %s", self) - result = True - for component in self.components: - result = component.evaluate(feature, notification, device, result) - if not isinstance(component, Action) and result is None: - return None - if isinstance(component, Condition) and not result: - return result - return result + return _evaluate(self.components, feature, notification, device, True) def once(self, feature, notification, device, last_result): self.evaluate(feature, notification, device, last_result) @@ -598,14 +617,7 @@ class And(Condition): def evaluate(self, feature, notification, device, last_result): if logger.isEnabledFor(logging.DEBUG): logger.debug("evaluate condition: %s", self) - result = True - for component in self.components: - result = component.evaluate(feature, notification, device, last_result) - if not isinstance(component, Action) and result is None: - return None - if isinstance(component, Condition) and not result: - return result - return result + return _evaluate(self.components, feature, notification, device, last_result) def data(self): return {"And": [c.data() for c in self.components]} @@ -663,7 +675,8 @@ class Process(Condition): if (not wayland and not x11_setup()) or (wayland and not gnome_dbus_interface_setup()): if warn: logger.warning( - "rules can only access active process in X11 or in Wayland under GNOME with Solaar Gnome extension - %s", + "rules can only access active process in X11 or in Wayland under GNOME with Solaar Gnome " + "extension - %s", self, ) if not isinstance(process, str): @@ -1218,7 +1231,13 @@ class KeyPress(Action): if gkeymap: current = gkeymap.get_modifier_state() if logger.isEnabledFor(logging.INFO): - logger.info("KeyPress action: %s %s, group %s, modifiers %s", self.key_names, self.action, kbdgroup(), current) + logger.info( + "KeyPress action: %s %s, group %s, modifiers %s", + self.key_names, + self.action, + kbdgroup(), + current, + ) if self.action != RELEASE: self.keyDown(self.key_symbols, current) if self.action != DEPRESS: @@ -1287,7 +1306,10 @@ class MouseClick(Action): if count in [CLICK, DEPRESS, RELEASE]: self.count = count elif warn: - logger.warning("rule MouseClick action: argument %s should be an integer or CLICK, PRESS, or RELEASE", count) + logger.warning( + "rule MouseClick action: argument %s should be an integer or CLICK, PRESS, or RELEASE", + count, + ) self.count = 1 def __str__(self): @@ -1332,7 +1354,12 @@ class Set(Action): return None args = setting.acceptable(self.args[2:], setting.read()) if args is None: - logger.warning("Set Action: invalid args %s for setting %s of %s", self.args[2:], self.args[1], self.args[0]) + logger.warning( + "Set Action: invalid args %s for setting %s of %s", + self.args[2:], + self.args[1], + self.args[0], + ) return None if len(args) > 1: setting.write_key_value(args[0], args[1]) @@ -1432,18 +1459,17 @@ COMPONENTS = { "Later": Later, } -built_in_rules = Rule([]) -if True: - built_in_rules = Rule( - [ - { - "Rule": [ # Implement problematic keys for Craft and MX Master - {"Rule": [{"Key": ["Brightness Down", "pressed"]}, {"KeyPress": "XF86_MonBrightnessDown"}]}, - {"Rule": [{"Key": ["Brightness Up", "pressed"]}, {"KeyPress": "XF86_MonBrightnessUp"}]}, - ] - }, - ] - ) + +built_in_rules = Rule( + [ + { + "Rule": [ # Implement problematic keys for Craft and MX Master + {"Rule": [{"Key": ["Brightness Down", "pressed"]}, {"KeyPress": "XF86_MonBrightnessDown"}]}, + {"Rule": [{"Key": ["Brightness Up", "pressed"]}, {"KeyPress": "XF86_MonBrightnessUp"}]}, + ] + }, + ] +) def key_is_down(key): diff --git a/lib/logitech_receiver/hidpp20.py b/lib/logitech_receiver/hidpp20.py index 859e7a51..2d1a3d9f 100644 --- a/lib/logitech_receiver/hidpp20.py +++ b/lib/logitech_receiver/hidpp20.py @@ -21,6 +21,7 @@ import struct import threading from typing import Any +from typing import Dict from typing import List from typing import Optional from typing import Tuple @@ -55,10 +56,7 @@ KIND_MAP = {kind: hidpp10_constants.DEVICE_KIND[str(kind)] for kind in DEVICE_KI class Device(Protocol): - def feature_request(self, feature: FEATURE) -> Any: - ... - - def request(self) -> Any: + def feature_request(self, feature, function=0x00, *params, no_reply=False) -> Any: ... @property @@ -253,7 +251,7 @@ class ReprogrammableKeyV4(ReprogrammableKey): def remappable_to(self) -> common.NamedInts: self._device.keys._ensure_all_keys_queried() ret = common.UnsortedNamedInts() - if self.group_mask != []: # only keys with a non-zero gmask are remappable + if self.group_mask: # only keys with a non-zero gmask are remappable ret[self.default_task] = self.default_task # it should always be possible to map the key to itself for g in self.group_mask: g = special_keys.CID_GROUP[str(g)] @@ -291,7 +289,11 @@ class ReprogrammableKeyV4(ReprogrammableKey): def _getCidReporting(self): try: - mapped_data = self._device.feature_request(FEATURE.REPROG_CONTROLS_V4, 0x20, *tuple(struct.pack("!H", self._cid))) + mapped_data = self._device.feature_request( + FEATURE.REPROG_CONTROLS_V4, + 0x20, + *tuple(struct.pack("!H", self._cid)), + ) if mapped_data: cid, mapping_flags_1, mapped_to = struct.unpack("!HBH", mapped_data[:5]) if cid != self._cid and logger.isEnabledFor(logging.WARNING): @@ -316,11 +318,17 @@ class ReprogrammableKeyV4(ReprogrammableKey): self._mapping_flags = 0 self._mapped_to = self._cid - def _setCidReporting(self, flags=None, remap=0): - """Sends a `setCidReporting` request with the given parameters. Raises an exception if the parameters are invalid. - Parameters: - - flags {Dict[NamedInt,bool]} -- a dictionary of which mapping flags to set/unset - - remap {int} -- which control ID to remap to; or 0 to keep current mapping + def _setCidReporting(self, flags: Dict[NamedInt, bool] = None, remap: int = 0): + """Sends a `setCidReporting` request with the given parameters. + + Raises an exception if the parameters are invalid. + + Parameters + ---------- + flags + A dictionary of which mapping flags to set/unset. + remap + Which control ID to remap to; or 0 to keep current mapping. """ flags = flags if flags else {} # See flake8 B006 @@ -555,7 +563,13 @@ class KeysArrayPersistent(KeysArray): keydata = self.device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x20, index, 0xFF) if keydata: key = struct.unpack("!H", keydata[:2])[0] - mapped_data = self.device.feature_request(FEATURE.PERSISTENT_REMAPPABLE_ACTION, 0x30, key >> 8, key & 0xFF, 0xFF) + mapped_data = self.device.feature_request( + FEATURE.PERSISTENT_REMAPPABLE_ACTION, + 0x30, + key >> 8, + key & 0xFF, + 0xFF, + ) if mapped_data: _ignore, _ignore, actionId, remapped, modifiers, status = struct.unpack("!HBBHBB", mapped_data[:8]) else: @@ -571,7 +585,15 @@ class KeysArrayPersistent(KeysArray): remapped = special_keys.HID_CONSUMERCODES[remapped] elif actionId == special_keys.ACTIONID.Empty: # purge data from empty value remapped = modifiers = 0 - self.keys[index] = PersistentRemappableAction(self.device, index, key, actionId, remapped, modifiers, status) + self.keys[index] = PersistentRemappableAction( + self.device, + index, + key, + actionId, + remapped, + modifiers, + status, + ) elif logger.isEnabledFor(logging.WARNING): logger.warning(f"Key with index {index} was expected to exist but device doesn't report it.") @@ -673,15 +695,15 @@ class Gesture: if index is not None: offset = index >> 3 # 8 gestures per byte mask = 0x1 << (index % 8) - return (offset, mask) + return offset, mask else: - return (None, None) + return None, None - def enable_offset_mask(gesture): - return gesture._offset_mask(gesture.index) + def enable_offset_mask(self): + return self._offset_mask(self.index) - def diversion_offset_mask(gesture): - return gesture._offset_mask(gesture.diversion_index) + def diversion_offset_mask(self): + return self._offset_mask(self.diversion_index) def enabled(self): # is the gesture enabled? if self._enabled is None and self.index is not None: @@ -710,7 +732,14 @@ class Gesture: return None if self.diversion_index is not None: offset, mask = self.diversion_offset_mask() - reply = self._device.feature_request(FEATURE.GESTURE_2, 0x40, offset, 0x01, mask, mask if diverted else 0x00) + reply = self._device.feature_request( + FEATURE.GESTURE_2, + 0x40, + offset, + 0x01, + mask, + mask if diverted else 0x00, + ) return reply def as_int(self): @@ -919,7 +948,8 @@ LEDParamSize = { LEDParam.saturation: 1, } # not implemented from x8070 Wave=4, Stars=5, Press=6, Audio=7 -# not implemented from x8071 Custom=12, Kitt=13, HSVPulsing=20, WaveC=22, RippleC=23, SignatureActive=24, SignaturePassive=25 +# not implemented from x8071 Custom=12, Kitt=13, HSVPulsing=20, +# WaveC=22, RippleC=23, SignatureActive=24, SignaturePassive=25 LEDEffects = { 0x00: [NamedInt(0x00, _("Disabled")), {}], 0x01: [NamedInt(0x01, _("Static")), {LEDParam.color: 0, LEDParam.ramp: 3}], @@ -927,7 +957,10 @@ LEDEffects = { 0x03: [NamedInt(0x03, _("Cycle")), {LEDParam.period: 5, LEDParam.intensity: 7}], 0x08: [NamedInt(0x08, _("Boot")), {}], 0x09: [NamedInt(0x09, _("Demo")), {}], - 0x0A: [NamedInt(0x0A, _("Breathe")), {LEDParam.color: 0, LEDParam.period: 3, LEDParam.form: 5, LEDParam.intensity: 6}], + 0x0A: [ + NamedInt(0x0A, _("Breathe")), + {LEDParam.color: 0, LEDParam.period: 3, LEDParam.form: 5, LEDParam.intensity: 6}, + ], 0x0B: [NamedInt(0x0B, _("Ripple")), {LEDParam.color: 0, LEDParam.period: 4}], 0x0E: [NamedInt(0x0E, _("Decomposition")), {LEDParam.period: 6, LEDParam.intensity: 8}], 0x0F: [NamedInt(0x0F, _("Signature1")), {LEDParam.period: 5, LEDParam.intensity: 7}], @@ -976,7 +1009,7 @@ class LEDEffectSetting: # an effect plus its parameters return dumper.represent_mapping("!LEDEffectSetting", data.__dict__, flow_style=True) def __eq__(self, other): - return type(self) == type(other) and self.to_bytes() == other.to_bytes() + return isinstance(other, self.__class__) and self.to_bytes() == other.to_bytes() def __str__(self): return yaml.dump(self, width=float("inf")).rstrip("\n") @@ -1150,7 +1183,8 @@ class Button: elif self.type == ButtonMappingTypes.No_Action: bytes += b"\xff\xff" elif self.behavior == ButtonBehaviors.Function: - bytes += common.int2bytes(self.value, 1) + b"\xff" + (common.int2bytes(self.data, 1) if self.data else b"\x00") + data = common.int2bytes(self.data, 1) if self.data else b"\x00" + bytes += common.int2bytes(self.value, 1) + b"\xff" + data else: bytes = self.bytes if self.bytes else b"\xff\xff\xff\xff" return bytes @@ -1318,7 +1352,9 @@ class OnboardProfiles: def to_bytes(self): bytes = b"" for i in range(1, len(self.profiles) + 1): - bytes += common.int2bytes(self.profiles[i].sector, 2) + common.int2bytes(self.profiles[i].enabled, 1) + b"\x00" + profiles_sector = common.int2bytes(self.profiles[i].sector, 2) + profiles_enabled = common.int2bytes(self.profiles[i].enabled, 1) + bytes += profiles_sector + profiles_enabled + b"\x00" bytes += b"\xff\xff\x00\x00" # marker after last profile while len(bytes) < self.size - 2: # leave room for CRC bytes += b"\xff" @@ -1333,7 +1369,14 @@ class OnboardProfiles: chunk = dev.feature_request(FEATURE.ONBOARD_PROFILES, 0x50, sector >> 8, sector & 0xFF, o >> 8, o & 0xFF) bytes += chunk o += 16 - chunk = dev.feature_request(FEATURE.ONBOARD_PROFILES, 0x50, sector >> 8, sector & 0xFF, (s - 16) >> 8, (s - 16) & 0xFF) + chunk = dev.feature_request( + FEATURE.ONBOARD_PROFILES, + 0x50, + sector >> 8, + sector & 0xFF, + (s - 16) >> 8, + (s - 16) & 0xFF, + ) bytes += chunk[16 + o - s :] # the last chunk has to be read in an awkward way return bytes @@ -1443,7 +1486,7 @@ class Hidpp20: if transport_bits & flag: tid_map[transport] = modelId[offset : offset + 2].hex().upper() offset = offset + 2 - return (unitId.hex().upper(), modelId.hex().upper(), tid_map) + return unitId.hex().upper(), modelId.hex().upper(), tid_map def get_kind(self, device: Device): """Reads a device's type. @@ -1831,6 +1874,7 @@ def decipher_battery_unified(report): def decipher_adc_measurement(report): # partial implementation - needs mapping to levels + charge_level = None adc, flags = struct.unpack("!HB", report[:3]) for level in battery_voltage_remaining: if level[0] < adc: diff --git a/lib/logitech_receiver/hidpp20_constants.py b/lib/logitech_receiver/hidpp20_constants.py index 864e6e2a..6499956d 100644 --- a/lib/logitech_receiver/hidpp20_constants.py +++ b/lib/logitech_receiver/hidpp20_constants.py @@ -150,7 +150,14 @@ FEATURE._fallback = lambda x: f"unknown:{x:04X}" FEATURE_FLAG = NamedInts(internal=0x20, hidden=0x40, obsolete=0x80) DEVICE_KIND = NamedInts( - keyboard=0x00, remote_control=0x01, numpad=0x02, mouse=0x03, touchpad=0x04, trackball=0x05, presenter=0x06, receiver=0x07 + keyboard=0x00, + remote_control=0x01, + numpad=0x02, + mouse=0x03, + touchpad=0x04, + trackball=0x05, + presenter=0x06, + receiver=0x07, ) FIRMWARE_KIND = NamedInts(Firmware=0x00, Bootloader=0x01, Hardware=0x02, Other=0x03) diff --git a/lib/logitech_receiver/settings.py b/lib/logitech_receiver/settings.py index d717935d..9f0eea25 100644 --- a/lib/logitech_receiver/settings.py +++ b/lib/logitech_receiver/settings.py @@ -111,7 +111,7 @@ class Setting: assert hasattr(self, "_device") if self._validator.kind == KIND.range: - return (self._validator.min_value, self._validator.max_value) + return self._validator.min_value, self._validator.max_value def _pre_read(self, cached, key=None): if self.persist and self._value is None and getattr(self._device, "persister", None): @@ -1208,7 +1208,7 @@ class RangeValidator(Validator): if len(args) == 1: return args[0] == current elif len(args) == 2: - return args[0] <= current and current <= args[1] + return args[0] <= current <= args[1] else: return False diff --git a/lib/solaar/__init__.py b/lib/solaar/__init__.py index 97bbc6ac..292a70a1 100644 --- a/lib/solaar/__init__.py +++ b/lib/solaar/__init__.py @@ -22,7 +22,17 @@ NAME = "Solaar" try: __version__ = ( - subprocess.check_output(["git", "describe", "--always"], cwd=sys.path[0], stderr=subprocess.DEVNULL).strip().decode() + subprocess.check_output( + [ + "git", + "describe", + "--always", + ], + cwd=sys.path[0], + stderr=subprocess.DEVNULL, + ) + .strip() + .decode() ) except Exception: try: diff --git a/lib/solaar/cli/__init__.py b/lib/solaar/cli/__init__.py index 65d6505d..2f846955 100644 --- a/lib/solaar/cli/__init__.py +++ b/lib/solaar/cli/__init__.py @@ -33,7 +33,9 @@ logger = logging.getLogger(__name__) def _create_parser(): parser = argparse.ArgumentParser( - prog=NAME.lower(), add_help=False, epilog=f"For details on individual actions, run `{NAME.lower()} --help`." + prog=NAME.lower(), + add_help=False, + epilog=f"For details on individual actions, run `{NAME.lower()} --help`.", ) subparsers = parser.add_subparsers(title="actions", help="command-line action to perform") @@ -53,7 +55,11 @@ def _create_parser(): ) sp.set_defaults(action="probe") - sp = subparsers.add_parser("profiles", help="read or write onboard profiles", epilog="Only works on active devices.") + sp = subparsers.add_parser( + "profiles", + help="read or write onboard profiles", + epilog="Only works on active devices.", + ) sp.add_argument( "device", help="device to read or write profiles of; may be a device number (1..6), a serial number, " @@ -69,7 +75,10 @@ def _create_parser(): ) sp.add_argument( "device", - help="device to configure; may be a device number (1..6), a serial number, " "or a substring of a device's name", + help=( + "device to configure; may be a device number (1..6), a serial number, ", + "or a substring of a device's name", + ), ) sp.add_argument("setting", nargs="?", help="device-specific setting; leave empty to list available settings") sp.add_argument("value_key", nargs="?", help="new value for the setting or key for keyed settings") @@ -206,7 +215,7 @@ def run(cli_args=None, hidraw_path=None): c = list(_receivers(hidraw_path)) if not c: raise Exception( - 'No supported device found. Use "lsusb" and "bluetoothctl devices Connected" to list connected devices.' + 'No supported device found. Use "lsusb" and "bluetoothctl devices Connected" to list connected devices.' ) m = import_module("." + action, package=__name__) m.run(c, args, _find_receiver, _find_device) diff --git a/lib/solaar/cli/config.py b/lib/solaar/cli/config.py index 66a4a0ab..1e26a43e 100644 --- a/lib/solaar/cli/config.py +++ b/lib/solaar/cli/config.py @@ -22,6 +22,8 @@ from logitech_receiver.common import NamedInts from solaar import configuration +APP_ID = "io.github.pwr_solaar.solaar" + def _print_setting(s, verbose=True): print("#", s.label) @@ -92,7 +94,7 @@ def select_choice(value, choices, setting, key): break if val is not None: value = val - elif ivalue is not None and ivalue >= 1 and ivalue <= len(choices): + elif ivalue is not None and 1 <= ivalue <= len(choices): value = choices[ivalue - 1] elif lvalue in ("higher", "lower"): old_value = setting.read() if key is None else setting.read_key(key) @@ -134,13 +136,13 @@ def select_range(value, setting): value = int(value) except ValueError as exc: raise Exception(f"{setting.name}: can't interpret '{value}' as integer") from exc - min, max = setting.range - if value < min or value > max: + minimum, maximum = setting.range + if value < minimum or value > maximum: raise Exception(f"{setting.name}: value '{value}' out of bounds") return value -def run(receivers, args, find_receiver, find_device): +def run(receivers, args, _find_receiver, find_device): assert receivers assert args.device @@ -190,7 +192,6 @@ def run(receivers, args, find_receiver, find_device): from gi.repository import Gtk if Gtk.init_check()[0]: # can Gtk be initialized? - APP_ID = "io.github.pwr_solaar.solaar" application = Gtk.Application.new(APP_ID, Gio.ApplicationFlags.HANDLES_COMMAND_LINE) application.register() remote = application.get_is_remote() @@ -236,7 +237,7 @@ def set(dev, setting, args, save): elif setting.kind == settings.KIND.map_choice: if args.extra_subkey is None: _print_setting_keyed(setting, args.value_key) - return (None, None, None) + return None, None, None key = args.value_key ikey = to_int(key) k = next((k for k in setting.choices.keys() if key == k), None) @@ -254,7 +255,7 @@ def set(dev, setting, args, save): elif setting.kind == settings.KIND.multiple_toggle: if args.extra_subkey is None: _print_setting_keyed(setting, args.value_key) - return (None, None, None) + return None, None, None key = args.value_key all_keys = getattr(setting, "choices_universe", None) ikey = all_keys[int(key) if key.isdigit() else key] if isinstance(all_keys, NamedInts) else to_int(key) diff --git a/lib/solaar/cli/pair.py b/lib/solaar/cli/pair.py index 16aa9858..02af3906 100644 --- a/lib/solaar/cli/pair.py +++ b/lib/solaar/cli/pair.py @@ -51,7 +51,7 @@ def run(receivers, args, find_receiver, _ignore): assert n if n.devnumber == 0xFF: notifications.process(receiver, n) - elif n.sub_id == 0x41 and len(n.data) == base._SHORT_MESSAGE_SIZE - 4: + elif n.sub_id == 0x41 and len(n.data) == base.SHORT_MESSAGE_SIZE - 4: kd, known_devices = known_devices, None # only process one connection notification if kd is not None: if n.devnumber not in kd: diff --git a/lib/solaar/gtk.py b/lib/solaar/gtk.py index 4641bcc6..fa1e4c41 100755 --- a/lib/solaar/gtk.py +++ b/lib/solaar/gtk.py @@ -50,6 +50,7 @@ def _require(module, os_package, gi=None, gi_package=None, gi_version=None): battery_icons_style = "regular" +tray_icon_size = None temp = tempfile.NamedTemporaryFile(prefix="Solaar_", mode="w", delete=True) @@ -72,9 +73,16 @@ def _parse_arguments(): metavar="PATH", help="unifying receiver to use; the first detected receiver if unspecified. Example: /dev/hidraw2", ) - arg_parser.add_argument("--restart-on-wake-up", action="store_true", help="restart Solaar on sleep wake-up (experimental)") arg_parser.add_argument( - "-w", "--window", choices=("show", "hide", "only"), help="start with window showing / hidden / only (no tray icon)" + "--restart-on-wake-up", + action="store_true", + help="restart Solaar on sleep wake-up (experimental)", + ) + arg_parser.add_argument( + "-w", + "--window", + choices=("show", "hide", "only"), + help="start with window showing / hidden / only (no tray icon)", ) arg_parser.add_argument( "-b", diff --git a/lib/solaar/listener.py b/lib/solaar/listener.py index 753fbf16..76a4b1af 100644 --- a/lib/solaar/listener.py +++ b/lib/solaar/listener.py @@ -49,7 +49,13 @@ _GHOST_DEVICE.__nonzero__ = _GHOST_DEVICE.__bool__ def _ghost(device): - return _GHOST_DEVICE(receiver=device.receiver, number=device.number, name=device.name, kind=device.kind, online=False) + return _GHOST_DEVICE( + receiver=device.receiver, + number=device.number, + name=device.name, + kind=device.kind, + online=False, + ) class SolaarListener(listener.EventsListener): @@ -230,7 +236,9 @@ class SolaarListener(listener.EventsListener): def _process_bluez_dbus(device, path, dictionary, signature): - """Process bluez dbus property changed signals for device status changes to discover disconnections and connections""" + """Process bluez dbus property changed signals for device status + changes to discover disconnections and connections. + """ if device: if dictionary.get("Connected") is not None: connected = dictionary.get("Connected") diff --git a/lib/solaar/ui/__init__.py b/lib/solaar/ui/__init__.py index 1510db00..290c5b8f 100644 --- a/lib/solaar/ui/__init__.py +++ b/lib/solaar/ui/__init__.py @@ -84,7 +84,7 @@ def _command_line(app, command_line): return 0 -def _shutdown(app, shutdown_hook): +def _shutdown(_app, shutdown_hook): if logger.isEnabledFor(logging.DEBUG): logger.debug("shutdown") shutdown_hook() diff --git a/lib/solaar/ui/about/presenter.py b/lib/solaar/ui/about/presenter.py index b63791de..bc3454b4 100644 --- a/lib/solaar/ui/about/presenter.py +++ b/lib/solaar/ui/about/presenter.py @@ -31,7 +31,7 @@ class AboutViewProtocol(Protocol): def update_description(self, comments: str) -> None: ... - def update_copyright(self, copyright): + def update_copyright(self, copyright_text: str) -> None: ... def update_authors(self, authors: list[str]) -> None: @@ -68,8 +68,8 @@ class Presenter: self.view.update_description(comments) def update_copyright(self) -> None: - copyright = self.model.get_copyright() - self.view.update_copyright(copyright) + copyright_text = self.model.get_copyright() + self.view.update_copyright(copyright_text) def update_authors(self) -> None: authors = self.model.get_authors() diff --git a/lib/solaar/ui/common.py b/lib/solaar/ui/common.py index 191646da..93cab516 100644 --- a/lib/solaar/ui/common.py +++ b/lib/solaar/ui/common.py @@ -48,7 +48,10 @@ def _create_error_text(reason: str, object_) -> Tuple[str, str]: elif reason == "unpair": title = _("Unpairing failed") text = ( - _("Failed to unpair %{device} from %{receiver}.").format(device=object_.name, receiver=object_.receiver.name) + _("Failed to unpair %{device} from %{receiver}.").format( + device=object_.name, + receiver=object_.receiver.name, + ) + "\n\n" + _("The receiver returned an error, with no further details.") ) diff --git a/lib/solaar/ui/config_panel.py b/lib/solaar/ui/config_panel.py index 2520ae89..c07e257f 100644 --- a/lib/solaar/ui/config_panel.py +++ b/lib/solaar/ui/config_panel.py @@ -52,7 +52,7 @@ def _read_async(setting, force_read, sbox, device_is_online, sensitive): def _write_async(setting, value, sbox, sensitive=True, key=None): - def _do_write(s, v, sb, key): + def _do_write(_s, v, sb, key): try: if key is None: v = setting.write(v) @@ -87,8 +87,9 @@ class Scale(Gtk.Scale): class Control: - def __init__(**kwargs): - pass + def __init__(self, **kwargs): + self.sbox = None + self.delegate = None def init(self, sbox, delegate): self.sbox = sbox @@ -227,12 +228,12 @@ class ChoiceControlBig(Gtk.Entry, Control): tooltip = _("Incomplete") if self.value is None else _("Complete - ENTER to change") self.set_icon_tooltip_text(Gtk.EntryIconPosition.SECONDARY, tooltip) - def activate(self, *args): + def activate(self, *_args): if self.value is not None and self.get_sensitive(): self.set_icon_from_icon_name(Gtk.EntryIconPosition.SECONDARY, "") self.delegate.update() - def select(self, completion, model, iter): + def select(self, _completion, model, iter): self.set_value(model.get(iter, 0)[0]) if self.value and self.get_sensitive(): self.set_icon_from_icon_name(Gtk.EntryIconPosition.SECONDARY, "") @@ -278,7 +279,7 @@ class MapChoiceControl(Gtk.HBox, Control): if current is not None: self.valueBox.set_value(current) - def map_value_notify_key(self, *args): + def map_value_notify_key(self, *_args): key_choice = int(self.keyBox.get_active_id()) if self.keyBox.get_sensitive(): self.map_populate_value_box(key_choice) @@ -321,7 +322,7 @@ class MultipleControl(Gtk.ListBox, Control): sbox._button = self._button return True - def toggle_display(self, *args): + def toggle_display(self, *_args): self._showing = not self._showing if not self._showing: for c in self.get_children(): @@ -355,7 +356,7 @@ class MultipleToggleControl(MultipleControl): self.add(h) self._label_control_pairs.append((lbl, control)) - def toggle_notify(self, switch, active): + def toggle_notify(self, switch, _active): if switch.get_sensitive(): key = switch._setting_key new_state = switch.get_state() @@ -410,7 +411,12 @@ class MultipleRangeControl(MultipleControl): h.pack_start(sub_item_lbl, False, False, 0) sub_item_lbl.set_margin_start(30) if sub_item.widget == "Scale": - control = Gtk.Scale.new_with_range(Gtk.Orientation.HORIZONTAL, sub_item.minimum, sub_item.maximum, 1) + control = Gtk.Scale.new_with_range( + Gtk.Orientation.HORIZONTAL, + sub_item.minimum, + sub_item.maximum, + 1, + ) control.set_round_digits(0) control.set_digits(0) h.pack_end(control, True, True, 0) @@ -474,7 +480,6 @@ class MultipleRangeControl(MultipleControl): class PackedRangeControl(MultipleRangeControl): def setup(self, setting): validator = setting._validator - self._items = [] for item in range(validator.count): h = Gtk.HBox(homogeneous=False, spacing=0) lbl = Gtk.Label(label=str(validator.keys[item])) @@ -536,8 +541,9 @@ class HeteroKeyControl(Gtk.HBox, Control): item_lblbox.set_visible(False) else: item_lblbox = None + + item_box = ComboBoxText() if item["kind"] == settings.KIND.choice: - item_box = ComboBoxText() for entry in item["choices"]: item_box.append(str(int(entry)), str(entry)) item_box.set_active(0) @@ -572,8 +578,8 @@ class HeteroKeyControl(Gtk.HBox, Control): self.sbox._failed.set_visible(True) self.setup_visibles(value.ID if value is not None else 0) - def setup_visibles(self, ID): - fields = self.sbox.setting.fields_map[ID][1] if ID in self.sbox.setting.fields_map else {} + def setup_visibles(self, id_): + fields = self.sbox.setting.fields_map[id_][1] if id_ in self.sbox.setting.fields_map else {} for name, (lblbox, box) in self._items.items(): visible = name in fields or name == "ID" if lblbox: @@ -635,7 +641,7 @@ def _change_icon(allowed, icon): icon.set_tooltip_text(_allowables_tooltips[allowed]) -def _create_sbox(s, device): +def _create_sbox(s, _device): sbox = Gtk.HBox(homogeneous=False, spacing=6) sbox.setting = s sbox.kind = s.kind @@ -689,10 +695,10 @@ def _create_sbox(s, device): return sbox -def _update_setting_item(sbox, value, is_online=True, sensitive=True, nullOK=False): +def _update_setting_item(sbox, value, is_online=True, sensitive=True, null_okay=False): sbox._spinner.stop() sensitive = sbox._change_icon._allowed if sensitive is None else sensitive - if value is None and not nullOK: + if value is None and not null_okay: sbox._control.set_sensitive(sensitive is True) _change_icon(sensitive, sbox._change_icon) sbox._failed.set_visible(is_online) @@ -807,7 +813,11 @@ def _record_setting(device, setting_class, values): logger.debug("on %s changing setting %s to %s", device, setting_class.name, values) setting = next((s for s in device.settings if s.name == setting_class.name), None) if setting is None and logger.isEnabledFor(logging.DEBUG): - logger.debug("No setting for %s found on %s when trying to record a change made elsewhere", setting_class.name, device) + logger.debug( + "No setting for %s found on %s when trying to record a change made elsewhere", + setting_class.name, + device, + ) if setting: assert device == setting._device if len(values) > 1: diff --git a/lib/solaar/ui/diversion_rules.py b/lib/solaar/ui/diversion_rules.py index cf610685..17431a33 100644 --- a/lib/solaar/ui/diversion_rules.py +++ b/lib/solaar/ui/diversion_rules.py @@ -616,7 +616,8 @@ class DiversionDialog: if len(parent_c.components) == 0: # placeholder _populate_model(m, parent_it, None, level=wrapped.level) m.remove(it) - self.view.get_selection().select_iter(m.iter_nth_child(parent_it, max(0, min(idx, len(parent_c.components) - 1)))) + select = max(0, min(idx, len(parent_c.components) - 1)) + self.view.get_selection().select_iter(m.iter_nth_child(parent_it, select)) self.on_update() return c @@ -1408,7 +1409,6 @@ class _SettingWithValueUI: def create_widgets(self): self.widgets = {} - self.label = Gtk.Label(valign=Gtk.Align.CENTER, hexpand=True) self.label.set_text(self.label_text) self.widgets[self.label] = (0, 0, 5, 1) @@ -1432,7 +1432,13 @@ class _SettingWithValueUI: self.device_field.connect("changed", self._on_update) self.widgets[self.device_field] = (1, 1, 1, 1) - lbl = Gtk.Label(label=_("Setting"), halign=Gtk.Align.CENTER, valign=Gtk.Align.CENTER, hexpand=True, vexpand=False) + lbl = Gtk.Label( + label=_("Setting"), + halign=Gtk.Align.CENTER, + valign=Gtk.Align.CENTER, + hexpand=True, + vexpand=False, + ) self.widgets[lbl] = (0, 2, 1, 1) self.setting_field = SmartComboBox([(s[0].name, s[0].label) for s in self.ALL_SETTINGS.values()]) self.setting_field.set_valign(Gtk.Align.CENTER) @@ -1569,7 +1575,11 @@ class _SettingWithValueUI: def item(k): lbl = labels.get(k, None) - return (k, lbl[0] if lbl and isinstance(lbl, tuple) and lbl[0] else str(k)) + if lbl and isinstance(lbl, tuple) and lbl[0]: + label = lbl[0] + else: + label = str(k) + return k, label with self.ignore_changes(): self.key_field.set_all_values(sorted(map(item, keys), key=lambda k: k[1])) @@ -1688,6 +1698,7 @@ class _SettingWithValueUI: setting, val_class, kind, keys = cls._setting_attributes(setting_name, device) device_setting = (device.settings if device else {}).get(setting_name, None) disp = [setting.label or setting.name if setting else setting_name] + key = None if kind in cls.MULTIPLE: key = next(a, None) key = _from_named_ints(key, keys) if keys else key diff --git a/lib/solaar/ui/pair_window.py b/lib/solaar/ui/pair_window.py index 03bd6603..84b63539 100644 --- a/lib/solaar/ui/pair_window.py +++ b/lib/solaar/ui/pair_window.py @@ -166,13 +166,21 @@ def _show_passcode(assistant, receiver, passkey): page_text = _("Enter passcode on %(name)s.") % {"name": name} page_text += "\n" if authentication & 0x01: - page_text += _("Type %(passcode)s and then press the enter key.") % {"passcode": receiver.pairing.device_passkey} + page_text += _("Type %(passcode)s and then press the enter key.") % { + "passcode": receiver.pairing.device_passkey, + } else: passcode = ", ".join( [_("right") if bit == "1" else _("left") for bit in f"{int(receiver.pairing.device_passkey):010b}"] ) page_text += _("Press %(code)s\nand then press left and right buttons simultaneously.") % {"code": passcode} - page = _create_page(assistant, Gtk.AssistantPageType.PROGRESS, intro_text, "preferences-desktop-peripherals", page_text) + page = _create_page( + assistant, + Gtk.AssistantPageType.PROGRESS, + intro_text, + "preferences-desktop-peripherals", + page_text, + ) assistant.set_page_complete(page, True) assistant.next_page() @@ -185,7 +193,13 @@ def _create_assistant(receiver, ok, finish, title, text): assistant.set_resizable(False) assistant.set_role("pair-device") if ok: - page_intro = _create_page(assistant, Gtk.AssistantPageType.PROGRESS, title, "preferences-desktop-peripherals", text) + page_intro = _create_page( + assistant, + Gtk.AssistantPageType.PROGRESS, + title, + "preferences-desktop-peripherals", + text, + ) spinner = Gtk.Spinner() spinner.set_visible(True) spinner.start() @@ -227,7 +241,7 @@ def _create_success_page(assistant, device): assistant.commit() -def _create_failure_page(assistant, error): +def _create_failure_page(assistant, error) -> None: header = _("Pairing failed") + ": " + _(str(error)) + "." if "timeout" in str(error): text = _("Make sure your device is within range, and has a decent battery charge.") @@ -242,7 +256,7 @@ def _create_failure_page(assistant, error): assistant.commit() -def _create_page(assistant, kind, header=None, icon_name=None, text=None): +def _create_page(assistant, kind, header=None, icon_name=None, text=None) -> Gtk.VBox: p = Gtk.VBox(homogeneous=False, spacing=8) assistant.append_page(p) assistant.set_page_type(p, kind) diff --git a/lib/solaar/ui/rule_base.py b/lib/solaar/ui/rule_base.py index 94b4a9b3..30790764 100644 --- a/lib/solaar/ui/rule_base.py +++ b/lib/solaar/ui/rule_base.py @@ -15,6 +15,7 @@ ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. from contextlib import contextmanager as contextlib_contextmanager +from typing import Callable from gi.repository import Gtk from logitech_receiver import diversion @@ -49,7 +50,7 @@ class CompletionEntry(Gtk.Entry): class RuleComponentUI: CLASS = diversion.RuleComponent - def __init__(self, panel, on_update=None): + def __init__(self, panel, on_update: Callable = None): self.panel = panel self.widgets = {} # widget -> coord. in grid self.component = None diff --git a/lib/solaar/ui/tray.py b/lib/solaar/ui/tray.py index a076b6ce..f89855c7 100644 --- a/lib/solaar/ui/tray.py +++ b/lib/solaar/ui/tray.py @@ -72,7 +72,8 @@ def _scroll(tray_icon, event, direction=None): # ignore all other directions return - if sum(map(lambda i: i[1] is not None, _devices_info)) < 2: # don't bother even trying to scroll if less than two devices + # don't bother even trying to scroll if less than two devices + if sum(map(lambda i: i[1] is not None, _devices_info)) < 2: return # scroll events come way too fast (at least 5-6 at once) so take a little break between them @@ -215,7 +216,10 @@ except ImportError: icon.set_tooltip_text(NAME) icon.connect("activate", window.toggle) icon.connect("scroll-event", _scroll) - icon.connect("popup-menu", lambda icon, button, time: menu.popup(None, None, icon.position_menu, icon, button, time)) + icon.connect( + "popup-menu", + lambda icon, button, time: menu.popup(None, None, icon.position_menu, icon, button, time), + ) return icon diff --git a/lib/solaar/ui/window.py b/lib/solaar/ui/window.py index 89b034ee..06ad70bc 100644 --- a/lib/solaar/ui/window.py +++ b/lib/solaar/ui/window.py @@ -501,47 +501,47 @@ def _update_details(button): # If read_all is False, only return stuff that is ~100% already # cached, and involves no HID++ calls. - yield (_("Path"), device.path) + yield _("Path"), device.path if device.kind is None: - yield (_("USB ID"), f"{LOGITECH_VENDOR_ID:04x}:" + device.product_id) + yield _("USB ID"), f"{LOGITECH_VENDOR_ID:04x}:" + device.product_id if read_all: - yield (_("Serial"), device.serial) + yield _("Serial"), device.serial else: - yield (_("Serial"), "...") + yield _("Serial"), "..." else: # yield ('Codename', device.codename) - yield (_("Index"), device.number) + yield _("Index"), device.number if device.wpid: - yield (_("Wireless PID"), device.wpid) + yield _("Wireless PID"), device.wpid if device.product_id: - yield (_("Product ID"), f"{LOGITECH_VENDOR_ID:04x}:" + device.product_id) + yield _("Product ID"), f"{LOGITECH_VENDOR_ID:04x}:" + device.product_id hid_version = device.protocol - yield (_("Protocol"), f"HID++ {hid_version:1.1f}" if hid_version else _("Unknown")) + yield _("Protocol"), f"HID++ {hid_version:1.1f}" if hid_version else _("Unknown") if read_all and device.polling_rate: - yield (_("Polling rate"), device.polling_rate) + yield _("Polling rate"), device.polling_rate if read_all or not device.online: - yield (_("Serial"), device.serial) + yield _("Serial"), device.serial else: - yield (_("Serial"), "...") + yield _("Serial"), "..." if read_all and device.unitId and device.unitId != device.serial: - yield (_("Unit ID"), device.unitId) + yield _("Unit ID"), device.unitId if read_all: if device.firmware: for fw in list(device.firmware): - yield (" " + _(str(fw.kind)), (fw.name + " " + fw.version).strip()) + yield " " + _(str(fw.kind)), (fw.name + " " + fw.version).strip() elif device.kind is None or device.online: - yield (f" {_('Firmware')}", "...") + yield f" {_('Firmware')}", "..." flag_bits = device.notification_flags if flag_bits is not None: flag_names = ( (f"({_('none')})",) if flag_bits == 0 else hidpp10_constants.NOTIFICATION_FLAG.flag_names(flag_bits) ) - yield (_("Notifications"), (f"\n{' ':15}").join(flag_names)) + yield _("Notifications"), f"\n{' ':15}".join(flag_names) def _set_details(text): _details._text.set_markup(text) diff --git a/setup.py b/setup.py index 9a154bd6..9a8ed4ef 100755 --- a/setup.py +++ b/setup.py @@ -6,11 +6,7 @@ from os.path import dirname from pathlib import Path from setuptools import find_packages - -try: - from setuptools import setup -except ImportError: - from distutils.core import setup +from setuptools import setup NAME = "Solaar" version = Path("lib/solaar/version").read_text().strip() diff --git a/tests/logitech_receiver/fake_hidpp.py b/tests/logitech_receiver/fake_hidpp.py index 7a5d32de..08b715c3 100644 --- a/tests/logitech_receiver/fake_hidpp.py +++ b/tests/logitech_receiver/fake_hidpp.py @@ -16,6 +16,7 @@ """HID++ data and functions common to several logitech_receiver test files""" +from __future__ import annotations import errno import threading @@ -43,7 +44,17 @@ def ping(responses, handle, devnumber, long_message=False): return r.response -def request(responses, handle, devnumber, id, *params, no_reply=False, return_error=False, long_message=False, protocol=1.0): +def request( + responses, + handle, + devnumber, + id, + *params, + no_reply=False, + return_error=False, + long_message=False, + protocol=1.0, +): params = b"".join(pack("B", p) if isinstance(p, int) else p for p in params) print("REQUEST ", hex(handle), hex(devnumber), hex(id), params.hex()) for r in responses: @@ -54,7 +65,7 @@ def request(responses, handle, devnumber, id, *params, no_reply=False, return_er @dataclass class Response: - response: Optional[str] + response: str | float id: int params: str = "" handle: int = 0x11 diff --git a/tests/logitech_receiver/test_hidpp10.py b/tests/logitech_receiver/test_hidpp10.py index 536551bc..00ce9ab4 100644 --- a/tests/logitech_receiver/test_hidpp10.py +++ b/tests/logitech_receiver/test_hidpp10.py @@ -225,7 +225,7 @@ def test_set_3leds(device, level, charging, warning, p1, p2, mocker): spy_request.assert_called_once_with(0x8000 | Registers.THREE_LEDS, p1, p2) -@pytest.mark.parametrize("device", [(device_offline), (device_features)]) +@pytest.mark.parametrize("device", [device_offline, device_features]) def test_set_3leds_missing(device, mocker): spy_request = mocker.spy(device, "request") diff --git a/tests/logitech_receiver/test_hidpp20_complex.py b/tests/logitech_receiver/test_hidpp20_complex.py index b8dc5612..3b4632ef 100644 --- a/tests/logitech_receiver/test_hidpp20_complex.py +++ b/tests/logitech_receiver/test_hidpp20_complex.py @@ -288,9 +288,12 @@ def test_ReprogrammableKeyV4_set(responses, index, diverted, persistently_divert (fake_hidpp.responses_key, 3, 0x0053, 0x02, 0x0001, 0x00, 1, "Mouse Button: 1", "", "02000100", "7FFFFFFF"), ], ) -def test_RemappableAction(r, index, cid, actionId, remapped, mask, status, action, modifiers, byts, remap, mocker): +def test_remappable_action(r, index, cid, actionId, remapped, mask, status, action, modifiers, byts, remap, mocker): if int(remap, 16) == special_keys.KEYS_Default: - responses = r + [fake_hidpp.Response("040000", 0x0000, "1C00"), fake_hidpp.Response("00", 0x450, f"{cid:04X}" + "FF")] + responses = r + [ + fake_hidpp.Response("040000", 0x0000, "1C00"), + fake_hidpp.Response("00", 0x450, f"{cid:04X}" + "FF"), + ] else: responses = r + [ fake_hidpp.Response("040000", 0x0000, "1C00"),