From c32f4181bb0090aed63205ad91357e9777ecc5a7 Mon Sep 17 00:00:00 2001 From: Lake Bookman Date: Thu, 4 Jun 2026 13:02:08 +1000 Subject: [PATCH 1/6] initial stab at a zeroconf unified event stream --- pyproject.toml | 4 + src/powersensor_local/__init__.py | 10 +- src/powersensor_local/devices.py | 328 ++++++++++++++-------- src/powersensor_local/zc_events.py | 56 ++++ src/powersensor_local/zeroconf_devices.py | 327 +++++++++++++++++++++ 5 files changed, 598 insertions(+), 127 deletions(-) create mode 100644 src/powersensor_local/zc_events.py create mode 100644 src/powersensor_local/zeroconf_devices.py diff --git a/pyproject.toml b/pyproject.toml index b121353..c83b81a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,12 +25,16 @@ Issues = "https://github.com/DiUS/python-powersensor_local/issues" ps-events = "powersensor_local.events:app" ps-rawplug = "powersensor_local.rawplug:app" ps-plugevents = "powersensor_local.plugevents:app" +ps-zcevents = "powersensor_local.zc_events:app" [build-system] requires = [ "hatchling" ] build-backend = "hatchling.build" [project.optional-dependencies] +zeroconf = [ + "zeroconf>=0.38.0", +] docs = [ "sphinx>=7.0.0", "sphinx-rtd-theme>=1.3.0", diff --git a/src/powersensor_local/__init__.py b/src/powersensor_local/__init__.py index 3b57563..f10a99d 100644 --- a/src/powersensor_local/__init__.py +++ b/src/powersensor_local/__init__.py @@ -39,12 +39,16 @@ 'PlugApi', '__version__', 'PlugListenerTcp', - 'PlugListenerUdp' + 'PlugListenerUdp', + 'PowersensorDevices', + 'PowersensorLegacyDevices', + 'PowersensorZeroconfDevices', ] -__version__ = "2.1.2" -from .devices import PowersensorDevices +__version__ = "2.1.3" +from .devices import PowersensorDevices, PowersensorLegacyDevices from .legacy_discovery import LegacyDiscovery from .plug_api import PlugApi from .plug_listener_tcp import PlugListenerTcp from .plug_listener_udp import PlugListenerUdp from .virtual_household import VirtualHousehold +from .zeroconf_devices import PowersensorZeroconfDevices diff --git a/src/powersensor_local/devices.py b/src/powersensor_local/devices.py index 4093ee9..56c84a2 100644 --- a/src/powersensor_local/devices.py +++ b/src/powersensor_local/devices.py @@ -1,4 +1,4 @@ -"""Abstraction interface for unified event stream from Powersensor devices""" +"""Abstraction interface for unified event stream from Powersensor devices.""" import asyncio import sys @@ -15,91 +15,117 @@ EXPIRY_CHECK_INTERVAL_S = 30 EXPIRY_TIMEOUT_S = 5 * 60 -class PowersensorDevices: - """Abstraction interface for the unified event stream from all Powersensor - devices on the local network. +_KNOWN_PLUG_EVENTS = [ + 'average_flow', + 'average_power', + 'average_power_components', + 'battery_level', + 'exception', + 'now_relaying_for', + 'radio_signal_quality', + 'summation_energy', + 'summation_volume', +] + + +class _PowersensorDevicesBase: + """Shared base for PowersensorLegacyDevices and PowersensorZeroconfDevices. + + Manages the PlugApi lifecycle, device tracking, expiry, and the event + callback. Subclasses are responsible for discovery — they call + _plug_discovered(mac, ip, port) when a plug appears and + _plug_lost(mac) when one disappears. """ - def __init__(self, bcast_addr=''): - """Creates a fresh instance, without scanning for devices.""" + def __init__(self, relay_now_relaying_for: bool = False) -> None: + """Initialise the base. + + Parameters + ---------- + relay_now_relaying_for: + When False (default), ``now_relaying_for`` messages are consumed + internally to synthesise ``device_found`` / ``device_lost`` events, + matching the behaviour of the original PowersensorDevices class. + When True the raw ``now_relaying_for`` event is forwarded to the + caller's callback unchanged, in addition to any ``device_found`` + synthesis. Set this to True when the caller wants to inspect relay + metadata directly (e.g. the HA dispatcher). + """ self._event_cb = None - self._discovery = LegacyDiscovery(bcast_addr) - self._devices = {} - self._timer = None - self._plug_apis = {} - - async def start(self, async_event_cb): - """Registers the async event callback function and starts the scan - of the local network to discover present devices. The callback is - of the form + self._devices: dict[str, '_PowersensorDevicesBase._Device'] = {} + self._plug_apis: dict[str, PlugApi] = {} + self._timer: '_PowersensorDevicesBase._Timer | None' = None + self._relay_now_relaying_for = relay_now_relaying_for - async def yourcallback(event: dict) -> None - - Known events: - - **scan_complete** - Indicates the discovery of Powersensor devices has completed. - Emitted in response to start() and rescan() calls. - The number of found gateways (plugs) is reported. + # ------------------------------------------------------------------ + # Public subscription API + # ------------------------------------------------------------------ - { event: "scan_complete", gateway_count: N } - - **device_found** - A new device found on the network. - The order found devices are announced is not fixed. - - { event: "device_found", - device_type: "plug" or "sensor", - mac: "...", - } + def subscribe(self, mac: str) -> None: + """Subscribe to events from the device with the given MAC address.""" + device = self._devices.get(mac) + if device: + device.subscribed = True - **device_lost** - A device appears to no longer be present on the network. + def unsubscribe(self, mac: str) -> None: + """Unsubscribe from events from the given MAC address.""" + device = self._devices.get(mac) + if device: + device.subscribed = False - { event: "device_lost", mac: "..." } + # ------------------------------------------------------------------ + # Teardown + # ------------------------------------------------------------------ - Additionally, all events described in xlatemsg.translate_raw_message - may be issued. The event name is inserted into the field 'event'. + async def stop(self) -> None: + """Stop event streaming and disconnect from all devices. - The start function returns the number of found gateway plugs. - Powersensor devices aren't found directly as they are typically not - on the network, but are instead detected when they relay data through - a plug via long-range radio. + To restart, call start() (legacy) or add_plug() (zeroconf) again. """ - self._event_cb = async_event_cb - await self._on_scanned(await self._discovery.scan()) - self._timer = self._Timer(EXPIRY_CHECK_INTERVAL_S, self._on_timer) - return len(self._plug_apis) - - async def rescan(self): - """Performs a fresh scan of the network to discover added devices, - or devices which have changed their IP address for some reason.""" - await self._on_scanned(await self._discovery.scan()) - - async def stop(self): - """Stops the event streaming and disconnects from the devices. - To restart the event streaming, call start() again.""" - for plug in self._plug_apis.values(): + for plug in list(self._plug_apis.values()): await plug.disconnect() - self._plug_apis = {} + self._plug_apis.clear() self._event_cb = None if self._timer: self._timer.terminate() self._timer = None - def subscribe(self, mac): - """Subscribes to events from the device with the given MAC address.""" - device = self._devices.get(mac) - if device: - device.subscribed = True + # ------------------------------------------------------------------ + # Called by subclasses when discovery reports changes + # ------------------------------------------------------------------ - def unsubscribe(self, mac): - """Unsubscribes from events from the given MAC address.""" - device = self._devices.get(mac) - if device: - device.subscribed = False + async def _plug_discovered(self, mac: str, ip: str, port: int) -> None: + """Called by the subclass when a plug is found or updated. - async def _emit_if_subscribed(self, ev, obj): + Creates a PlugApi for the plug if one doesn't exist yet, or + reconnects with a new address if the IP/port has changed. + """ + if mac in self._plug_apis: + api = self._plug_apis[mac] + if api.ip_address == ip and api.port == port: + return # no change + # Address changed — disconnect stale connection and reconnect. + await api.disconnect() + self._plug_apis.pop(mac) + + await self._add_device(mac, 'plug') + api = PlugApi(mac, ip, port) + self._plug_apis[mac] = api + for event in _KNOWN_PLUG_EVENTS: + api.subscribe(event, self._reemit) + api.connect() + + async def _plug_lost(self, mac: str) -> None: + """Called by the subclass when a plug has definitively disappeared.""" + if mac in self._plug_apis: + await self._plug_apis.pop(mac).disconnect() + await self._remove_device(mac) + + # ------------------------------------------------------------------ + # Internal event routing + # ------------------------------------------------------------------ + + async def _emit_if_subscribed(self, ev: str, obj: dict) -> None: if self._event_cb is None: return device = self._devices.get(obj.get('mac')) @@ -107,96 +133,150 @@ async def _emit_if_subscribed(self, ev, obj): obj['event'] = ev await self._event_cb(obj) - async def _reemit(self, ev, obj): - mac = obj['mac'] + async def _reemit(self, ev: str, obj: dict) -> None: + mac = obj.get('mac') device = self._devices.get(mac) if device is not None: device.mark_active() if ev == 'now_relaying_for': await self._add_device(mac, 'sensor') + if self._relay_now_relaying_for and self._event_cb is not None: + obj['event'] = ev + await self._event_cb(obj) else: await self._emit_if_subscribed(ev, obj) - async def _on_scanned(self, found): - for device in found: - mac = device['id'] - ip = device['ip'] - if not mac in self._devices: - await self._add_device(mac, 'plug') - api = PlugApi(mac, ip) - self._plug_apis[mac] = api - api.subscribe('average_flow', self._reemit) - api.subscribe('average_power', self._reemit) - api.subscribe('average_power_components', self._reemit) - api.subscribe('battery_level', self._reemit) - api.subscribe('exception', self._reemit) - api.subscribe('now_relaying_for', self._reemit) - api.subscribe('radio_signal_quality', self._reemit) - api.subscribe('summation_energy', self._reemit) - api.subscribe('summation_volume', self._reemit) - api.connect() - - await self._event_cb({ - 'event': 'scan_complete', - 'gateway_count': len(found), - }) - - async def _on_timer(self): - devices = list(self._devices.values()) - for device in devices: - if device.has_expired(): - await self._remove_device(device.mac) - - async def _add_device(self, mac, typ): + async def _add_device(self, mac: str, typ: str) -> None: if mac in self._devices: return self._devices[mac] = self._Device(mac) - await self._event_cb({ - 'event': 'device_found', - 'mac': mac, - 'device_type': typ, - }) - - async def _remove_device(self, mac): - if mac in self._devices: - self._devices.pop(mac) + if self._event_cb is not None: await self._event_cb({ - 'event': 'device_lost', + 'event': 'device_found', 'mac': mac, + 'device_type': typ, }) - ### Supporting classes ### + async def _remove_device(self, mac: str) -> None: + if mac in self._devices: + self._devices.pop(mac) + if self._event_cb is not None: + await self._event_cb({ + 'event': 'device_lost', + 'mac': mac, + }) + + async def _on_timer(self) -> None: + for device in list(self._devices.values()): + if device.has_expired(): + await self._remove_device(device.mac) + + def _start_expiry_timer(self) -> None: + self._timer = self._Timer(EXPIRY_CHECK_INTERVAL_S, self._on_timer) + + # ------------------------------------------------------------------ + # Supporting inner classes + # ------------------------------------------------------------------ class _Device: - def __init__(self, mac): + def __init__(self, mac: str) -> None: self.mac = mac self.subscribed = False self._last_active = datetime.now(timezone.utc) - def mark_active(self): - """Updates the last activity time to prevent expiry.""" + def mark_active(self) -> None: + """Update the last activity timestamp to prevent expiry.""" self._last_active = datetime.now(timezone.utc) - def has_expired(self): - """Checks whether the last activity time is past the expiry.""" - now = datetime.now(timezone.utc) - delta = now - self._last_active + def has_expired(self) -> bool: + """Return True if last activity is past the expiry window.""" + delta = datetime.now(timezone.utc) - self._last_active return delta.total_seconds() > EXPIRY_TIMEOUT_S - class _Timer: # pylint: disable=R0903 - def __init__(self, interval_s, callback): + class _Timer: + def __init__(self, interval_s: float, callback) -> None: self._terminate = False self._interval = interval_s self._callback = callback self._task = asyncio.create_task(self._run()) - def terminate(self): - """Disables the timer and cancels the associated task.""" + def terminate(self) -> None: + """Cancel the timer task.""" self._terminate = True self._task.cancel() - async def _run(self): + async def _run(self) -> None: while not self._terminate: await asyncio.sleep(self._interval) await self._callback() + + +class PowersensorLegacyDevices(_PowersensorDevicesBase): + """Abstraction interface for the unified event stream from all Powersensor + devices on the local network, using the legacy broadcast UDP discovery. + + This is the original PowersensorDevices implementation, renamed to make + room for PowersensorZeroconfDevices. The name PowersensorDevices is kept + as an alias for backwards compatibility. + """ + + def __init__( + self, + bcast_addr: str = '', + relay_now_relaying_for: bool = False, + ) -> None: + """Create a fresh instance, without scanning for devices.""" + super().__init__(relay_now_relaying_for=relay_now_relaying_for) + self._discovery = LegacyDiscovery(bcast_addr) + + async def start(self, async_event_cb) -> int: + """Register the async event callback and scan the local network. + + The callback has the form:: + + async def yourcallback(event: dict) -> None + + Known lifecycle events emitted: + + **scan_complete** + Indicates discovery has completed. + ``{ event: "scan_complete", gateway_count: N }`` + + **device_found** + A new device found on the network. + ``{ event: "device_found", device_type: "plug"|"sensor", mac: "..." }`` + + **device_lost** + A device appears to no longer be present. + ``{ event: "device_lost", mac: "..." }`` + + Additionally all events from xlatemsg.translate_raw_message may be + issued, with the event name inserted into the ``event`` field. + + Returns the number of gateway plugs found. + """ + self._event_cb = async_event_cb + await self._on_scanned(await self._discovery.scan()) + self._start_expiry_timer() + return len(self._plug_apis) + + async def rescan(self) -> None: + """Perform a fresh scan to discover added or moved devices.""" + await self._on_scanned(await self._discovery.scan()) + + async def _on_scanned(self, found: list) -> None: + for device in found: + mac = device['id'] + ip = device['ip'] + await self._plug_discovered(mac, ip, 49476) + + if self._event_cb is not None: + await self._event_cb({ + 'event': 'scan_complete', + 'gateway_count': len(self._plug_apis), + }) + + +# Backwards-compatible alias. +PowersensorDevices = PowersensorLegacyDevices diff --git a/src/powersensor_local/zc_events.py b/src/powersensor_local/zc_events.py new file mode 100644 index 0000000..b7bfdf3 --- /dev/null +++ b/src/powersensor_local/zc_events.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 + +"""Firehose event stream from all Powersensor devices via mDNS/zeroconf discovery. + +Intended for debugging use only. Please use the proper interface in +zeroconf_devices.py rather than parsing the output from this script. + +All events from all discovered plugs and their relayed sensors are printed +to stdout as they arrive. Every device is subscribed to automatically. + +Requires the zeroconf extra:: + + pip install powersensor-local[zeroconf] +""" +import sys +from pathlib import Path + +PROJECT_ROOT = str(Path(__file__).parents[1]) +if PROJECT_ROOT not in sys.path: + sys.path.append(PROJECT_ROOT) + +# pylint: disable=C0413 +from powersensor_local.abstract_event_handler import AbstractEventHandler +from powersensor_local.zeroconf_devices import PowersensorZeroconfDevices + + +class ZcEventLoopRunner(AbstractEventHandler): + """Main logic wrapper.""" + + def __init__(self) -> None: + self.devices: PowersensorZeroconfDevices = PowersensorZeroconfDevices( + relay_now_relaying_for=True, + ) + + async def on_exit(self) -> None: + await self.devices.stop() + + async def on_message(self, obj: dict) -> None: + """Print every event and subscribe to any newly discovered device.""" + print(obj) + if obj['event'] == 'device_found': + self.devices.subscribe(obj['mac']) + + async def main(self) -> None: + self.register_sigint_handler() + await self.devices.start(self.on_message) + await self.wait() + + +def app() -> None: + """Application entry point.""" + ZcEventLoopRunner().run() + + +if __name__ == '__main__': + app() diff --git a/src/powersensor_local/zeroconf_devices.py b/src/powersensor_local/zeroconf_devices.py new file mode 100644 index 0000000..31ac2f1 --- /dev/null +++ b/src/powersensor_local/zeroconf_devices.py @@ -0,0 +1,327 @@ +"""Zeroconf/mDNS-based discovery for Powersensor devices. + +This module provides PowersensorZeroconfDevices, which uses continuous mDNS +browsing to discover Powersensor plugs rather than the legacy one-shot UDP +broadcast in PowersensorLegacyDevices. + +The zeroconf package is an optional dependency. Install it via:: + + pip install powersensor-local[zeroconf] + +Architecture +------------ +PowersensorZeroconfDevices owns the full lifecycle: + +- It starts a zeroconf ServiceBrowser that calls back on plug add/update/remove. +- Plug removals are debounced (default 60 s) to absorb transient disappearances + such as reboots or DHCP renewals. +- The public add_plug() / remove_plug() methods are the seam between discovery + and the plug API lifecycle, and can also be called directly (e.g. from a + test or from HA's own mDNS handler) without needing a real zeroconf instance. + +Zeroconf instance ownership +---------------------------- +If ``zeroconf_instance`` is None (the default), the class creates and owns a +Zeroconf instance and closes it in stop(). If a Zeroconf instance is passed +in, the caller owns it and the class will not close it — this is the correct +pattern for Home Assistant, which maintains a single shared zeroconf instance. + +Thread safety +------------- +``_Listener`` is called from the zeroconf background thread. It maintains its +own ``_name_to_mac`` cache (populated on add/update, consumed on remove) so +that ``remove_service`` can resolve the MAC without touching the zeroconf +record, which may already be gone by the time the callback fires. + +All HA-facing work crosses the thread boundary via +``loop.call_soon_threadsafe``. The ``_Listener`` dict is only ever read and +written from the zeroconf thread, so no additional locking is needed. +""" +from __future__ import annotations + +import asyncio +import logging +import sys +from pathlib import Path +from typing import Any + +PROJECT_ROOT = str(Path(__file__).parents[1]) +if PROJECT_ROOT not in sys.path: + sys.path.append(PROJECT_ROOT) + +# pylint: disable=C0413 +from powersensor_local.devices import _PowersensorDevicesBase + +_LOGGER = logging.getLogger(__name__) + +_SERVICE_TYPE_UDP = '_powersensor._udp.local.' +_SERVICE_TYPE_TCP = '_powersensor._tcp.local.' + +_DEBOUNCE_DEFAULT_S = 60.0 + + +def _require_zeroconf(): + """Import zeroconf, raising a clear error if it is not installed.""" + try: + import zeroconf as _zc # noqa: PLC0415 + return _zc + except ImportError as exc: + raise ImportError( + "The 'zeroconf' package is required for PowersensorZeroconfDevices. " + "Install it with: pip install powersensor-local[zeroconf]" + ) from exc + + +class PowersensorZeroconfDevices(_PowersensorDevicesBase): + """Discovers and manages Powersensor plugs via continuous mDNS browsing. + + Usage example (no existing zeroconf instance):: + + devices = PowersensorZeroconfDevices() + await devices.start(my_callback) + # plugs arrive via my_callback as device_found events + # ... + await devices.stop() + + Usage example (HA — pass the shared zeroconf instance):: + + zc = await homeassistant.components.zeroconf.async_get_instance(hass) + devices = PowersensorZeroconfDevices(zeroconf_instance=zc) + await devices.start(my_callback) + + The callback signature is the same as PowersensorLegacyDevices.start(): + ``async def callback(event: dict) -> None`` + + Lifecycle events emitted: + + **device_found** + ``{ event: "device_found", device_type: "plug"|"sensor", mac: "..." }`` + + **device_lost** + ``{ event: "device_lost", mac: "..." }`` + + Note: ``scan_complete`` is NOT emitted — mDNS browsing is continuous and + has no defined completion point. + """ + + def __init__( + self, + zeroconf_instance: Any = None, + service_type: str = _SERVICE_TYPE_UDP, + debounce_timeout: float = _DEBOUNCE_DEFAULT_S, + relay_now_relaying_for: bool = False, + ) -> None: + """Initialise. + + Parameters + ---------- + zeroconf_instance: + An existing ``Zeroconf`` instance to use. If None, one is created + and owned by this object (and closed in stop()). + service_type: + The mDNS service type to browse. Defaults to the UDP service + ``_powersensor._udp.local.``; pass ``_powersensor._tcp.local.`` + to use TCP transport instead. + debounce_timeout: + Seconds to wait after a ``remove_service`` callback before treating + the plug as truly gone. Defaults to 60 s. + relay_now_relaying_for: + See _PowersensorDevicesBase for documentation. + """ + super().__init__(relay_now_relaying_for=relay_now_relaying_for) + self._zc_instance = zeroconf_instance + self._zc_owned = zeroconf_instance is None # True → we close it in stop() + self._service_type = service_type + self._debounce_seconds = debounce_timeout + self._browser: Any = None + self._listener: _Listener | None = None + self._pending_removals: dict[str, asyncio.TimerHandle] = {} + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def start(self, async_event_cb) -> None: + """Register the event callback and start the mDNS service browser. + + The browser is event-driven; no polling loop is started here. + Plugs already present on the network will trigger add_service + callbacks shortly after the browser starts. + """ + zc_mod = _require_zeroconf() + self._event_cb = async_event_cb + self._start_expiry_timer() + + if self._zc_instance is None: + self._zc_instance = zc_mod.Zeroconf() + + loop = asyncio.get_running_loop() + self._listener = _Listener(self, loop) + self._browser = zc_mod.ServiceBrowser( + self._zc_instance, self._service_type, self._listener + ) + + async def stop(self) -> None: + """Stop browsing, cancel pending removals, and disconnect all plugs.""" + for handle in list(self._pending_removals.values()): + handle.cancel() + self._pending_removals.clear() + + if self._browser is not None: + self._browser.cancel() + self._browser = None + + self._listener = None + + if self._zc_owned and self._zc_instance is not None: + self._zc_instance.close() + self._zc_instance = None + + await super().stop() + + # ------------------------------------------------------------------ + # Public discovery seam — may also be called directly + # ------------------------------------------------------------------ + + def add_plug(self, mac: str, ip: str, port: int) -> None: + """Notify that a plug is present at the given address. + + Creates or reconnects the PlugApi for this plug. Safe to call + directly without a zeroconf browser (e.g. from tests, or from HA's + own mDNS handler). Cancels any pending debounced removal for this MAC. + + Must be called from the event loop thread. + """ + self._cancel_pending_removal(mac, source='add_plug') + asyncio.get_running_loop().create_task( + self._plug_discovered(mac, ip, port) + ) + + def remove_plug(self, mac: str) -> None: + """Schedule a debounced removal for the given plug. + + After ``debounce_timeout`` seconds with no re-announcement, the plug + API is disconnected and a ``device_lost`` event is emitted. + + Must be called from the event loop thread. + """ + self._schedule_removal(mac) + + # ------------------------------------------------------------------ + # Debounce helpers (event-loop side only) + # ------------------------------------------------------------------ + + def _schedule_removal(self, mac: str) -> None: + if mac in self._pending_removals: + return + loop = asyncio.get_running_loop() + handle = loop.call_later( + self._debounce_seconds, + self._on_debounce_expired, + mac, + ) + self._pending_removals[mac] = handle + _LOGGER.debug("Scheduled removal for %s in %.0f s", mac, self._debounce_seconds) + + def _on_debounce_expired(self, mac: str) -> None: + """Called by the event loop when the debounce timer fires.""" + self._pending_removals.pop(mac, None) + _LOGGER.info("Plug %s still absent after debounce — removing", mac) + asyncio.get_running_loop().create_task(self._plug_lost(mac)) + + def _cancel_pending_removal(self, mac: str, source: str) -> None: + handle = self._pending_removals.pop(mac, None) + if handle: + handle.cancel() + _LOGGER.debug("Cancelled pending removal for %s (%s)", mac, source) + + # ------------------------------------------------------------------ + # Called from _Listener (zeroconf thread → event loop via stored loop ref) + # ------------------------------------------------------------------ + + def _on_zc_add(self, mac: str, ip: str, port: int, loop: asyncio.AbstractEventLoop) -> None: + loop.call_soon_threadsafe(self._cancel_pending_removal, mac, 'zeroconf add') + loop.call_soon_threadsafe( + lambda: loop.create_task(self._plug_discovered(mac, ip, port)) + ) + + def _on_zc_update(self, mac: str, ip: str, port: int, loop: asyncio.AbstractEventLoop) -> None: + loop.call_soon_threadsafe(self._cancel_pending_removal, mac, 'zeroconf update') + loop.call_soon_threadsafe( + lambda: loop.create_task(self._plug_discovered(mac, ip, port)) + ) + + def _on_zc_remove(self, mac: str, loop: asyncio.AbstractEventLoop) -> None: + loop.call_soon_threadsafe(self._schedule_removal, mac) + + +class _Listener: + """Zeroconf ServiceListener that forwards events to PowersensorZeroconfDevices. + + Internal implementation detail. All ServiceListener callbacks arrive on + the zeroconf background thread. + + Thread safety + ------------- + ``_name_to_mac`` is populated in ``add_service`` / ``update_service`` and + consumed in ``remove_service``, all of which are called from the same + zeroconf background thread — no locking required. + + The stored ``_loop`` reference is captured once at construction from the + running asyncio event loop, and is used (read-only) from the zeroconf + thread to schedule work back onto that loop via ``call_soon_threadsafe``. + """ + + def __init__(self, owner: PowersensorZeroconfDevices, loop: asyncio.AbstractEventLoop) -> None: + self._owner = owner + self._loop = loop + self._name_to_mac: dict[str, str] = {} + + def _extract(self, zc: Any, type_: str, name: str) -> tuple[str, str, int] | None: + """Return (mac, ip, port) from the zeroconf service record, or None.""" + try: + info = zc.get_service_info(type_, name) + except Exception as err: # pylint: disable=broad-except + _LOGGER.error("Error retrieving zeroconf info for %s: %s", name, err) + return None + + if not info: + return None + + addresses = info.parsed_addresses() + if not addresses: + _LOGGER.warning("No addresses in zeroconf record for %s", name) + return None + + try: + mac = info.properties[b'id'].decode('utf-8') + except (KeyError, AttributeError) as err: + _LOGGER.error( + "Missing 'id' property in zeroconf record for %s: %s", name, err + ) + return None + + return mac, addresses[0], info.port + + def add_service(self, zc: Any, type_: str, name: str) -> None: + result = self._extract(zc, type_, name) + if result: + mac, ip, port = result + self._name_to_mac[name] = mac + self._owner._on_zc_add(mac, ip, port, self._loop) + + def update_service(self, zc: Any, type_: str, name: str) -> None: + result = self._extract(zc, type_, name) + if result: + mac, ip, port = result + self._name_to_mac[name] = mac + self._owner._on_zc_update(mac, ip, port, self._loop) + + def remove_service(self, zc: Any, type_: str, name: str) -> None: + mac = self._name_to_mac.pop(name, None) + if mac is None: + _LOGGER.warning( + "remove_service for %s: MAC not in cache — removal ignored", name + ) + return + self._owner._on_zc_remove(mac, self._loop) From 9c5d46bb7257d68f370403b6fda279253cc68308 Mon Sep 17 00:00:00 2001 From: Lake Bookman Date: Thu, 4 Jun 2026 13:04:02 +1000 Subject: [PATCH 2/6] add zeroconf to requirements.test.txt --- requirements.test.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.test.txt b/requirements.test.txt index b08591f..8fd6ba3 100644 --- a/requirements.test.txt +++ b/requirements.test.txt @@ -1,3 +1,4 @@ mypy pytest pytest-coverage +zeroconf \ No newline at end of file From 1d58f75f96e9d37cee6173d61084eed867cc76d8 Mon Sep 17 00:00:00 2001 From: Lake Bookman Date: Thu, 4 Jun 2026 13:10:30 +1000 Subject: [PATCH 3/6] make mypy checks pass --- src/powersensor_local/devices.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/powersensor_local/devices.py b/src/powersensor_local/devices.py index 56c84a2..8656fc9 100644 --- a/src/powersensor_local/devices.py +++ b/src/powersensor_local/devices.py @@ -133,8 +133,13 @@ async def _emit_if_subscribed(self, ev: str, obj: dict) -> None: obj['event'] = ev await self._event_cb(obj) - async def _reemit(self, ev: str, obj: dict) -> None: - mac = obj.get('mac') + async def _reemit(self, ev: str, obj: dict[str, str]) -> None: + mac: str|None = obj.get('mac') + if mac is None: + # we don't log anything in this library, but if we did perhaps + # _LOGGER.warning("Received event '%s' with no MAC address -- ignoring", ev) might be appropriate + # for now...silence + return device = self._devices.get(mac) if device is not None: device.mark_active() From a3f4b87505904b765232b8d4dcbcefa871eef519 Mon Sep 17 00:00:00 2001 From: Lake Bookman Date: Thu, 4 Jun 2026 14:21:57 +1000 Subject: [PATCH 4/6] simplify import error logic --- README.md | 2 +- src/powersensor_local/devices.py | 6 +- src/powersensor_local/zeroconf_devices.py | 504 +++++++++++----------- 3 files changed, 259 insertions(+), 253 deletions(-) mode change 100644 => 100755 src/powersensor_local/devices.py mode change 100644 => 100755 src/powersensor_local/zeroconf_devices.py diff --git a/README.md b/README.md index 98c8f4d..53d0e35 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ relies on the PlugApi as well. There are also some small utilities included,`ps-plugevents` and `ps-rawplug` showcasing the use of the first interface approach, and `ps-events` the latter. . -The `ps-events` is effectively a consumer of the the PowersensorDevices event +The `ps-events` is effectively a consumer the PowersensorDevices event stream and dumps all events to standard out. Similary, `ps-plugevents` shows the event stream from a single plug (plus whatever it might be relaying for), and `ps-rawplug` shows the raw event stream from the plug. Note that the format diff --git a/src/powersensor_local/devices.py b/src/powersensor_local/devices.py old mode 100644 new mode 100755 index 8656fc9..d0f811a --- a/src/powersensor_local/devices.py +++ b/src/powersensor_local/devices.py @@ -125,10 +125,10 @@ async def _plug_lost(self, mac: str) -> None: # Internal event routing # ------------------------------------------------------------------ - async def _emit_if_subscribed(self, ev: str, obj: dict) -> None: + async def _emit_if_subscribed(self, ev: str, mac: str, obj: dict) -> None: if self._event_cb is None: return - device = self._devices.get(obj.get('mac')) + device = self._devices.get(mac) if device is not None and device.subscribed: obj['event'] = ev await self._event_cb(obj) @@ -150,7 +150,7 @@ async def _reemit(self, ev: str, obj: dict[str, str]) -> None: obj['event'] = ev await self._event_cb(obj) else: - await self._emit_if_subscribed(ev, obj) + await self._emit_if_subscribed(ev, mac, obj) async def _add_device(self, mac: str, typ: str) -> None: if mac in self._devices: diff --git a/src/powersensor_local/zeroconf_devices.py b/src/powersensor_local/zeroconf_devices.py old mode 100644 new mode 100755 index 31ac2f1..90885fd --- a/src/powersensor_local/zeroconf_devices.py +++ b/src/powersensor_local/zeroconf_devices.py @@ -49,7 +49,6 @@ if PROJECT_ROOT not in sys.path: sys.path.append(PROJECT_ROOT) -# pylint: disable=C0413 from powersensor_local.devices import _PowersensorDevicesBase _LOGGER = logging.getLogger(__name__) @@ -60,268 +59,275 @@ _DEBOUNCE_DEFAULT_S = 60.0 -def _require_zeroconf(): - """Import zeroconf, raising a clear error if it is not installed.""" - try: - import zeroconf as _zc # noqa: PLC0415 - return _zc - except ImportError as exc: - raise ImportError( - "The 'zeroconf' package is required for PowersensorZeroconfDevices. " - "Install it with: pip install powersensor-local[zeroconf]" - ) from exc - - -class PowersensorZeroconfDevices(_PowersensorDevicesBase): - """Discovers and manages Powersensor plugs via continuous mDNS browsing. - - Usage example (no existing zeroconf instance):: - - devices = PowersensorZeroconfDevices() - await devices.start(my_callback) - # plugs arrive via my_callback as device_found events - # ... - await devices.stop() - - Usage example (HA — pass the shared zeroconf instance):: - - zc = await homeassistant.components.zeroconf.async_get_instance(hass) - devices = PowersensorZeroconfDevices(zeroconf_instance=zc) - await devices.start(my_callback) - - The callback signature is the same as PowersensorLegacyDevices.start(): - ``async def callback(event: dict) -> None`` - - Lifecycle events emitted: - - **device_found** - ``{ event: "device_found", device_type: "plug"|"sensor", mac: "..." }`` - - **device_lost** - ``{ event: "device_lost", mac: "..." }`` - - Note: ``scan_complete`` is NOT emitted — mDNS browsing is continuous and - has no defined completion point. - """ - - def __init__( - self, - zeroconf_instance: Any = None, - service_type: str = _SERVICE_TYPE_UDP, - debounce_timeout: float = _DEBOUNCE_DEFAULT_S, - relay_now_relaying_for: bool = False, - ) -> None: - """Initialise. - - Parameters - ---------- - zeroconf_instance: - An existing ``Zeroconf`` instance to use. If None, one is created - and owned by this object (and closed in stop()). - service_type: - The mDNS service type to browse. Defaults to the UDP service - ``_powersensor._udp.local.``; pass ``_powersensor._tcp.local.`` - to use TCP transport instead. - debounce_timeout: - Seconds to wait after a ``remove_service`` callback before treating - the plug as truly gone. Defaults to 60 s. - relay_now_relaying_for: - See _PowersensorDevicesBase for documentation. - """ - super().__init__(relay_now_relaying_for=relay_now_relaying_for) - self._zc_instance = zeroconf_instance - self._zc_owned = zeroconf_instance is None # True → we close it in stop() - self._service_type = service_type - self._debounce_seconds = debounce_timeout - self._browser: Any = None - self._listener: _Listener | None = None - self._pending_removals: dict[str, asyncio.TimerHandle] = {} - - # ------------------------------------------------------------------ - # Lifecycle - # ------------------------------------------------------------------ - - async def start(self, async_event_cb) -> None: - """Register the event callback and start the mDNS service browser. - - The browser is event-driven; no polling loop is started here. - Plugs already present on the network will trigger add_service - callbacks shortly after the browser starts. +try: + import zeroconf as _zc + class PowersensorZeroconfDevices(_PowersensorDevicesBase): + """Discovers and manages Powersensor plugs via continuous mDNS browsing. + + Usage example (no existing zeroconf instance):: + + devices = PowersensorZeroconfDevices() + await devices.start(my_callback) + # plugs arrive via my_callback as device_found events + # ... + await devices.stop() + + Usage example (HA — pass the shared zeroconf instance):: + + zc = await homeassistant.components.zeroconf.async_get_instance(hass) + devices = PowersensorZeroconfDevices(zeroconf_instance=zc) + await devices.start(my_callback) + + The callback signature is the same as PowersensorLegacyDevices.start(): + ``async def callback(event: dict) -> None`` + + Lifecycle events emitted: + + **device_found** + ``{ event: "device_found", device_type: "plug"|"sensor", mac: "..." }`` + + **device_lost** + ``{ event: "device_lost", mac: "..." }`` + + Note: ``scan_complete`` is NOT emitted — mDNS browsing is continuous and + has no defined completion point. """ - zc_mod = _require_zeroconf() - self._event_cb = async_event_cb - self._start_expiry_timer() - if self._zc_instance is None: - self._zc_instance = zc_mod.Zeroconf() + def __init__( + self, + zeroconf_instance: Any = None, + service_type: str = _SERVICE_TYPE_UDP, + debounce_timeout: float = _DEBOUNCE_DEFAULT_S, + relay_now_relaying_for: bool = False, + ) -> None: + """Initialise. + + Parameters + ---------- + zeroconf_instance: + An existing ``Zeroconf`` instance to use. If None, one is created + and owned by this object (and closed in stop()). + service_type: + The mDNS service type to browse. Defaults to the UDP service + ``_powersensor._udp.local.``; pass ``_powersensor._tcp.local.`` + to use TCP transport instead. + debounce_timeout: + Seconds to wait after a ``remove_service`` callback before treating + the plug as truly gone. Defaults to 60 s. + relay_now_relaying_for: + See _PowersensorDevicesBase for documentation. + """ + super().__init__(relay_now_relaying_for=relay_now_relaying_for) + self._zc_instance = zeroconf_instance + self._zc_owned = zeroconf_instance is None # True → we close it in stop() + self._service_type = service_type + self._debounce_seconds = debounce_timeout + self._browser: Any = None + self._listener: _Listener | None = None + self._pending_removals: dict[str, asyncio.TimerHandle] = {} + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def start(self, async_event_cb) -> None: + """Register the event callback and start the mDNS service browser. + + The browser is event-driven; no polling loop is started here. + Plugs already present on the network will trigger add_service + callbacks shortly after the browser starts. + """ + self._event_cb = async_event_cb + self._start_expiry_timer() + + if self._zc_instance is None: + self._zc_instance = _zc.Zeroconf() + + loop = asyncio.get_running_loop() + self._listener = _Listener(self, loop) + self._browser = _zc.ServiceBrowser( + self._zc_instance, self._service_type, self._listener + ) - loop = asyncio.get_running_loop() - self._listener = _Listener(self, loop) - self._browser = zc_mod.ServiceBrowser( - self._zc_instance, self._service_type, self._listener - ) + async def stop(self) -> None: + """Stop browsing, cancel pending removals, and disconnect all plugs.""" + for handle in list(self._pending_removals.values()): + handle.cancel() + self._pending_removals.clear() - async def stop(self) -> None: - """Stop browsing, cancel pending removals, and disconnect all plugs.""" - for handle in list(self._pending_removals.values()): - handle.cancel() - self._pending_removals.clear() + if self._browser is not None: + self._browser.cancel() + self._browser = None - if self._browser is not None: - self._browser.cancel() - self._browser = None + self._listener = None - self._listener = None + if self._zc_owned and self._zc_instance is not None: + self._zc_instance.close() + self._zc_instance = None - if self._zc_owned and self._zc_instance is not None: - self._zc_instance.close() - self._zc_instance = None + await super().stop() - await super().stop() + # ------------------------------------------------------------------ + # Public discovery seam — may also be called directly + # ------------------------------------------------------------------ - # ------------------------------------------------------------------ - # Public discovery seam — may also be called directly - # ------------------------------------------------------------------ + def add_plug(self, mac: str, ip: str, port: int) -> None: + """Notify that a plug is present at the given address. - def add_plug(self, mac: str, ip: str, port: int) -> None: - """Notify that a plug is present at the given address. + Creates or reconnects the PlugApi for this plug. Safe to call + directly without a zeroconf browser (e.g. from tests, or from HA's + own mDNS handler). Cancels any pending debounced removal for this MAC. - Creates or reconnects the PlugApi for this plug. Safe to call - directly without a zeroconf browser (e.g. from tests, or from HA's - own mDNS handler). Cancels any pending debounced removal for this MAC. + Must be called from the event loop thread. + """ + self._cancel_pending_removal(mac, source='add_plug') + asyncio.get_running_loop().create_task( + self._plug_discovered(mac, ip, port) + ) - Must be called from the event loop thread. - """ - self._cancel_pending_removal(mac, source='add_plug') - asyncio.get_running_loop().create_task( - self._plug_discovered(mac, ip, port) - ) + def remove_plug(self, mac: str) -> None: + """Schedule a debounced removal for the given plug. - def remove_plug(self, mac: str) -> None: - """Schedule a debounced removal for the given plug. + After ``debounce_timeout`` seconds with no re-announcement, the plug + API is disconnected and a ``device_lost`` event is emitted. - After ``debounce_timeout`` seconds with no re-announcement, the plug - API is disconnected and a ``device_lost`` event is emitted. + Must be called from the event loop thread. + """ + self._schedule_removal(mac) - Must be called from the event loop thread. - """ - self._schedule_removal(mac) - - # ------------------------------------------------------------------ - # Debounce helpers (event-loop side only) - # ------------------------------------------------------------------ - - def _schedule_removal(self, mac: str) -> None: - if mac in self._pending_removals: - return - loop = asyncio.get_running_loop() - handle = loop.call_later( - self._debounce_seconds, - self._on_debounce_expired, - mac, - ) - self._pending_removals[mac] = handle - _LOGGER.debug("Scheduled removal for %s in %.0f s", mac, self._debounce_seconds) - - def _on_debounce_expired(self, mac: str) -> None: - """Called by the event loop when the debounce timer fires.""" - self._pending_removals.pop(mac, None) - _LOGGER.info("Plug %s still absent after debounce — removing", mac) - asyncio.get_running_loop().create_task(self._plug_lost(mac)) - - def _cancel_pending_removal(self, mac: str, source: str) -> None: - handle = self._pending_removals.pop(mac, None) - if handle: - handle.cancel() - _LOGGER.debug("Cancelled pending removal for %s (%s)", mac, source) - - # ------------------------------------------------------------------ - # Called from _Listener (zeroconf thread → event loop via stored loop ref) - # ------------------------------------------------------------------ - - def _on_zc_add(self, mac: str, ip: str, port: int, loop: asyncio.AbstractEventLoop) -> None: - loop.call_soon_threadsafe(self._cancel_pending_removal, mac, 'zeroconf add') - loop.call_soon_threadsafe( - lambda: loop.create_task(self._plug_discovered(mac, ip, port)) - ) - - def _on_zc_update(self, mac: str, ip: str, port: int, loop: asyncio.AbstractEventLoop) -> None: - loop.call_soon_threadsafe(self._cancel_pending_removal, mac, 'zeroconf update') - loop.call_soon_threadsafe( - lambda: loop.create_task(self._plug_discovered(mac, ip, port)) - ) - - def _on_zc_remove(self, mac: str, loop: asyncio.AbstractEventLoop) -> None: - loop.call_soon_threadsafe(self._schedule_removal, mac) - - -class _Listener: - """Zeroconf ServiceListener that forwards events to PowersensorZeroconfDevices. - - Internal implementation detail. All ServiceListener callbacks arrive on - the zeroconf background thread. - - Thread safety - ------------- - ``_name_to_mac`` is populated in ``add_service`` / ``update_service`` and - consumed in ``remove_service``, all of which are called from the same - zeroconf background thread — no locking required. - - The stored ``_loop`` reference is captured once at construction from the - running asyncio event loop, and is used (read-only) from the zeroconf - thread to schedule work back onto that loop via ``call_soon_threadsafe``. - """ - - def __init__(self, owner: PowersensorZeroconfDevices, loop: asyncio.AbstractEventLoop) -> None: - self._owner = owner - self._loop = loop - self._name_to_mac: dict[str, str] = {} - - def _extract(self, zc: Any, type_: str, name: str) -> tuple[str, str, int] | None: - """Return (mac, ip, port) from the zeroconf service record, or None.""" - try: - info = zc.get_service_info(type_, name) - except Exception as err: # pylint: disable=broad-except - _LOGGER.error("Error retrieving zeroconf info for %s: %s", name, err) - return None - - if not info: - return None - - addresses = info.parsed_addresses() - if not addresses: - _LOGGER.warning("No addresses in zeroconf record for %s", name) - return None - - try: - mac = info.properties[b'id'].decode('utf-8') - except (KeyError, AttributeError) as err: - _LOGGER.error( - "Missing 'id' property in zeroconf record for %s: %s", name, err + # ------------------------------------------------------------------ + # Debounce helpers (event-loop side only) + # ------------------------------------------------------------------ + + def _schedule_removal(self, mac: str) -> None: + if mac in self._pending_removals: + return + loop = asyncio.get_running_loop() + handle = loop.call_later( + self._debounce_seconds, + self._on_debounce_expired, + mac, + ) + self._pending_removals[mac] = handle + _LOGGER.debug("Scheduled removal for %s in %.0f s", mac, self._debounce_seconds) + + def _on_debounce_expired(self, mac: str) -> None: + """Called by the event loop when the debounce timer fires.""" + self._pending_removals.pop(mac, None) + _LOGGER.info("Plug %s still absent after debounce — removing", mac) + asyncio.get_running_loop().create_task(self._plug_lost(mac)) + + def _cancel_pending_removal(self, mac: str, source: str) -> None: + handle = self._pending_removals.pop(mac, None) + if handle: + handle.cancel() + _LOGGER.debug("Cancelled pending removal for %s (%s)", mac, source) + + # ------------------------------------------------------------------ + # Called from _Listener (zeroconf thread → event loop via stored loop ref) + # ------------------------------------------------------------------ + + def _on_zc_add(self, mac: str, ip: str, port: int, loop: asyncio.AbstractEventLoop) -> None: + loop.call_soon_threadsafe(self._cancel_pending_removal, mac, 'zeroconf add') + loop.call_soon_threadsafe( + lambda: loop.create_task(self._plug_discovered(mac, ip, port)) ) - return None - - return mac, addresses[0], info.port - - def add_service(self, zc: Any, type_: str, name: str) -> None: - result = self._extract(zc, type_, name) - if result: - mac, ip, port = result - self._name_to_mac[name] = mac - self._owner._on_zc_add(mac, ip, port, self._loop) - - def update_service(self, zc: Any, type_: str, name: str) -> None: - result = self._extract(zc, type_, name) - if result: - mac, ip, port = result - self._name_to_mac[name] = mac - self._owner._on_zc_update(mac, ip, port, self._loop) - - def remove_service(self, zc: Any, type_: str, name: str) -> None: - mac = self._name_to_mac.pop(name, None) - if mac is None: - _LOGGER.warning( - "remove_service for %s: MAC not in cache — removal ignored", name + + def _on_zc_update(self, mac: str, ip: str, port: int, loop: asyncio.AbstractEventLoop) -> None: + loop.call_soon_threadsafe(self._cancel_pending_removal, mac, 'zeroconf update') + loop.call_soon_threadsafe( + lambda: loop.create_task(self._plug_discovered(mac, ip, port)) ) - return - self._owner._on_zc_remove(mac, self._loop) + + def _on_zc_remove(self, mac: str, loop: asyncio.AbstractEventLoop) -> None: + loop.call_soon_threadsafe(self._schedule_removal, mac) + + + class _Listener(_zc.ServiceListener): + """Zeroconf ServiceListener that forwards events to PowersensorZeroconfDevices. + + Internal implementation detail. All ServiceListener callbacks arrive on + the zeroconf background thread. + + Thread safety + ------------- + ``_name_to_mac`` is populated in ``add_service`` / ``update_service`` and + consumed in ``remove_service``, all of which are called from the same + zeroconf background thread — no locking required. + + The stored ``_loop`` reference is captured once at construction from the + running asyncio event loop, and is used (read-only) from the zeroconf + thread to schedule work back onto that loop via ``call_soon_threadsafe``. + """ + + def __init__(self, owner: PowersensorZeroconfDevices, loop: asyncio.AbstractEventLoop) -> None: + self._owner = owner + self._loop = loop + self._name_to_mac: dict[str, str] = {} + + def _extract(self, zc: Any, type_: str, name: str) -> tuple[str, str, int] | None: + """Return (mac, ip, port) from the zeroconf service record, or None.""" + try: + info = zc.get_service_info(type_, name) + except Exception as err: # pylint: disable=broad-except + _LOGGER.error("Error retrieving zeroconf info for %s: %s", name, err) + return None + + if not info: + return None + + addresses = info.parsed_addresses() + if not addresses: + _LOGGER.warning("No addresses in zeroconf record for %s", name) + return None + + try: + mac = info.properties[b'id'].decode('utf-8') + except (KeyError, AttributeError) as err: + _LOGGER.error( + "Missing 'id' property in zeroconf record for %s: %s", name, err + ) + return None + + return mac, addresses[0], info.port + + def add_service(self, zc: Any, type_: str, name: str) -> None: + result = self._extract(zc, type_, name) + if result: + mac, ip, port = result + self._name_to_mac[name] = mac + self._owner._on_zc_add(mac, ip, port, self._loop) + + def update_service(self, zc: Any, type_: str, name: str) -> None: + result = self._extract(zc, type_, name) + if result: + mac, ip, port = result + self._name_to_mac[name] = mac + self._owner._on_zc_update(mac, ip, port, self._loop) + + def remove_service(self, zc: Any, type_: str, name: str) -> None: + mac = self._name_to_mac.pop(name, None) + if mac is None: + _LOGGER.warning( + "remove_service for %s: MAC not in cache — removal ignored", name + ) + return + self._owner._on_zc_remove(mac, self._loop) + +except ImportError as exc: + _zeroconf_import_error = exc + + class PowersensorZeroconfDevices(_PowersensorDevicesBase): # type: ignore[no-redef] + """Stub raised when the optional zeroconf package is not installed. + + To use mDNS-based discovery, install the zeroconf extra:: + + pip install powersensor-local[zeroconf] + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__() + raise ImportError( + "The 'zeroconf' package is required for PowersensorZeroconfDevices. " + "Install it with: pip install powersensor-local[zeroconf]" + ) from _zeroconf_import_error From f664b8d6a741d004190210f1e7f258f922e53b46 Mon Sep 17 00:00:00 2001 From: Lake Bookman Date: Thu, 4 Jun 2026 15:10:50 +1000 Subject: [PATCH 5/6] Fix silent dropping of plugs...hopefully... --- src/powersensor_local/zeroconf_devices.py | 100 +++++++++++++++------- 1 file changed, 70 insertions(+), 30 deletions(-) diff --git a/src/powersensor_local/zeroconf_devices.py b/src/powersensor_local/zeroconf_devices.py index 90885fd..23a026e 100755 --- a/src/powersensor_local/zeroconf_devices.py +++ b/src/powersensor_local/zeroconf_devices.py @@ -28,14 +28,29 @@ Thread safety ------------- -``_Listener`` is called from the zeroconf background thread. It maintains its -own ``_name_to_mac`` cache (populated on add/update, consumed on remove) so -that ``remove_service`` can resolve the MAC without touching the zeroconf -record, which may already be gone by the time the callback fires. - -All HA-facing work crosses the thread boundary via -``loop.call_soon_threadsafe``. The ``_Listener`` dict is only ever read and -written from the zeroconf thread, so no additional locking is needed. +On zeroconf >= 0.32 (including Home Assistant's 0.149.x), ServiceBrowser +callbacks run inside the asyncio event loop rather than on a background thread. +On older zeroconf (e.g. 1.0.0, which used a select() thread), they run on a +background thread. + +``_Listener`` is therefore written to be safe in both models: + +- ``_extract`` uses ``ServiceInfo.load_from_cache()`` rather than + ``Zeroconf.get_service_info()``. On >= 0.32, calling get_service_info() + from inside a ServiceBrowser callback deadlocks — it blocks waiting for a + DNS reply that can never arrive because it holds the event loop. + load_from_cache() is synchronous, non-blocking, and explicitly threadsafe; + the ServiceBrowser guarantees the cache is populated before firing the + callback, so the record is always present. + +- ``_name_to_mac`` is populated in ``add_service`` / ``update_service`` and + consumed in ``remove_service``. On >= 0.32 all three callbacks run on the + same event loop thread, so no locking is needed. On older versions they run + on the same zeroconf background thread, so no locking is needed there either. + +- All work that touches PowersensorZeroconfDevices state crosses the thread + boundary via ``loop.call_soon_threadsafe``, making it safe regardless of + which threading model the installed zeroconf uses. """ from __future__ import annotations @@ -247,17 +262,17 @@ class _Listener(_zc.ServiceListener): """Zeroconf ServiceListener that forwards events to PowersensorZeroconfDevices. Internal implementation detail. All ServiceListener callbacks arrive on - the zeroconf background thread. + the zeroconf event loop (>= 0.32) or background thread (< 0.32 / 1.0.0). Thread safety ------------- ``_name_to_mac`` is populated in ``add_service`` / ``update_service`` and - consumed in ``remove_service``, all of which are called from the same - zeroconf background thread — no locking required. + consumed in ``remove_service``. In both threading models all three + callbacks arrive on the same thread/loop, so no locking is required. The stored ``_loop`` reference is captured once at construction from the - running asyncio event loop, and is used (read-only) from the zeroconf - thread to schedule work back onto that loop via ``call_soon_threadsafe``. + running asyncio event loop, and is used (read-only) from the callback + context to schedule work back onto that loop via ``call_soon_threadsafe``. """ def __init__(self, owner: PowersensorZeroconfDevices, loop: asyncio.AbstractEventLoop) -> None: @@ -266,19 +281,34 @@ def __init__(self, owner: PowersensorZeroconfDevices, loop: asyncio.AbstractEven self._name_to_mac: dict[str, str] = {} def _extract(self, zc: Any, type_: str, name: str) -> tuple[str, str, int] | None: - """Return (mac, ip, port) from the zeroconf service record, or None.""" - try: - info = zc.get_service_info(type_, name) - except Exception as err: # pylint: disable=broad-except - _LOGGER.error("Error retrieving zeroconf info for %s: %s", name, err) - return None - - if not info: + """Return (mac, ip, port) from the zeroconf cache, or None. + + Uses ServiceInfo.load_from_cache() rather than + Zeroconf.get_service_info() for two reasons: + + 1. On zeroconf >= 0.32 (including HA's 0.149.x), ServiceBrowser + callbacks run inside the asyncio event loop. Calling + get_service_info() from there blocks waiting for a DNS reply + that can never be processed because the event loop is occupied — + it deadlocks, times out after 3 s, and silently returns None, + causing the device to be dropped. + + 2. load_from_cache() is synchronous, non-blocking, and explicitly + documented as threadsafe. The ServiceBrowser guarantees the + cache is populated before firing add_service / update_service, + so the record is always present when this method is called. + """ + info = _zc.ServiceInfo(type_, name) + if not info.load_from_cache(zc): + _LOGGER.warning( + "No cache entry for %s — device will appear on next mDNS announcement", + name, + ) return None addresses = info.parsed_addresses() if not addresses: - _LOGGER.warning("No addresses in zeroconf record for %s", name) + _LOGGER.warning("No addresses in zeroconf cache record for %s", name) return None try: @@ -293,17 +323,27 @@ def _extract(self, zc: Any, type_: str, name: str) -> tuple[str, str, int] | Non def add_service(self, zc: Any, type_: str, name: str) -> None: result = self._extract(zc, type_, name) - if result: - mac, ip, port = result - self._name_to_mac[name] = mac - self._owner._on_zc_add(mac, ip, port, self._loop) + if result is None: + _LOGGER.warning( + "add_service: no info available for %s — will retry on next announcement", + name, + ) + return + mac, ip, port = result + self._name_to_mac[name] = mac + self._owner._on_zc_add(mac, ip, port, self._loop) def update_service(self, zc: Any, type_: str, name: str) -> None: result = self._extract(zc, type_, name) - if result: - mac, ip, port = result - self._name_to_mac[name] = mac - self._owner._on_zc_update(mac, ip, port, self._loop) + if result is None: + _LOGGER.warning( + "update_service: no info available for %s — will retry on next announcement", + name, + ) + return + mac, ip, port = result + self._name_to_mac[name] = mac + self._owner._on_zc_update(mac, ip, port, self._loop) def remove_service(self, zc: Any, type_: str, name: str) -> None: mac = self._name_to_mac.pop(name, None) From b3c8a08f083d23f7f1e93c89bbca1a818894efb3 Mon Sep 17 00:00:00 2001 From: Lake Bookman Date: Thu, 4 Jun 2026 15:27:36 +1000 Subject: [PATCH 6/6] Fix silent dropping of plugs...hopefully... --- src/powersensor_local/zeroconf_devices.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/powersensor_local/zeroconf_devices.py b/src/powersensor_local/zeroconf_devices.py index 23a026e..60f5ed9 100755 --- a/src/powersensor_local/zeroconf_devices.py +++ b/src/powersensor_local/zeroconf_devices.py @@ -312,14 +312,20 @@ def _extract(self, zc: Any, type_: str, name: str) -> tuple[str, str, int] | Non return None try: - mac = info.properties[b'id'].decode('utf-8') - except (KeyError, AttributeError) as err: - _LOGGER.error( - "Missing 'id' property in zeroconf record for %s: %s", name, err - ) + raw_id = info.properties[b'id'] + except KeyError: + _LOGGER.error("Missing 'id' property in zeroconf record for %s", name) + return None + + if raw_id is None: + _LOGGER.error("'id' property in zeroconf record for %s has no value", name) + return None + + if info.port is None: + _LOGGER.error("No port in zeroconf record for %s", name) return None - return mac, addresses[0], info.port + return raw_id.decode('utf-8'), addresses[0], info.port def add_service(self, zc: Any, type_: str, name: str) -> None: result = self._extract(zc, type_, name)