Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions lelab/teleoperate.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from lerobot.teleoperators.so_leader import SO101Leader, SO101LeaderConfig

from .utils.config import setup_calibration_files
from .utils.devices import safe_disconnect_device

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -128,12 +129,7 @@ def _safe_disconnect(device) -> None:
Used on the connection-failure cleanup path so one device's failure can't
leave the other holding its serial port open.
"""
if device is None:
return
try:
device.disconnect()
except Exception as e:
logger.warning(f"Error disconnecting device during cleanup: {e}")
safe_disconnect_device(device, logger)


def handle_start_teleoperation(request: TeleoperateRequest, websocket_manager=None) -> dict[str, Any]:
Expand Down
51 changes: 51 additions & 0 deletions lelab/utils/devices.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Device cleanup helpers for LeRobot hardware wrappers.

Serial ports are a trust boundary on Windows: if a normal disconnect fails
while disabling torque, the COM handle can stay open until the Python process
exits. These helpers preserve LeRobot's normal disconnect behavior, then force
close the underlying port/cameras as a last resort.
"""

from __future__ import annotations

import logging
from contextlib import suppress
from typing import Any


def safe_disconnect_device(device: Any, logger: logging.Logger, context: str = "cleanup") -> None:
"""Disconnect a LeRobot device and force-release resources on failure."""
if device is None:
return

try:
device.disconnect()
return
except Exception as exc:
logger.warning("Error disconnecting device during %s: %s", context, exc)

_force_close_device_resources(device, logger)


def _force_close_device_resources(device: Any, logger: logging.Logger) -> None:
"""Best-effort release for serial/camera resources after disconnect fails."""
bus = getattr(device, "bus", None)
port_handler = getattr(bus, "port_handler", None)
if port_handler is not None:
with suppress(Exception):
port_handler.clearPort()
with suppress(Exception):
port_handler.is_using = False
try:
port_handler.closePort()
logger.info("Force-closed serial port after disconnect failure")
except Exception as exc:
logger.warning("Failed to force-close serial port after disconnect failure: %s", exc)

cameras = getattr(device, "cameras", None)
if isinstance(cameras, dict):
for cam in cameras.values():
try:
cam.disconnect()
except Exception as exc:
logger.warning("Failed to disconnect camera after device cleanup failure: %s", exc)
67 changes: 67 additions & 0 deletions tests/test_devices.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from __future__ import annotations

import logging


def test_safe_disconnect_force_closes_serial_port_after_disconnect_failure() -> None:
from lelab.utils.devices import safe_disconnect_device

class PortHandler:
def __init__(self) -> None:
self.cleared = False
self.closed = False
self.is_using = True

def clearPort(self) -> None: # noqa: N802 - mirrors LeRobot port handler API
self.cleared = True

def closePort(self) -> None: # noqa: N802 - mirrors LeRobot port handler API
self.closed = True

class Camera:
def __init__(self) -> None:
self.disconnected = False

def disconnect(self) -> None:
self.disconnected = True

class Device:
def __init__(self) -> None:
self.bus = type("Bus", (), {"port_handler": PortHandler()})()
self.cameras = {"cam": Camera()}

def disconnect(self) -> None:
raise RuntimeError("Failed to write 'Torque_Enable' on id_=6")

device = Device()
safe_disconnect_device(device, logging.getLogger(__name__))

assert device.bus.port_handler.cleared is True
assert device.bus.port_handler.is_using is False
assert device.bus.port_handler.closed is True
assert device.cameras["cam"].disconnected is True


def test_safe_disconnect_uses_normal_disconnect_when_it_succeeds() -> None:
from lelab.utils.devices import safe_disconnect_device

class PortHandler:
def __init__(self) -> None:
self.closed = False

def closePort(self) -> None: # noqa: N802 - mirrors LeRobot port handler API
self.closed = True

class Device:
def __init__(self) -> None:
self.bus = type("Bus", (), {"port_handler": PortHandler()})()
self.disconnected = False

def disconnect(self) -> None:
self.disconnected = True

device = Device()
safe_disconnect_device(device, logging.getLogger(__name__))

assert device.disconnected is True
assert device.bus.port_handler.closed is False