Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions lelab/rollout.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,84 @@ def _format_cameras_arg(cameras: dict[str, dict[str, Any]]) -> str:
return "{" + ", ".join(parts) + "}"


# Exception lines at the tail of a Python traceback look like
# "RuntimeError: ..." or "lerobot.errors.DeviceNotConnectedError: ...".
# The pattern is applied with ``match`` (anchored at the start of a line) and
# accepts an optionally dotted name ending in a conventional exception suffix.
_EXC_LINE_RE = re.compile(r"^[A-Za-z_][\w.]*(?:Error|Exception|Interrupt|Timeout|Failure)\b")


def _extract_error_from_log(log_path: str | None) -> str | None:
    """Pull the meaningful error out of a failed rollout's log so the UI can
    show it directly instead of telling the user to open a file in the cache.

    Only the last 50 lines of the log are inspected. If one of them looks like
    the final line of a traceback (see ``_EXC_LINE_RE``), the snippet is the
    *last* such line plus everything after it; otherwise it is the last six
    non-empty lines. Runs of blank lines are collapsed and the result is capped
    at 500 characters (an ellipsis marks truncation).

    Args:
        log_path: Path of the rollout log, or ``None``/empty when unknown.

    Returns:
        The error snippet, or ``None`` when there is no path, the file cannot
        be read, or nothing useful remains after trimming.
    """
    # Function-scope import keeps this helper self-contained.
    from collections import deque

    if not log_path:
        return None
    try:
        with open(log_path, encoding="utf-8", errors="replace") as fh:
            # A bounded deque keeps only the newest 50 lines while streaming,
            # so a very large log is never held in memory in full.
            tail = [ln.rstrip("\n") for ln in deque(fh, maxlen=50)]
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: malformed path (e.g.
        # an embedded NUL). Either way this is best-effort: report nothing.
        return None

    # Prefer the last exception line + everything after it (the message body).
    exc_idx = next(
        (i for i in reversed(range(len(tail))) if _EXC_LINE_RE.match(tail[i])),
        None,
    )
    if exc_idx is not None:
        snippet = "\n".join(tail[exc_idx:]).strip()
    else:
        non_empty = [ln for ln in tail if ln.strip()]
        snippet = "\n".join(non_empty[-6:]).strip()

    # Collapse blank (or whitespace-only) lines inside the snippet.
    snippet = re.sub(r"\n\s*\n+", "\n", snippet)
    if len(snippet) > 500:
        snippet = snippet[:500].rstrip() + "…"
    return snippet or None


def _friendly_hint(error_text: str | None) -> str | None:
    """A plain-language, actionable headline for the common SO-101 failures.

    The error text is lower-cased and checked against known markers in a fixed
    order; the first matching category wins.

    Args:
        error_text: Raw error text (e.g. a traceback tail), or ``None``.

    Returns:
        A one-sentence hint for the user, or ``None`` when the text is empty
        or matches no known failure.
    """
    if not error_text:
        return None
    low = error_text.lower()
    if "overload" in low or "torque_enable" in low:
        return (
            "A motor overloaded — usually the gripper holding an object too hard. Release the object / "
            "open the gripper and power-cycle the arm before trying again."
        )
    if "missing motor ids" in low or "motor check failed" in low:
        return (
            "A follower motor isn't responding (often the gripper, id 6). If a skill was holding an object "
            "it likely overloaded — remove it, power-cycle the arm, then try teleoperation first."
        )
    if "could not connect" in low or "failed to connect" in low or "not connected" in low:
        return "Couldn't connect to the arm — make sure it's plugged in, powered on, and on the right port."
    if "frame is too old" in low or "no frame" in low or "frame timeout" in low:
        return (
            "A camera can't keep up — frames are arriving too slowly. Lower its resolution/FPS, "
            "set FOURCC=MJPG, and close other heavy apps, then try again."
        )
    if "failed to set capture_" in low or "actual_width" in low or "actual_height" in low:
        return "A camera doesn't support the configured resolution — open camera settings and click Auto."
    # Permission problems only get the serial-port hint when the text really
    # names a port: the word "port(s)"/"serial", a Windows COM<n> name, or a
    # /dev/tty* or /dev/cu.* device. Bare substring checks for "com"/"port"
    # also fire on words like "compiled" or "import" and mislabel unrelated
    # permission errors.
    if "permission" in low and re.search(r"\b(?:ports?|serial|com\d+)\b|/dev/(?:tty|cu\.)", low):
        return "Couldn't open the serial port — close anything else using it, or run `lelab --stop`."
    return None


# Substrings (matched against the lower-cased error text) signalling that the
# policy itself ran and only shutdown/cleanup tripped — e.g. disabling torque
# on a gripper that is still holding an object.
_CLEANUP_MARKERS = (
    "overload",
    "torque_enable",
    "disconnect",
    "not connected",
)


def _classify_outcome(rc: int | None, rollout_started: bool, error_text: str | None) -> str:
    """Classify a finished rollout as ``ok``, ``ran_with_warning`` or ``failed``.

    A zero or missing exit code is ``ok``. A non-zero exit code that arrives
    *after* the rollout main loop started, with an error containing one of
    ``_CLEANUP_MARKERS``, is reported as ``ran_with_warning``: the skill ran
    and only cleanup complained, so the UI shouldn't call a working run
    "failed". Every other non-zero exit is ``failed``.
    """
    if rc:
        if rollout_started:
            lowered = (error_text or "").lower()
            for marker in _CLEANUP_MARKERS:
                if marker in lowered:
                    return "ran_with_warning"
        return "failed"
    return "ok"


def handle_start_inference(request: InferenceRequest) -> dict[str, Any]:
"""Start a one-shot rollout subprocess. Returns a dict — the route
layer turns it into a JSON response or HTTPException as appropriate."""
Expand Down Expand Up @@ -308,10 +386,17 @@ def handle_inference_status() -> dict[str, Any]:
_inference_started_at = None
_inference_rollout_started_at = None
_inference_meta = {}
# On failure, surface the real error from the log so the UI doesn't
# have to send the user digging through the cache.
error = _extract_error_from_log(finished_meta.get("log_path")) if rc else None
outcome = _classify_outcome(rc, finished_rollout_started is not None, error)
return {
"inference_active": False,
"exited": True,
"exit_code": rc,
"outcome": outcome,
"error": error,
"hint": _friendly_hint(error),
"policy_ref": finished_meta.get("policy_ref"),
"duration_s": finished_meta.get("duration_s"),
"log_path": finished_meta.get("log_path"),
Expand Down
40 changes: 40 additions & 0 deletions tests/test_rollout.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,43 @@ def test_handle_start_inference_blocked_when_already_active(monkeypatch) -> None
assert result["success"] is False
assert result["status_code"] == 409
assert "already active" in result["message"]


def test_classify_outcome_ok_warns_and_fails() -> None:
    """Check each outcome category of ``_classify_outcome`` with a small table."""
    from lelab.rollout import _classify_outcome

    expectations = [
        # rc 0/None => the run was fine.
        ((0, True, "overload"), "ok"),
        ((None, True, None), "ok"),
        # Non-zero AFTER the rollout started, with a torque-disable/overload
        # on shutdown => the skill ran; only cleanup tripped.
        ((1, True, "Motor 6 overload, torque_enable failed"), "ran_with_warning"),
        # Never started, or an unrelated error => a real failure.
        ((1, False, "overload"), "failed"),
        ((1, True, "could not connect to the arm"), "failed"),
    ]
    for args, expected in expectations:
        assert _classify_outcome(*args) == expected


def test_friendly_hint_maps_common_failures() -> None:
    """Known failure texts map to a relevant hint; unknown or missing text maps to None."""
    from lelab.rollout import _friendly_hint

    overload_hint = _friendly_hint("Motor overload detected") or ""
    assert "gripper" in overload_hint.lower()

    connect_hint = _friendly_hint("Failed to connect to the follower") or ""
    assert "connect" in connect_hint.lower()

    for unmatched in ("some unrecognised traceback", None):
        assert _friendly_hint(unmatched) is None


def test_extract_error_from_log_pulls_exception_tail(tmp_path) -> None:
    """The final exception line of a logged traceback is surfaced; missing or
    absent log paths yield None."""
    from lelab.rollout import _extract_error_from_log

    log_lines = [
        "INFO starting rollout\n",
        "Traceback (most recent call last):\n",
        ' File "x.py", line 1\n',
        "RuntimeError: gripper overload during shutdown\n",
    ]
    log_file = tmp_path / "rollout.log"
    log_file.write_text("".join(log_lines), encoding="utf-8")

    extracted = _extract_error_from_log(str(log_file))
    assert extracted is not None
    assert "RuntimeError: gripper overload during shutdown" in extracted

    # No path at all, and a path that does not exist, both yield None.
    assert _extract_error_from_log(None) is None
    missing = tmp_path / "missing.log"
    assert _extract_error_from_log(str(missing)) is None