diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1ab448d..60b83bf 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -12,3 +12,8 @@ repos: - id: end-of-file-fixer - id: check-yaml - id: check-added-large-files +- repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.10.0 + hooks: + - id: mypy + args: [ --config-file=pyproject.toml ] diff --git a/README.md b/README.md index d6c4334..5095a0b 100644 --- a/README.md +++ b/README.md @@ -393,6 +393,24 @@ It writes a hashed packet with `policy-diff.json` and `promotion-decision.json`, then exits `0` for `PROMOTE`, `1` for `HOLD`, and `4` for `ROLLBACK`. +For the operator-facing release decision, use the composed assurance command. +It consumes an existing proof packet, optionally adds realtime/action-execution +and shadow evidence, and writes one `release-assurance.json` / Markdown report: + +```bash +tether release assure /tmp/tether-deploy-proof \ + --profile warehouse-safe \ + --control-hz 20 \ + --execution-cert \ + --shadow-trace ./traces/shadow.jsonl.gz \ + --output-dir /tmp/tether-release-assurance +``` + +The top-level decision is `PROMOTE`, `HOLD`, or `ROLLBACK`. The report includes +component decisions; risk signals such as policy action delta, latency +regression, stale action window, chunk-boundary delta, and velocity discontinuity; +and open evidence gaps for production rollout. + For serving-specific deployment confidence, turn the same proof packet into a realtime certificate. 
This answers whether the measured `/act` path fits the robot control loop on the target hardware/cell: @@ -532,14 +550,14 @@ Filter dimensions: `--since` (`7d` / `24h` / `30m`), `--task` (case-insensitive | Ada Lovelace (RTX 40-series, L4) | sm_8.9 | ✅ Supported | | | Hopper (H100, H200) | sm_9.0 | ✅ Supported | | | Jetson Orin (Orin Nano / NX / AGX) | sm_8.7 | ✅ Supported | JetPack 6.x | -| Jetson Thor | sm_10.x | ⚠️ Untested | Should work — same Blackwell silicon as desktop, but ORT-bundled CUDA EP needs Blackwell support (see below) | -| **Blackwell desktop (RTX 5090, RTX PRO 6000, B200, GB200)** | **sm_10.0** | **❌ Not yet supported** | ORT's bundled cuBLAS/cuDNN don't ship sm_100 kernels. Server segfaults at `InferenceSession` init. **Workaround:** use `tether chat` (no GPU needed), or `/act` testing on Modal cloud or non-Blackwell GPU until ORT updates ship. Tracking: [microsoft/onnxruntime#blackwell](https://github.com/microsoft/onnxruntime/issues) | +| Jetson Thor | sm_10.x | ⚠️ Untested | Same Blackwell silicon as desktop; ORT ≥1.25.1 ships those kernels. Untested only for lack of hardware. | +| **Blackwell desktop (RTX 5090, RTX PRO 6000, B200, GB200)** | **sm_10.0 / 12.0** | **⚠️ Supported (smoke-validate)** | The pinned `onnxruntime-gpu>=1.25.1` ships Blackwell sm_120 kernels, so the earlier `InferenceSession`-init segfault is resolved. Smoke-validation recommended before declaring fully production-ready (open ORT threading issue #27621). On ORT < 1.25.1 the server still segfaults — `tether doctor` and the `tether go` Blackwell guard detect this and print the upgrade path. | | Older NVIDIA (Turing RTX 20, GTX 16) | sm_7.5 | ⚠️ Best-effort | Should work but not in CI matrix | | Pre-Tensor-Core (Maxwell Jetson Nano 4GB, GTX 9-series) | sm_5.x | ❌ Not supported | NVIDIA EOL'd this hardware at JetPack 4.6 (Python 3.6) — too old for modern ML stacks regardless. The bootstrap installer auto-detects and bails fast with redirect instructions. 
| -**For Blackwell users right now:** the bootstrap installer accepts your hardware and the package installs cleanly, but `tether go` will segfault at server startup. The real fix requires ORT to ship Blackwell-aware bundled binaries (no published timeline). Workarounds: chat-only mode (no GPU needed), `tether doctor`, `tether models list` all work fine. `/act` and TRT-engine inference need a non-Blackwell GPU temporarily. +**For Blackwell users:** the default install pins `onnxruntime-gpu>=1.25.1`, which ships Blackwell sm_120 kernels — so `tether go` serves on RTX 50-series / B200 / GB200 hardware. The earlier `InferenceSession`-init segfault only occurs on ORT < 1.25.1; `tether doctor` and the `tether go` Blackwell guard detect that and print the upgrade path. Smoke-validate your model on-device before production (open ORT threading issue #27621). -A Blackwell-specific runtime path via TensorRT-LLM (which supports sm_100) is tracked upstream. +A native TensorRT-LLM path (sm_100 / sm_120) is tracked upstream as an additional Blackwell runtime. ## Composable runtime wedges @@ -590,6 +608,10 @@ Full ledger: [reflex_context/measured_numbers.md](reflex_context/measured_number **Latency numbers are intentionally not in the README yet** — earlier TRT FP16 tables were measured on a now-abandoned decomposed-ONNX path. `tether bench ` reproduces on any hardware. + + + + Reproduce on your own GPU with one command: ```bash diff --git a/docs/cli_reference.md b/docs/cli_reference.md index c1ffd58..9d700fb 100644 --- a/docs/cli_reference.md +++ b/docs/cli_reference.md @@ -176,6 +176,35 @@ the selected `--fail-on` gate tripped. --- +## `tether release` + +Customer-facing release assurance workflow. It consumes an existing proof packet +and returns the decision an operator needs before a robot policy update reaches a +fleet: `PROMOTE`, `HOLD`, or `ROLLBACK`. 
+ +```bash +tether release assure ./tether-deploy-proof \ + --profile warehouse-safe \ + --control-hz 20 \ + --execution-cert \ + --shadow-trace ./traces/shadow.jsonl.gz \ + --output-dir ./release-assurance \ + --json +``` + +`release assure` composes lower-level evidence instead of replacing it: +deployment proof, promotion gates, optional realtime serving certificate, +optional action-execution certificate, and optional shadow rollout gate. The +JSON/Markdown report includes component decisions, blocking checks, risk signals +such as stale action windows and chunk-boundary jumps, and open evidence gaps. + +Artifacts written with `--output-dir`: `release-assurance.json`, +`release-assurance.md`, and `MANIFEST.json`. Exit codes: `0` means `PROMOTE`, +`1` means `HOLD`, `4` means `ROLLBACK`, and `2` means the packet or arguments +could not be loaded. + +--- + ## `tether rollout` Self-serve rollout decision workflow for candidate policies that were mirrored diff --git a/pyproject.toml b/pyproject.toml index d58bccd..2d8c24c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -324,6 +324,27 @@ artifacts = ["*.cpp", "*.cu", "*.txt", "*.json"] target-version = "py310" line-length = 100 +[tool.mypy] +python_version = "3.10" +mypy_path = "src" +ignore_missing_imports = true +follow_imports = "silent" +check_untyped_defs = false +warn_unused_configs = true +exclude = [ + "src/tether/models/third_party/", +] + +[[tool.mypy.overrides]] +module = "tether.exporters.monolithic" +disable_error_code = [ + "assignment", + "attr-defined", + "method-assign", + "misc", + "no-redef", +] + [tool.pytest.ini_options] testpaths = ["tests"] # pytest-asyncio v1.x requires explicit mode declaration. 
def _build_arg_parser() -> argparse.ArgumentParser:
    """Assemble the argument parser for the standalone publish script."""
    cli = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )
    cli.add_argument(
        "certs",
        nargs="+",
        help="cert JSON files, or dirs containing realtime-serving-cert.json",
    )
    cli.add_argument(
        "--out",
        type=Path,
        default=DEFAULT_RESULTS_DOC,
        help=f"results doc path (default: {DEFAULT_RESULTS_DOC})",
    )
    cli.add_argument(
        "--readme",
        type=Path,
        default=Path("README.md"),
        help="README to inject the table into (default: README.md)",
    )
    cli.add_argument(
        "--no-readme", action="store_true", help="don't touch the README"
    )
    cli.add_argument(
        "--title", default="Realtime serving latency", help="table heading"
    )
    return cli


def main(argv: list[str] | None = None) -> int:
    """Publish the given certificates and report what was written.

    Args:
        argv: Command-line arguments; ``None`` lets argparse read ``sys.argv``.

    Returns:
        ``2`` when ``publish`` raises ``CertificateLoadError``, otherwise ``0``
        (including the case where the README markers were not found).
    """
    opts = _build_arg_parser().parse_args(argv)

    # ``--no-readme`` is forwarded to ``publish`` as ``readme=None``.
    readme_target = None if opts.no_readme else opts.readme
    try:
        outcome = publish(
            opts.certs,
            out=opts.out,
            readme=readme_target,
            title=opts.title,
        )
    except CertificateLoadError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 2

    print(outcome["table"])
    print(f"wrote {outcome['out']} ({outcome['count']} certificate(s))")

    if opts.no_readme:
        return 0
    if outcome["readme_updated"]:
        print(f"injected table into {opts.readme}")
        return 0

    # Missing markers are reported on stderr but are not treated as a failure.
    print(
        f"note: markers not found in {opts.readme}; skipped injection "
        f"(add {README_TABLE_BEGIN} / {README_TABLE_END})",
        file=sys.stderr,
    )
    return 0
_build_show_profile, "benchmark": _build_bench, diff --git a/src/tether/chat/loop.py b/src/tether/chat/loop.py index cc98ea9..b4b65f2 100644 --- a/src/tether/chat/loop.py +++ b/src/tether/chat/loop.py @@ -16,7 +16,7 @@ # needs explicit instruction to copy specific values rather than summarize them. SYSTEM_PROMPT = """You are the Tether assistant. Tether is a deployment-confidence CLI for vision-language-action (VLA) robot policies. The main product question is whether a policy has enough evidence to promote, block, or roll back. -You have tools that wrap the `tether` CLI. Use them to act on the user's behalf instead of describing commands. Pick the smallest tool that answers the question. Don't ask for confirmation before read-only tools (list_models, doctor, list_traces, list_promotion_profiles, show_promotion_profile, decide_promotion, certify_realtime_serving). Use list_promotion_profiles or show_promotion_profile when the user asks which promotion profile to use or what a profile checks. Use prove_deployment when the user asks whether an export is safe, ready, deployable, production-ready, suitable for a robot, or needs a proof packet; include policy_diff_* parameters when the user provides candidate/shadow traces for rollout evidence. It is an offline/local proof path and does not actuate hardware. Use prove_realtime_deployment when the user gives an export path and asks whether it can meet a realtime, 20 Hz, 50 Hz, p95, jitter, deadline, or control-loop budget; include control_hz when the user names a control rate, and include execution_cert when the user asks about stale chunks, adaptive action chunking, chunk-boundary smoothness, execution horizon, or action continuity. Use certify_realtime_serving only when the user gives an existing proof packet and asks whether that proof can meet a realtime/control-loop budget. Use decide_promotion when the user asks whether an existing proof packet should promote, block, or roll back. 
Use diff_policies when the user asks for only a standalone candidate/shadow policy diff or whether a policy is safe to promote. For destructive, hardware-actuating, or long-running tools (export_model, serve_model against a real robot transport, distill, finetune, evaluate), confirm intent first if the user's request is ambiguous about scope. +You have tools that wrap the `tether` CLI. Use them to act on the user's behalf instead of describing commands. Pick the smallest tool that answers the question. Don't ask for confirmation before read-only tools (list_models, doctor, list_traces, list_promotion_profiles, show_promotion_profile, decide_promotion, certify_realtime_serving, assure_release). Use list_promotion_profiles or show_promotion_profile when the user asks which promotion profile to use or what a profile checks. Use prove_deployment when the user asks whether an export is safe, ready, deployable, production-ready, suitable for a robot, or needs a proof packet; include policy_diff_* parameters when the user provides candidate/shadow traces for rollout evidence. It is an offline/local proof path and does not actuate hardware. Use prove_realtime_deployment when the user gives an export path and asks whether it can meet a realtime, 20 Hz, 50 Hz, p95, jitter, deadline, or control-loop budget; include control_hz when the user names a control rate, and include execution_cert when the user asks about stale chunks, adaptive action chunking, chunk-boundary smoothness, execution horizon, or action continuity. Use assure_release when the user gives an existing proof packet and asks whether a robot policy update/release should promote, hold, or roll back, especially if they also mention realtime, shadow rollout, action chunk continuity, or fleet release readiness. Use certify_realtime_serving only when the user asks specifically for a realtime/control-loop certificate from an existing proof packet. 
Use decide_promotion when the user asks only for the lower-level proof-packet promotion gate. Use diff_policies when the user asks for only a standalone candidate/shadow policy diff or whether a policy is safe to promote. For destructive, hardware-actuating, or long-running tools (export_model, serve_model against a real robot transport, distill, finetune, evaluate), confirm intent first if the user's request is ambiguous about scope. When a tool returns a non-zero exit code, read its stderr, explain what went wrong in one sentence, and suggest a concrete next action. Don't fabricate tool output. diff --git a/src/tether/chat/schema.py b/src/tether/chat/schema.py index 39e1f1c..43f8974 100644 --- a/src/tether/chat/schema.py +++ b/src/tether/chat/schema.py @@ -126,6 +126,25 @@ def _tool(name: str, description: str, parameters: dict[str, Any]) -> dict[str, "required": ["packet"], }, ), + _tool( + "assure_release", + "Build one release assurance report from an existing proof packet. Use this when the user asks whether a robot policy update/release should promote, hold, or roll back and may also care about realtime serving, action chunk continuity, or shadow rollout evidence.", + { + "properties": { + "packet": {"type": "string", "description": "Deployment proof packet directory, or deployment-proof.json path."}, + "profile": {"type": "string", "description": "Optional built-in promotion profile name or JSON/YAML path."}, + "candidate_active": {"type": "boolean", "description": "Return ROLLBACK instead of HOLD when gates fail for an active rollout."}, + "control_hz": {"type": "number", "description": "Robot control rate. Setting this includes realtime evidence."}, + "target": {"type": "string", "description": "Hardware/cell label, e.g. 
agx-orin-cell-a."}, + "execution_cert": {"type": "boolean", "description": "Also certify stale action windows, chunk-boundary continuity, velocity discontinuity, and runtime attribution."}, + "shadow_trace": {"type": "string", "description": "Optional shadow trace from `tether serve --shadow-policy --record`."}, + "min_compared": {"type": "integer", "description": "Minimum compared shadow requests required before promotion. Default 1."}, + "output_dir": {"type": "string", "description": "Directory for release-assurance artifacts."}, + "json": {"type": "boolean", "description": "Emit JSON instead of human output."}, + }, + "required": ["packet"], + }, + ), _tool( "prove_realtime_deployment", "Run a deterministic realtime deployment proof chain for an export: `tether prove` into a known proof directory, then `tether bench realtime` against that same packet. Use this when the user gives an export path and asks whether it can run at 20 Hz, 50 Hz, realtime, or inside a robot control-loop budget.", diff --git a/src/tether/chat/welcome.py b/src/tether/chat/welcome.py index b1b3aff..77b828f 100644 --- a/src/tether/chat/welcome.py +++ b/src/tether/chat/welcome.py @@ -109,6 +109,7 @@ def tools_listing() -> str: "certify_realtime_serving": "Deploy", "diff_policies": "Deploy", "decide_promotion": "Deploy", + "assure_release": "Deploy", "list_promotion_profiles": "Deploy", "show_promotion_profile": "Deploy", "list_models": "Models", diff --git a/src/tether/cli.py b/src/tether/cli.py index e38979c..bd84cc9 100644 --- a/src/tether/cli.py +++ b/src/tether/cli.py @@ -38,6 +38,7 @@ [green]tether chat[/green] ask what to prove, deploy, or fix [green]tether chat --tui[/green] ↳ full-screen TUI (needs [dim]pip install 'tether\\[tui]'[/dim]) [green]tether prove ./export[/green] collect a deployment proof packet + [green]tether release assure ./proof[/green] decide PROMOTE / HOLD / ROLLBACK [green]tether promote ./proof[/green] decide PROMOTE / BLOCK / ROLLBACK [green]tether rollout gate 
@app.command(name="publish-latency", hidden=True)
def publish_latency_cmd(
    certs: list[str] = typer.Argument(
        ...,
        help="Realtime cert JSON files, or dirs containing realtime-serving-cert.json",
    ),
    out: Path = typer.Option(
        Path("reflex_context/06_experiments/jetson_latency_results.md"),
        "--out",
        help="Results doc path.",
    ),
    readme: Path = typer.Option(
        Path("README.md"), "--readme", help="README to inject the table into."
    ),
    no_readme: bool = typer.Option(
        False, "--no-readme", help="Don't touch the README."
    ),
    title: str = typer.Option(
        "Realtime serving latency", "--title", help="Table heading."
    ),
) -> None:
    """Publish `bench realtime` certificates into a comparison latency table.

    Renders one row per certificate (model x target), writes the results doc, and
    injects the table into the README between the latency-table markers. The
    standalone `scripts/publish_jetson_latency.py` is the same flow without the CLI.
    """
    # NOTE: the docstring above is kept verbatim; Typer shows it as the
    # command's --help text, so it is user-facing output rather than a comment.

    # Imported inside the command body rather than at module import time.
    from tether.realtime_cert_publish import CertificateLoadError, publish

    # ``--no-readme`` is forwarded to ``publish`` as ``readme=None``.
    readme_target = None if no_readme else readme
    try:
        published = publish(
            list(certs),
            out=out,
            readme=readme_target,
            title=title,
        )
    except CertificateLoadError as exc:
        # Unloadable certificates map to exit code 2.
        err_console.print(f"[red]{exc}[/red]")
        raise typer.Exit(2)

    # The table goes through typer.echo; status lines go through the consoles.
    typer.echo(published["table"])
    console.print(
        f" [dim]Wrote[/dim] {published['out']} ({published['count']} certificate(s))"
    )

    if no_readme:
        return
    if published["readme_updated"]:
        console.print(f" [dim]Injected table into[/dim] {readme}")
        return
    # Missing markers only produce a warning; the command still exits normally.
    err_console.print(
        f"[yellow]Markers not found in {readme}; skipped injection.[/yellow]"
    )
def _release_assure_usage_error(
    *,
    output_format: str,
    control_hz: float,
    min_compared: int,
    poll_s: float,
    fail_on: str,
    non_negative_budgets: tuple[float, ...],
    wait_timeout_s: float,
) -> str | None:
    """Return the first usage error for ``tether release assure``, or ``None``.

    Checks run in a fixed order so the reported message is deterministic when
    several options are invalid at once. The returned text carries no Rich
    markup; the caller wraps it.

    Args:
        output_format: Effective output format (after ``--json`` is applied).
        control_hz: Value of ``--control-hz``.
        min_compared: Value of ``--min-compared``.
        poll_s: Value of ``--poll-s``.
        fail_on: Value of ``--fail-on``.
        non_negative_budgets: Threshold and miss/error budget values that must
            all be ``>= 0``.
        wait_timeout_s: Value of ``--wait-timeout-s``.
    """
    if output_format not in ("human", "json", "markdown"):
        return f"--format must be 'human', 'json', or 'markdown', got {output_format!r}"
    if control_hz < 0:
        return "--control-hz must be >= 0"
    if min_compared < 1:
        return "--min-compared must be >= 1"
    if poll_s <= 0:
        return "--poll-s must be > 0"
    if fail_on not in {"none", "actions", "latency", "guard", "shape", "any"}:
        return "--fail-on must be one of none/actions/latency/guard/shape/any"
    if any(value < 0 for value in non_negative_budgets):
        return "realtime thresholds and miss/error budgets must be >= 0"
    # Checked last so inputs that were already rejected keep their message.
    if wait_timeout_s < 0:
        return "--wait-timeout-s must be >= 0"
    return None


# Exit codes: 0 = PROMOTE, 4 = ROLLBACK, 1 = any other decision,
# 2 = invalid options or a ReleaseAssuranceError from the library.
@release_app.command("assure")
def release_assure_cmd(
    packet: str = typer.Argument(
        ...,
        help="Deployment proof packet directory, or deployment-proof.json path.",
    ),
    profile: str = typer.Option(
        "",
        "--profile",
        help="Built-in promotion profile name, or JSON/YAML path.",
    ),
    candidate_active: bool = typer.Option(
        False,
        "--candidate-active",
        help="Return ROLLBACK instead of HOLD when gates fail for an active candidate.",
    ),
    realtime: bool = typer.Option(
        False,
        "--realtime",
        help="Attach a realtime serving certificate to the release assurance report.",
    ),
    target: str = typer.Option(
        "",
        "--target",
        help="Hardware/cell label written into the realtime evidence.",
    ),
    control_hz: float = typer.Option(
        0.0,
        "--control-hz",
        help="Robot control rate. Setting this implies --realtime.",
    ),
    max_roundtrip_p95_ms: float = typer.Option(
        0.0,
        "--max-roundtrip-p95-ms",
        help="Optional p95 roundtrip budget. 0 uses realtime certificate default.",
    ),
    max_jitter_p95_minus_p50_ms: float = typer.Option(
        0.0,
        "--max-jitter-p95-minus-p50-ms",
        help="Optional jitter budget. 0 uses profile/default behavior.",
    ),
    max_deadline_misses: int = typer.Option(
        0,
        "--max-deadline-misses",
        help="Allowed deadline misses in realtime evidence.",
    ),
    max_control_budget_misses: int = typer.Option(
        0,
        "--max-control-budget-misses",
        help="Allowed samples slower than the control period.",
    ),
    max_act_errors: int = typer.Option(
        0,
        "--max-act-errors",
        help="Allowed /act errors.",
    ),
    execution_cert: bool = typer.Option(
        False,
        "--execution-cert",
        help="Attach action-execution continuity checks to realtime evidence.",
    ),
    max_stale_action_window_ms: float = typer.Option(
        0.0,
        "--max-stale-action-window-ms",
        help="Execution cert stale-action budget. 0 uses 100 ms.",
    ),
    max_chunk_boundary_delta: float = typer.Option(
        0.0,
        "--max-chunk-boundary-delta",
        help="Execution cert max chunk-boundary action delta. 0 uses 0.15.",
    ),
    max_velocity_discontinuity: float = typer.Option(
        0.0,
        "--max-velocity-discontinuity",
        help="Execution cert max boundary velocity jump. 0 uses 0.2.",
    ),
    require_phase_aware_horizon: bool = typer.Option(
        False,
        "--require-phase-aware-horizon",
        help="Require phase/low-speed transition evidence in the action execution cert.",
    ),
    require_runtime_attribution: bool = typer.Option(
        True,
        "--require-runtime-attribution/--no-require-runtime-attribution",
        help="Require scheduler/cache/adaptive-horizon attribution.",
    ),
    shadow_trace: str = typer.Option(
        "",
        "--shadow-trace",
        help="Optional shadow trace recorded by `tether serve --shadow-policy --record`.",
    ),
    min_compared: int = typer.Option(
        1,
        "--min-compared",
        help="Minimum compared shadow requests required before promotion.",
    ),
    wait_timeout_s: float = typer.Option(
        0.0,
        "--wait-timeout-s",
        help="Seconds to wait for pending background shadow_result rows.",
    ),
    poll_s: float = typer.Option(
        0.25,
        "--poll-s",
        help="Polling interval while waiting for shadow_result rows.",
    ),
    fail_on: str = typer.Option(
        "any",
        "--fail-on",
        help="Shadow policy diff gate: none/actions/latency/guard/shape/any.",
    ),
    min_action_cos: float = typer.Option(
        0.995,
        "--min-action-cos",
        help="Minimum action cosine before the shadow diff fails.",
    ),
    max_action_delta: float = typer.Option(
        0.10,
        "--max-action-delta",
        help="Max absolute action delta before the shadow diff fails.",
    ),
    max_latency_regression_pct: float = typer.Option(
        0.10,
        "--max-latency-regression-pct",
        help="Max shadow latency regression as a fraction, e.g. 0.10 = 10%.",
    ),
    output_dir: str = typer.Option(
        "",
        "--output-dir",
        help="Directory for release-assurance.json, Markdown, and MANIFEST.",
    ),
    output_format: str = typer.Option(
        "human",
        "--format",
        help="Output format: 'human', 'json', or 'markdown'.",
    ),
    json_output: bool = typer.Option(
        False,
        "--json",
        help="Alias for --format json.",
    ),
) -> None:
    """Build one promote/hold/rollback release assurance packet."""
    # The docstring above is the Typer help text, so details live in comments.

    # ``--json`` wins over whatever ``--format`` was given.
    if json_output:
        output_format = "json"

    usage_error = _release_assure_usage_error(
        output_format=output_format,
        control_hz=control_hz,
        min_compared=min_compared,
        poll_s=poll_s,
        fail_on=fail_on,
        non_negative_budgets=(
            max_roundtrip_p95_ms,
            max_jitter_p95_minus_p50_ms,
            max_stale_action_window_ms,
            max_chunk_boundary_delta,
            max_velocity_discontinuity,
            max_deadline_misses,
            max_control_budget_misses,
            max_act_errors,
        ),
        wait_timeout_s=wait_timeout_s,
    )
    if usage_error is not None:
        err_console.print(f"[red]{usage_error}[/red]")
        raise typer.Exit(2)

    from tether.release_assurance import (
        ReleaseAssuranceError,
        build_release_assurance,
        format_release_assurance_human,
        format_release_assurance_markdown,
        write_release_assurance_packet,
    )

    def _positive_or(value: float, fallback: float | None) -> float | None:
        # On the command line, 0 means "not set": substitute the fallback.
        return value if value > 0 else fallback

    try:
        report = build_release_assurance(
            packet=packet,
            profile_path=profile or None,
            candidate_active=candidate_active,
            realtime=realtime,
            target=target,
            control_hz=_positive_or(control_hz, None),
            max_roundtrip_p95_ms=_positive_or(max_roundtrip_p95_ms, None),
            max_jitter_p95_minus_p50_ms=_positive_or(
                max_jitter_p95_minus_p50_ms, None
            ),
            max_deadline_misses=max_deadline_misses,
            max_control_budget_misses=max_control_budget_misses,
            max_act_errors=max_act_errors,
            execution_cert=execution_cert,
            max_stale_action_window_ms=_positive_or(max_stale_action_window_ms, 100.0),
            max_chunk_boundary_delta=_positive_or(max_chunk_boundary_delta, 0.15),
            max_velocity_discontinuity=_positive_or(max_velocity_discontinuity, 0.2),
            require_phase_aware_horizon=require_phase_aware_horizon,
            require_runtime_attribution=require_runtime_attribution,
            shadow_trace=shadow_trace or None,
            shadow_min_compared=min_compared,
            shadow_wait_timeout_s=wait_timeout_s,
            shadow_poll_s=poll_s,
            shadow_fail_on=fail_on,
            shadow_min_action_cos=min_action_cos,
            shadow_max_action_delta=max_action_delta,
            shadow_max_latency_regression_pct=max_latency_regression_pct,
        )
        if output_dir:
            write_release_assurance_packet(report, output_dir)
    except ReleaseAssuranceError as exc:
        err_console.print(f"[red]Release assurance failed:[/red] {exc}")
        raise typer.Exit(2) from exc

    if output_format == "json":
        typer.echo(json.dumps(report, indent=2, sort_keys=True))
    elif output_format == "markdown":
        typer.echo(format_release_assurance_markdown(report))
    else:
        console.print(format_release_assurance_human(report), markup=False)
        # NOTE(review): the packet-path line is printed for human output only,
        # which keeps json/markdown output clean. SOURCE's indentation was
        # flattened, so confirm this nesting against the committed file.
        if output_dir:
            console.print(f"\n [dim]Release assurance packet:[/dim] {output_dir}")

    decision = report.get("decision")
    if decision == "PROMOTE":
        raise typer.Exit(0)
    raise typer.Exit(4 if decision == "ROLLBACK" else 1)
-6133,6 +6439,7 @@ def rollout_gate_cmd( app.add_typer(profiles_app, name="profiles") app.add_typer(policy_app, name="policy") app.add_typer(rollout_app, name="rollout") +app.add_typer(release_app, name="release") # ─── tether connect {name} / disconnect / list ────────────────────────────── @@ -6415,7 +6722,7 @@ def _agent_run_once(agent_daemon: Any, cfg: Any, client: Any = None) -> Any: for name in ("run_once", "run_agent_once", "run_cycle"): fn = getattr(agent_daemon, name, None) if callable(fn): - attempts = [] + attempts: list[Any] = [] if client is not None: attempts.extend( ( @@ -6440,7 +6747,7 @@ def _agent_run_loop(agent_daemon: Any, cfg: Any, client: Any = None) -> None: for name in ("run_forever", "run_loop", "run_daemon", "start_daemon"): fn = getattr(agent_daemon, name, None) if callable(fn): - attempts = [] + attempts: list[Any] = [] if client is not None: attempts.extend( ( @@ -7095,7 +7402,7 @@ def pro_status( expires_at = data.get("expires_at", "") last_hb = data.get("last_heartbeat_at", "") - days_remaining = "?" + days_remaining: int | str = "?" 
if expires_at: try: exp = datetime.fromisoformat(expires_at.replace("Z", "+00:00")) @@ -7326,7 +7633,7 @@ def calibrate_so_arm100_import( out_path = Path(output).expanduser() out_path.parent.mkdir(parents=True, exist_ok=True) - adapter.config.write_lerobot_calibration(out_path) + adapter.config.write_lerobot_calibration(str(out_path)) console.print(f"[green]Calibration imported[/green] → {out_path}") console.print( " Joints: " @@ -7354,7 +7661,7 @@ def calibrate_so_arm100_default( adapter = SOARM100Adapter.default() out_path = Path(output).expanduser() out_path.parent.mkdir(parents=True, exist_ok=True) - adapter.config.write_lerobot_calibration(out_path) + adapter.config.write_lerobot_calibration(str(out_path)) console.print(f"[green]Default calibration written[/green] → {out_path}") console.print( " [yellow]Factory defaults assume your servos are mid-pose with no " @@ -7502,6 +7809,7 @@ def curate_convert( import json as _json from tether.curate.format_converters import ( CONVERTER_REGISTRY, + FormatConverter, HDF5Converter, LeRobotV3Converter, OpenXEmbodimentConverter, @@ -7516,6 +7824,7 @@ def curate_convert( raise typer.Exit(2) try: + converter: FormatConverter if format == "lerobot-v3": converter = LeRobotV3Converter(robot_type=robot_type, fps=fps) elif format == "hdf5": diff --git a/src/tether/exporters/monolithic.py b/src/tether/exporters/monolithic.py index f100670..052baf5 100644 --- a/src/tether/exporters/monolithic.py +++ b/src/tether/exporters/monolithic.py @@ -26,6 +26,7 @@ import json import logging import time +from importlib.metadata import PackageNotFoundError, version as _dist_version from pathlib import Path from typing import Any @@ -64,6 +65,8 @@ "smolvla": "HuggingFaceTB/SmolLM2-135M", } +_SMOLVLA_EXPORT_PATCH_FAILURES: list[str] = [] + def _quiet_noisy_export_loggers() -> None: """Keep streamed export logs readable while preserving warnings/errors.""" @@ -120,7 +123,7 @@ def _require_monolithic_deps() -> None: """Check that the 
``[monolithic]`` optional dep group is installed. Raises ImportError with a clean message if anything's missing or a - transformers version mismatch is detected (5.4+ has the q_length bug). + required package version mismatch is detected. """ _quiet_noisy_export_loggers() missing = [] @@ -136,8 +139,25 @@ def _require_monolithic_deps() -> None: except ImportError as e: missing.append(f"transformers==5.3.0 ({e})") + try: + __import__("lerobot") + try: + lerobot_version = _dist_version("lerobot") + except PackageNotFoundError as exc: + raise ImportError( + "lerobot imports but package metadata is missing; install with: " + "pip install 'fastcrest-tether[monolithic]'" + ) from exc + if lerobot_version != "0.5.1": + raise ImportError( + f"lerobot {lerobot_version} detected; monolithic export requires " + "exactly lerobot==0.5.1. Install with: " + "pip install 'fastcrest-tether[monolithic]'" + ) + except ImportError as e: + missing.append(f"lerobot==0.5.1 ({e})") + for mod_name, pip_name in [ - ("lerobot", "lerobot==0.5.1"), ("onnx_diagnostic", "onnx-diagnostic>=0.9"), ("onnxscript", "onnxscript>=0.1"), ("optree", "optree"), @@ -156,6 +176,29 @@ def _require_monolithic_deps() -> None: ) +def _record_smolvla_export_patch_failure(patch_name: str, exc: BaseException) -> None: + message = f"{patch_name}: {type(exc).__name__}: {exc}" + _SMOLVLA_EXPORT_PATCH_FAILURES.append(message) + logger.warning( + "[smolvla] export patch failed; SmolVLA monolithic export will stop " + "before torch.export instead of failing later: %s", + message, + ) + + +def _raise_if_smolvla_export_patches_failed() -> None: + if not _SMOLVLA_EXPORT_PATCH_FAILURES: + return + details = "\n - ".join(_SMOLVLA_EXPORT_PATCH_FAILURES) + raise RuntimeError( + "SmolVLA monolithic export patches failed to install. 
This usually " + "means the lerobot/transformers export stack does not match the pinned " + "monolithic environment; install with: pip install " + "'fastcrest-tether[monolithic]'.\n - " + + details + ) + + def apply_export_patches() -> None: """Install the full set of transformers + lerobot patches required for the monolithic `torch.export` path. Safe to call multiple times; later @@ -168,6 +211,8 @@ def apply_export_patches() -> None: import torch + _SMOLVLA_EXPORT_PATCH_FAILURES.clear() + # Stub GR00T imports to avoid Python 3.13 dataclass issue (harmless on 3.12 too) for _mod in ("lerobot.policies.groot.groot_n1", "lerobot.policies.groot.modeling_groot"): if _mod not in sys.modules: @@ -255,7 +300,10 @@ def _embed_image_with_explicit_patch_mask(self, image): _embed_image_with_explicit_patch_mask._tether_patched = True _smolvla.SmolVLMWithExpertModel.embed_image = _embed_image_with_explicit_patch_mask except Exception as exc: - logger.debug("SmolVLA explicit patch-mask export patch not installed: %s", exc) + _record_smolvla_export_patch_failure( + "SmolVLA explicit patch-mask export patch", + exc, + ) try: from transformers.models.smolvlm import modeling_smolvlm as _smolvlm @@ -294,7 +342,10 @@ def _vision_attention_forward_no_dense_mask(self, hidden_states, attention_mask= _vision_attention_forward_no_dense_mask._tether_patched = True _smolvlm.SmolVLMVisionAttention.forward = _vision_attention_forward_no_dense_mask except Exception as exc: - logger.debug("SmolVLM vision attention export patch not installed: %s", exc) + _record_smolvla_export_patch_failure( + "SmolVLM vision attention export patch", + exc, + ) # DynamicCache deepcopy bypass (FakeTensor can't be deepcopied) _orig_deepcopy = copy.deepcopy @@ -506,7 +557,6 @@ def _fix_onnx_where_dtype_mismatches(onnx_path: Path) -> int: for init in shape_info.graph.initializer: name_dtype[init.name] = init.data_type - FLOAT_TYPES = {TensorProto.FLOAT, TensorProto.FLOAT16, TensorProto.BFLOAT16, TensorProto.DOUBLE} 
def _table_text(value: Any) -> str:
    """Return ``value`` as text that is safe inside one Markdown table cell.

    Pipes are backslash-escaped and line breaks are flattened to spaces so a
    stray character in a certificate field cannot split or truncate the row.
    """

    return " ".join(str(value).splitlines()).replace("|", "\\|")


def _latency_cell(value: Any) -> str:
    """Format a millisecond metric for a table cell; missing -> em dash.

    Numeric values are rendered with one decimal place. Values that cannot be
    converted to ``float`` are shown verbatim (cell-escaped) rather than
    raising, so one malformed certificate does not abort the whole table.
    """

    if value is None:
        return "—"
    try:
        return f"{float(value):.1f}"
    except (TypeError, ValueError):
        return _table_text(value)


def _control_cell(hz: Any) -> str:
    """Format the control rate as ``"<rate> Hz"``; missing/zero -> em dash.

    Mirrors :func:`_latency_cell`: a non-numeric rate is shown verbatim instead
    of raising ``ValueError`` out of the table renderer.
    """

    if not hz:
        return "—"
    try:
        return f"{float(hz):g} Hz"
    except (TypeError, ValueError):
        return _table_text(hz)


def _certificate_model_label(report: dict[str, Any]) -> str:
    """Best-effort model name for a certificate row.

    The certificate has no explicit model field, so derive it from the source
    export directory (e.g. ``~/.cache/tether/exports/smolvla-base`` ->
    ``smolvla-base``). Falls back to the target, then ``unknown``.
    """

    source = report.get("source")
    export_dir = source.get("export_dir") if isinstance(source, dict) else None
    # str() keeps this tolerant of Path-like or otherwise non-string values.
    name = Path(str(export_dir).rstrip("/")).name if export_dir else ""
    return name or str(report.get("target") or "unknown")


def format_realtime_certificates_markdown_table(
    reports: list[dict[str, Any]],
    *,
    title: str = "Realtime serving latency",
) -> str:
    """Render multiple realtime certificates as one comparison table.

    One row per certificate (model x target) with roundtrip p50/p95/p99/max and
    the decision read from each report. Companion to
    :func:`format_realtime_certificate_markdown` (which renders a single
    certificate vertically).

    Args:
        reports: Certificate dicts; the keys read are ``source.export_dir``,
            ``target``, ``control_budget.control_hz``,
            ``latency.roundtrip_ms.{p50_ms,p95_ms,p99_ms,max_ms}`` and
            ``decision``. Missing keys render as placeholders.
        title: Text of the level-2 heading placed above the table.

    Returns:
        The Markdown section, terminated by a single newline.
    """

    lines = [
        f"## {title}",
        "",
        "| Model | Target | Control | p50 ms | p95 ms | p99 ms | max ms | Decision |",
        "|---|---|---:|---:|---:|---:|---:|:--:|",
    ]
    for report in reports:
        control = report.get("control_budget") or {}
        roundtrip = (report.get("latency") or {}).get("roundtrip_ms") or {}
        # A missing *or null* decision is treated as a failure, never "None".
        decision = report.get("decision") or "FAIL"
        cells = [
            f"`{_table_text(_certificate_model_label(report))}`",
            f"`{_table_text(report.get('target') or 'unspecified')}`",
            _control_cell(control.get("control_hz")),
            _latency_cell(roundtrip.get("p50_ms")),
            _latency_cell(roundtrip.get("p95_ms")),
            _latency_cell(roundtrip.get("p99_ms")),
            _latency_cell(roundtrip.get("max_ms")),
            f"**{_table_text(decision)}**",
        ]
        lines.append("| " + " | ".join(cells) + " |")
    return "\n".join(lines) + "\n"
# Markers delimiting the generated table inside the README.
# NOTE(review): both literals are empty in this view — presumably HTML comment
# markers that were lost; confirm the real values against the repository.
README_TABLE_BEGIN = ""
README_TABLE_END = ""
DEFAULT_RESULTS_DOC = Path("reflex_context/06_experiments/jetson_latency_results.md")
CERT_FILENAME = "realtime-serving-cert.json"
CERT_KIND = "tether.realtime_serving_certificate"


class CertificateLoadError(ValueError):
    """Raised when a path is not a readable realtime serving certificate."""


def load_certificate(path: str | Path) -> tuple[Path, dict[str, Any]]:
    """Load one certificate from a JSON file or a directory containing it.

    Args:
        path: A certificate JSON file, or a directory holding ``CERT_FILENAME``.

    Returns:
        ``(resolved_path, certificate_dict)``.

    Raises:
        CertificateLoadError: If the file is missing, unreadable, not valid
            JSON, not a JSON object, or its ``kind`` is not ``CERT_KIND``.
    """

    resolved = Path(path).expanduser()
    if resolved.is_dir():
        resolved = resolved / CERT_FILENAME
    if not resolved.is_file():
        raise CertificateLoadError(f"no certificate at {str(path)!r} (looked for {resolved})")
    try:
        # Explicit UTF-8: do not depend on the platform's locale encoding.
        data = json.loads(resolved.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        raise CertificateLoadError(f"{resolved}: invalid JSON ({exc})") from exc
    except (OSError, UnicodeDecodeError) as exc:
        raise CertificateLoadError(f"{resolved}: unreadable ({exc})") from exc
    # Valid JSON can still be a list/scalar; without this check ``data.get``
    # would raise AttributeError instead of the documented error type.
    if not isinstance(data, dict):
        raise CertificateLoadError(
            f"{resolved}: not a realtime serving certificate "
            f"(top-level JSON is {type(data).__name__}, expected an object)"
        )
    if data.get("kind") != CERT_KIND:
        raise CertificateLoadError(
            f"{resolved}: not a realtime serving certificate (kind={data.get('kind')!r})"
        )
    return resolved, data


def inject_table_into_readme(
    readme: str | Path,
    table: str,
    *,
    begin: str | None = None,
    end: str | None = None,
) -> bool:
    """Replace content between the README markers. Returns False if absent.

    Args:
        readme: Path of the README to update in place.
        table: Markdown to place between the markers (trailing whitespace is
            stripped; one blank line is kept on each side).
        begin: Begin marker; defaults to ``README_TABLE_BEGIN``.
        end: End marker; defaults to ``README_TABLE_END``.

    Returns:
        True when both markers were found (the file is rewritten only if its
        content changes); False when the file or a marker is missing, or when
        a marker is empty.
    """

    begin_marker = README_TABLE_BEGIN if begin is None else begin
    end_marker = README_TABLE_END if end is None else end
    # An empty marker cannot delimit anything, and str.split("") raises
    # ValueError, so treat it the same as "markers absent".
    if not begin_marker or not end_marker:
        return False

    readme_path = Path(readme)
    if not readme_path.is_file():
        return False
    text = readme_path.read_text(encoding="utf-8")
    pre, found_begin, rest = text.partition(begin_marker)
    if not found_begin:
        return False
    _, found_end, post = rest.partition(end_marker)
    if not found_end:
        return False  # END marker missing or precedes BEGIN (malformed) — skip
    new = f"{pre}{begin_marker}\n\n{table.rstrip()}\n\n{end_marker}{post}"
    if new != text:
        readme_path.write_text(new, encoding="utf-8")
    return True


def build_results_doc(table: str, sources: list[tuple[Path, dict[str, Any]]]) -> str:
    """Render the standalone Jetson latency results document.

    Args:
        table: The comparison table Markdown, placed under the intro paragraph.
        sources: ``(path, certificate)`` pairs; each gets a ``###`` section
            rendered with ``format_realtime_certificate_markdown``.

    Returns:
        The full document text, terminated by a newline.
    """

    parts = [
        "# Jetson latency results",
        "",
        "Generated by `tether publish-latency` (scripts/publish_jetson_latency.py) "
        "from `tether bench realtime` certificates. Roundtrip = end-to-end `/act` "
        "p50/p95/p99/max (ms). Re-run to refresh.",
        "",
        table.rstrip(),
        "",
        "## Per-certificate detail",
        "",
    ]
    for path, report in sources:
        parts.extend(
            [
                f"### `{path}`",
                "",
                format_realtime_certificate_markdown(report).rstrip(),
                "",
            ]
        )
    return "\n".join(parts) + "\n"


def publish(
    cert_paths: list[str | Path],
    *,
    out: str | Path = DEFAULT_RESULTS_DOC,
    readme: str | Path | None = "README.md",
    title: str = "Realtime serving latency",
) -> dict[str, Any]:
    """Load certs, render the table, write the results doc, inject the README.

    Returns a summary dict ``{table, out, count, readme_updated}``. Raises
    :class:`CertificateLoadError` if any path is not a valid certificate. Pass
    ``readme=None`` to skip README injection.
    """

    sources = [load_certificate(path) for path in cert_paths]
    reports = [report for _, report in sources]
    table = format_realtime_certificates_markdown_table(reports, title=title)

    out_path = Path(out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    # The table contains non-ASCII characters (em dash), so pin the encoding.
    out_path.write_text(build_results_doc(table, sources), encoding="utf-8")

    readme_updated = False
    if readme is not None:
        readme_updated = inject_table_into_readme(readme, table)

    return {
        "table": table,
        "out": str(out_path),
        "count": len(sources),
        "readme_updated": readme_updated,
    }
+""" +from __future__ import annotations + +import hashlib +import json +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Literal + +from tether.promote import PromotionError, decide_promotion +from tether.realtime_cert import ( + RealtimeCertificateError, + build_realtime_certificate, + format_realtime_certificate_markdown, + load_deploy_proof, +) + +RELEASE_ASSURANCE_SCHEMA_VERSION = 1 +ReleaseDecision = Literal["PROMOTE", "HOLD", "ROLLBACK"] + + +class ReleaseAssuranceError(ValueError): + """Raised when release assurance input artifacts cannot be loaded.""" + + +def _now_iso() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z") + + +def _sha256_file(path: Path) -> str: + h = hashlib.sha256() + with path.open("rb") as fh: + for chunk in iter(lambda: fh.read(1024 * 1024), b""): + h.update(chunk) + return h.hexdigest() + + +def _resolve_packet_dir(packet: str | Path) -> Path: + path = Path(packet).expanduser().resolve() + if path.is_file(): + if path.name != "deployment-proof.json": + raise ReleaseAssuranceError( + "packet file must be deployment-proof.json; pass the packet directory otherwise" + ) + return path.parent + return path + + +def _failed_checks(report: dict[str, Any] | None) -> list[str]: + if not isinstance(report, dict): + return [] + summary = report.get("summary") + if isinstance(summary, dict) and isinstance(summary.get("failed_checks"), list): + return [str(item) for item in summary["failed_checks"]] + return [ + str(check.get("name")) + for check in report.get("checks") or [] + if isinstance(check, dict) and check.get("status") == "fail" + ] + + +def _component( + name: str, + *, + present: bool, + decision: str, + artifact: str = "", + failed_checks: list[str] | None = None, + summary: dict[str, Any] | None = None, +) -> dict[str, Any]: + return { + "name": name, + "present": bool(present), + "decision": decision, + "artifact": artifact, + "failed_checks": failed_checks or [], 
+ "summary": summary or {}, + } + + +def _numeric(value: Any) -> float | None: + if isinstance(value, bool): + return None + if isinstance(value, (int, float)): + return float(value) + return None + + +def _add_signal( + signals: list[dict[str, Any]], + name: str, + status: str, + *, + value: Any = None, + threshold: Any = None, + source: str, + remediation: str = "", +) -> None: + row = { + "name": name, + "status": status, + "value": value, + "threshold": threshold, + "source": source, + } + if remediation: + row["remediation"] = remediation + signals.append(row) + + +def _risk_signals( + *, + proof: dict[str, Any], + promotion: dict[str, Any] | None, + realtime: dict[str, Any] | None, + shadow: dict[str, Any] | None, +) -> list[dict[str, Any]]: + signals: list[dict[str, Any]] = [] + + proof_checks = proof.get("checks") or [] + proof_failures = [ + check.get("name") + for check in proof_checks + if isinstance(check, dict) and check.get("status") == "fail" + ] + _add_signal( + signals, + "deployment_proof_failures", + "fail" if proof_failures else "pass", + value=len(proof_failures), + threshold=0, + source="deployment-proof", + remediation="Fix failed deployment-proof checks before release.", + ) + + latency = proof.get("latency") if isinstance(proof.get("latency"), dict) else {} + guard_violations = int(latency.get("guard_violations") or 0) if latency else 0 + _add_signal( + signals, + "guard_violations", + "fail" if guard_violations else "pass", + value=guard_violations, + threshold=0, + source="deployment-proof", + remediation="Inspect ActionGuard violations and clamp causes.", + ) + + policy_summary = ( + (promotion.get("policy_diff") or {}).get("summary") + if isinstance(promotion, dict) + else None + ) + if not isinstance(policy_summary, dict): + proof_policy = proof.get("policy_diff") if isinstance(proof.get("policy_diff"), dict) else {} + embedded = proof_policy.get("report") if isinstance(proof_policy, dict) else None + policy_summary = 
embedded.get("summary") if isinstance(embedded, dict) else None + if isinstance(policy_summary, dict): + action_failures = int(policy_summary.get("action_failures") or 0) + latency_regressions = int(policy_summary.get("latency_regressions") or 0) + guard_regressions = int(policy_summary.get("guard_regressions") or 0) + shadow_pending = int(policy_summary.get("shadow_pending") or 0) + shadow_errors = int(policy_summary.get("shadow_errors") or 0) + _add_signal( + signals, + "policy_action_delta", + "fail" if action_failures else "pass", + value=action_failures, + threshold=0, + source="policy-diff", + remediation="Compare candidate action chunks and collect more shadow data.", + ) + _add_signal( + signals, + "policy_latency_regression", + "warn" if latency_regressions else "pass", + value=latency_regressions, + threshold=0, + source="policy-diff", + remediation="Profile candidate latency before promotion.", + ) + _add_signal( + signals, + "policy_guard_regression", + "fail" if guard_regressions else "pass", + value=guard_regressions, + threshold=0, + source="policy-diff", + remediation="Do not promote until guard regressions are understood.", + ) + _add_signal( + signals, + "shadow_completion", + "fail" if shadow_errors else ("warn" if shadow_pending else "pass"), + value={"pending": shadow_pending, "errors": shadow_errors}, + threshold={"pending": 0, "errors": 0}, + source="shadow-rollout", + remediation="Wait for shadow_result rows to flush or fix shadow runtime errors.", + ) + else: + _add_signal( + signals, + "policy_diff_present", + "warn", + value=False, + threshold=True, + source="policy-diff", + remediation="Add candidate or shadow policy diff evidence before production rollout.", + ) + + if isinstance(realtime, dict): + for check in realtime.get("checks") or []: + if not isinstance(check, dict): + continue + name = str(check.get("name") or "") + if name in { + "roundtrip_p95_within_budget", + "jitter_within_budget", + "deadline_misses_within_budget", + 
"control_budget_misses_within_budget", + "action_execution_certificate", + }: + status = str(check.get("status") or "unknown") + _add_signal( + signals, + name, + "fail" if status == "fail" else ("warn" if status == "skip" else "pass"), + value=check.get("actual"), + threshold=check.get("expected"), + source="realtime-certificate", + remediation=str(check.get("remediation") or ""), + ) + + execution = realtime.get("execution_certificate") + if isinstance(execution, dict): + metrics = execution.get("metrics") or {} + stale = metrics.get("stale_action_window_ms") or {} + boundary = metrics.get("chunk_boundary_delta") or {} + velocity = metrics.get("velocity_discontinuity") or {} + thresholds = execution.get("thresholds") or {} + for signal_name, value, limit, remediation in ( + ( + "stale_action_window_ms", + _numeric(stale.get("max_ms")), + _numeric(thresholds.get("max_stale_action_window_ms")), + "Shorten serving latency or increase executed horizon.", + ), + ( + "chunk_boundary_delta", + _numeric(boundary.get("max_abs")), + _numeric(thresholds.get("max_chunk_boundary_delta")), + "Smooth, fuse, or shorten action chunks near boundaries.", + ), + ( + "velocity_discontinuity", + _numeric(velocity.get("max_abs")), + _numeric(thresholds.get("max_velocity_discontinuity")), + "Reduce boundary velocity jumps before release.", + ), + ): + status = "warn" + if value is not None and limit is not None: + status = "pass" if value <= limit else "fail" + _add_signal( + signals, + signal_name, + status, + value=value, + threshold=limit, + source="action-execution-certificate", + remediation=remediation, + ) + else: + _add_signal( + signals, + "realtime_certificate_present", + "warn", + value=False, + threshold=True, + source="realtime-certificate", + remediation="Run release assurance with --realtime or --control-hz for control-loop evidence.", + ) + + if shadow is None: + _add_signal( + signals, + "shadow_rollout_present", + "warn", + value=False, + threshold=True, + 
source="shadow-rollout", + remediation="Mirror a candidate with --shadow-policy and pass --shadow-trace before fleet rollout.", + ) + + return signals + + +def _gaps( + *, + proof: dict[str, Any], + realtime: dict[str, Any] | None, + shadow: dict[str, Any] | None, +) -> list[dict[str, Any]]: + gaps: list[dict[str, Any]] = [] + + def add(control: str, severity: str, message: str, next_step: str) -> None: + gaps.append( + { + "control": control, + "severity": severity, + "message": message, + "next_step": next_step, + } + ) + + security = proof.get("security") + security = security if isinstance(security, dict) else {} + if not security.get("enabled"): + add( + "runtime_auth", + "warn", + "Proof packet does not show API-key auth enforcement.", + "Run `tether prove` with --api-key for production promotion packets.", + ) + + trace = proof.get("trace") + trace = trace if isinstance(trace, dict) else {} + if not trace.get("files"): + add( + "trace_forensics", + "warn", + "Proof packet has no recorded /act trace files.", + "Run `tether prove` with --record-dir so rollback/debug evidence exists.", + ) + + safety = proof.get("safety_stress") + safety = safety if isinstance(safety, dict) else {} + if not safety.get("enabled"): + add( + "runtime_safety", + "warn", + "ActionGuard stress evidence is missing.", + "Pass --embodiment or --safety-config during proof collection.", + ) + + if realtime is None: + add( + "realtime_serving", + "warn", + "No realtime serving certificate is attached.", + "Run with --realtime and --control-hz for the target robot loop.", + ) + elif not isinstance(realtime.get("execution_certificate"), dict): + add( + "action_execution", + "warn", + "Realtime certificate lacks action-execution continuity evidence.", + "Run with --execution-cert after /act responses include action_execution telemetry.", + ) + + if shadow is None: + add( + "shadow_rollout", + "warn", + "No shadow rollout gate is attached.", + "Pass --shadow-trace collected from `tether serve 
--shadow-policy --record`.", + ) + + return gaps + + +def _confidence( + *, + components: list[dict[str, Any]], + risk_signals: list[dict[str, Any]], + gaps: list[dict[str, Any]], +) -> int: + score = 100 + for component in components: + if not component.get("present"): + score -= 8 + elif component.get("decision") in {"FAIL", "BLOCK", "HOLD"}: + score -= 25 + elif component.get("decision") == "ROLLBACK": + score -= 40 + for signal in risk_signals: + if signal.get("status") == "fail": + score -= 12 + elif signal.get("status") == "warn": + score -= 4 + for gap in gaps: + score -= 6 if gap.get("severity") == "warn" else 12 + return max(0, min(100, score)) + + +# Risk signals that gate the release decision regardless of the promotion +# profile: zero-tolerance runtime/behavioral safety breaches that the profile +# does not (and should not) make tunable. A failing hard-safety signal forces +# HOLD (or ROLLBACK for an active candidate) even when the promotion gate passed. +_HARD_SAFETY_SIGNALS = frozenset({"guard_violations", "policy_guard_regression"}) + + +def _blocking_safety_signals(risk_signals: list[dict[str, Any]]) -> list[str]: + """Names of failing hard-safety signals that must block promotion.""" + return [ + str(signal.get("name")) + for signal in risk_signals + if signal.get("status") == "fail" and signal.get("name") in _HARD_SAFETY_SIGNALS + ] + + +def _final_decision( + *, + promotion: dict[str, Any], + realtime: dict[str, Any] | None, + shadow: dict[str, Any] | None, + candidate_active: bool, + safety_blocked: bool = False, +) -> ReleaseDecision: + blocking = safety_blocked + if promotion.get("decision") != "PROMOTE": + blocking = True + if realtime is not None and realtime.get("decision") != "PASS": + blocking = True + if shadow is not None and shadow.get("decision") != "PROMOTE": + blocking = True + if not blocking: + return "PROMOTE" + if candidate_active or promotion.get("decision") == "ROLLBACK" or ( + shadow is not None and shadow.get("decision") == 
"ROLLBACK" + ): + return "ROLLBACK" + return "HOLD" + + +def build_release_assurance( + *, + packet: str | Path, + profile_path: str | Path | None = None, + candidate_active: bool = False, + realtime: bool = False, + target: str = "", + control_hz: float | None = None, + max_roundtrip_p95_ms: float | None = None, + max_jitter_p95_minus_p50_ms: float | None = None, + max_deadline_misses: int = 0, + max_control_budget_misses: int = 0, + max_act_errors: int = 0, + execution_cert: bool = False, + max_stale_action_window_ms: float = 100.0, + max_chunk_boundary_delta: float = 0.15, + max_velocity_discontinuity: float = 0.2, + require_phase_aware_horizon: bool = False, + require_runtime_attribution: bool = True, + shadow_trace: str | Path | None = None, + shadow_min_compared: int = 1, + shadow_wait_timeout_s: float = 0.0, + shadow_poll_s: float = 0.25, + shadow_fail_on: str = "any", + shadow_min_action_cos: float = 0.995, + shadow_max_action_delta: float = 0.10, + shadow_max_latency_regression_pct: float = 0.10, +) -> dict[str, Any]: + """Build one release assurance report from existing release evidence.""" + + packet_dir = _resolve_packet_dir(packet) + proof_path = packet_dir / "deployment-proof.json" + try: + proof = load_deploy_proof(proof_path) + except RealtimeCertificateError as exc: + raise ReleaseAssuranceError(str(exc)) from exc + + try: + shadow_report = None + effective_profile_path = profile_path or ("lab-shadow" if shadow_trace else None) + if shadow_trace: + from tether.shadow_rollout import run_shadow_rollout_gate + + shadow_report = run_shadow_rollout_gate( + trace=shadow_trace, + packet_dir=packet_dir, + profile=effective_profile_path or "lab-shadow", + candidate_active=candidate_active, + min_compared=shadow_min_compared, + wait_timeout_s=shadow_wait_timeout_s, + poll_s=shadow_poll_s, + fail_on=shadow_fail_on, + min_action_cos=shadow_min_action_cos, + max_action_delta=shadow_max_action_delta, + 
max_latency_regression_pct=shadow_max_latency_regression_pct, + use_existing_packet=True, + ) + promotion = decide_promotion( + packet_dir, + profile_path=effective_profile_path, + candidate_active=candidate_active, + ) + except (PromotionError, ValueError) as exc: + raise ReleaseAssuranceError(str(exc)) from exc + + realtime_report = None + realtime_requested = bool( + realtime + or execution_cert + or target + or (control_hz is not None and control_hz > 0) + or max_roundtrip_p95_ms is not None + or max_jitter_p95_minus_p50_ms is not None + ) + if realtime_requested: + try: + realtime_report = build_realtime_certificate( + proof, + target=target, + control_hz=control_hz, + max_roundtrip_p95_ms=max_roundtrip_p95_ms, + max_jitter_p95_minus_p50_ms=max_jitter_p95_minus_p50_ms, + max_deadline_misses=max_deadline_misses, + max_control_budget_misses=max_control_budget_misses, + max_act_errors=max_act_errors, + execution_cert=execution_cert, + max_stale_action_window_ms=max_stale_action_window_ms, + max_chunk_boundary_delta=max_chunk_boundary_delta, + max_velocity_discontinuity=max_velocity_discontinuity, + require_phase_aware_horizon=require_phase_aware_horizon, + require_runtime_attribution=require_runtime_attribution, + ) + except RealtimeCertificateError as exc: + raise ReleaseAssuranceError(str(exc)) from exc + + components = [ + _component( + "deployment_proof", + present=True, + decision="PASS" if proof.get("passed") else "FAIL", + artifact=str(proof_path), + failed_checks=_failed_checks(proof), + summary={"passed": bool(proof.get("passed"))}, + ), + _component( + "promotion_gate", + present=True, + decision=str(promotion.get("decision") or "BLOCK"), + artifact=str(packet_dir / "promotion-decision.json") + if (packet_dir / "promotion-decision.json").exists() + else "", + failed_checks=_failed_checks(promotion), + summary=promotion.get("summary") if isinstance(promotion.get("summary"), dict) else {}, + ), + _component( + "realtime_certificate", + 
present=realtime_report is not None, + decision=str((realtime_report or {}).get("decision") or "NOT_RUN"), + artifact="", + failed_checks=_failed_checks(realtime_report), + summary=(realtime_report or {}).get("summary") + if isinstance((realtime_report or {}).get("summary"), dict) + else {}, + ), + _component( + "shadow_rollout", + present=shadow_report is not None, + decision=str((shadow_report or {}).get("decision") or "NOT_RUN"), + artifact=str((shadow_report or {}).get("packet_dir") or ""), + failed_checks=list(((shadow_report or {}).get("promotion") or {}).get("failed_checks") or []), + summary=((shadow_report or {}).get("policy_diff") or {}).get("summary") + if isinstance(((shadow_report or {}).get("policy_diff") or {}).get("summary"), dict) + else {}, + ), + ] + + risk_signals = _risk_signals( + proof=proof, + promotion=promotion, + realtime=realtime_report, + shadow=shadow_report, + ) + gaps = _gaps(proof=proof, realtime=realtime_report, shadow=shadow_report) + blocking_signals = _blocking_safety_signals(risk_signals) + decision = _final_decision( + promotion=promotion, + realtime=realtime_report, + shadow=shadow_report, + candidate_active=candidate_active, + safety_blocked=bool(blocking_signals), + ) + confidence = _confidence(components=components, risk_signals=risk_signals, gaps=gaps) + + failed_components = [ + component["name"] + for component in components + if component["present"] + and component["decision"] not in {"PASS", "PROMOTE"} + ] + # blocked_by must agree with the decision: empty when we PROMOTE (so a failing but + # profile-permitted signal no longer contradicts a PROMOTE verdict), and the full + # set of failing signals when the release is held/rolled back. Hard-safety signals + # (blocking_signals) independently force a non-PROMOTE decision above. 
def _assurance_md_cell(value: Any) -> str:
    """Return ``value`` as text that is safe inside one Markdown table cell.

    Line breaks are flattened to spaces and pipes are backslash-escaped so a
    check name or signal value cannot break the table layout.
    """

    return " ".join(str(value).splitlines()).replace("|", "\\|")


def format_release_assurance_human(report: dict[str, Any]) -> str:
    """Render a short plain-text summary of a release assurance report.

    Lists the decision, packet, profile, confidence and every component, then
    up to 12 blockers and up to 8 evidence gaps when present.

    Args:
        report: A release assurance report dict; missing keys are tolerated.

    Returns:
        The summary text without a trailing newline.
    """

    profile_name = (report.get("profile") or {}).get("name", "default")
    lines = [
        f"tether release assure - {report.get('decision')}",
        f"packet: {report.get('packet_dir')}",
        f"profile: {profile_name}",
        f"confidence: {report.get('confidence', 0)}/100",
        "components:",
    ]
    for component in report.get("components") or []:
        suffix = "" if component.get("present") else " (not run)"
        lines.append(f" - {component.get('name')}: {component.get('decision')}{suffix}")

    blocked = report.get("blocked_by") or {}
    blockers = list(blocked.get("components") or []) + list(blocked.get("signals") or [])
    if blockers:
        lines.append("blocked by:")
        lines.extend(f" - {item}" for item in blockers[:12])

    gaps = report.get("gaps") or []
    if gaps:
        lines.append("open evidence gaps:")
        lines.extend(f" - {gap.get('control')}: {gap.get('message')}" for gap in gaps[:8])
    return "\n".join(lines)


def format_release_assurance_markdown(report: dict[str, Any]) -> str:
    """Render a release assurance report as a Markdown document.

    Sections: header bullets, a components table, a risk-signal table, the
    evidence gaps, and — when ``realtime_certificate`` is a dict — the output
    of ``format_realtime_certificate_markdown`` appended at the end.

    Args:
        report: A release assurance report dict; missing keys are tolerated.

    Returns:
        The Markdown text, terminated by exactly one newline.
    """

    profile_name = (report.get("profile") or {}).get("name", "default")
    lines = [
        "# Tether Release Assurance",
        "",
        f"- Decision: **{report.get('decision')}**",
        f"- Confidence: `{report.get('confidence', 0)}/100`",
        f"- Packet: `{report.get('packet_dir')}`",
        f"- Profile: `{profile_name}`",
        "",
        "## Components",
        "",
        "| Component | Present | Decision | Failed checks |",
        "|---|---:|---:|---|",
    ]
    for component in report.get("components") or []:
        # Entries are stringified here: not every producer guarantees that
        # failed_checks holds only strings, and str.join rejects non-strings.
        failed = ", ".join(
            _assurance_md_cell(item) for item in component.get("failed_checks") or []
        )
        lines.append(
            f"| `{_assurance_md_cell(component.get('name'))}` | "
            f"{bool(component.get('present'))} | "
            f"`{_assurance_md_cell(component.get('decision'))}` | {failed or '-'} |"
        )

    lines.extend([
        "",
        "## Risk Signals",
        "",
        "| Signal | Status | Value | Threshold | Source |",
        "|---|---:|---:|---:|---|",
    ])
    for signal in report.get("risk_signals") or []:
        lines.append(
            f"| `{_assurance_md_cell(signal.get('name'))}` | "
            f"`{_assurance_md_cell(signal.get('status'))}` | "
            f"{_assurance_md_cell(signal.get('value'))} | "
            f"{_assurance_md_cell(signal.get('threshold'))} | "
            f"{_assurance_md_cell(signal.get('source'))} |"
        )

    gaps = report.get("gaps") or []
    lines.extend(["", "## Evidence Gaps", ""])
    if not gaps:
        lines.append("No open evidence gaps recorded.")
    else:
        for gap in gaps:
            lines.extend([
                f"### {gap.get('control')}",
                "",
                f"- Severity: `{gap.get('severity')}`",
                f"- Gap: {gap.get('message')}",
                f"- Next step: `{gap.get('next_step')}`",
                "",
            ])

    realtime = report.get("realtime_certificate")
    if isinstance(realtime, dict):
        lines.extend(["", format_realtime_certificate_markdown(realtime).rstrip(), ""])

    return "\n".join(lines).rstrip() + "\n"
json_path = out / "release-assurance.json" + md_path = out / "release-assurance.md" + json_path.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n", encoding="utf-8") + md_path.write_text(format_release_assurance_markdown(report), encoding="utf-8") + + files = [] + for path in (json_path, md_path): + files.append( + { + "name": path.name, + "size_bytes": path.stat().st_size, + "sha256": _sha256_file(path), + } + ) + manifest = { + "kind": "tether.release_assurance_manifest", + "schema_version": 1, + "generated_at": _now_iso(), + "files": files, + } + (out / "MANIFEST.json").write_text( + json.dumps(manifest, indent=2, sort_keys=True) + "\n", + encoding="utf-8", + ) + return manifest + + +__all__ = [ + "RELEASE_ASSURANCE_SCHEMA_VERSION", + "ReleaseAssuranceError", + "build_release_assurance", + "format_release_assurance_human", + "format_release_assurance_markdown", + "write_release_assurance_packet", +] diff --git a/src/tether/shadow_rollout.py b/src/tether/shadow_rollout.py index efe5a73..7419970 100644 --- a/src/tether/shadow_rollout.py +++ b/src/tether/shadow_rollout.py @@ -72,7 +72,7 @@ def _write_synthetic_proof_packet( policy_diff: dict[str, Any], policy_diff_fail_on: str, ) -> None: - checks = [ + checks: list[dict[str, Any]] = [ { "name": "shadow_trace_present", "status": "pass", @@ -138,6 +138,40 @@ def _write_synthetic_proof_packet( ) +_DERIVED_PACKET_OUTPUTS = { + "MANIFEST.json", + "promotion-decision.json", + "release-assurance.json", + "release-assurance.md", +} + + +def _refresh_packet_manifest(packet_dir: Path) -> None: + files = [] + for path in sorted( + p + for p in packet_dir.iterdir() + if p.is_file() and p.name not in _DERIVED_PACKET_OUTPUTS + ): + files.append( + { + "name": path.name, + "size_bytes": path.stat().st_size, + "sha256": _sha256_file(path), + } + ) + manifest = { + "kind": "tether.manifest", + "schema_version": 1, + "generated_at": _now_iso(), + "files": files, + } + (packet_dir / "MANIFEST.json").write_text( + 
def test_assure_release_tool_routes_to_release_assure() -> None:
    """The ``assure_release`` tool exposes its options and maps them to CLI argv."""
    schema = by_name()["assure_release"]["function"]["parameters"]["properties"]
    for option in ("packet", "control_hz", "shadow_trace", "execution_cert"):
        assert option in schema

    tool_args = {
        "packet": "./proof",
        "profile": "warehouse-safe",
        "candidate_active": True,
        "control_hz": 20,
        "target": "agx-orin-cell-a",
        "execution_cert": True,
        "shadow_trace": "./shadow.jsonl.gz",
        "min_compared": 100,
        "output_dir": "./release",
        "json": True,
    }
    expected_argv = [
        "release", "assure", "./proof",
        "--profile", "warehouse-safe",
        "--candidate-active",
        "--control-hz", "20",
        "--target", "agx-orin-cell-a",
        "--execution-cert",
        "--shadow-trace", "./shadow.jsonl.gz",
        "--min-compared", "100",
        "--output-dir", "./release",
        "--json",
    ]

    assert _argv_for("assure_release", tool_args) == expected_argv
def _write_cli_release_proof(tmp_path):
    """Write a passing deployment-proof packet for the release-assure CLI tests.

    Builds an in-memory receipt and hands it to ``write_deploy_proof_packet``.

    Args:
        tmp_path: Base directory; the packet goes to ``tmp_path / "proof"``.

    Returns:
        The packet directory, ``tmp_path / "proof"``.
    """
    from tether.deploy_proof import write_deploy_proof_packet

    def _action_execution():
        # Built fresh on every call so the samples never share mutable state.
        return {
            "executed_horizon": 3,
            "adaptive_reason": "low_speed_transition",
            "phase_transition_indices": [2],
            "cache_status": "rtc_carry_hit",
        }

    def _flat_latency():
        return dict.fromkeys(("p50_ms", "p95_ms", "p99_ms", "max_ms"), 40.0)

    proof_dir = tmp_path / "proof"

    trajectories = (
        [[0.0, 0.0], [0.04, 0.02], [0.08, 0.04]],
        [[0.09, 0.04], [0.13, 0.06], [0.17, 0.08]],
    )
    act_samples = [
        {
            "roundtrip_ms": 40.0,
            "actions": actions,
            "action_execution": _action_execution(),
        }
        for actions in trajectories
    ]

    latency = {
        "samples": 2,
        "roundtrip_ms": _flat_latency(),
        "warm_roundtrip_ms": _flat_latency(),
        "jitter": {"p95_minus_p50_ms": 0.0},
        "control_budget": {"control_hz": 20.0, "period_ms": 50.0, "missed_samples": 0},
        "deadline_misses": 0,
        "act_errors": 0,
        "guard_violations": 0,
    }

    diff_summary = {"verdict": "pass"}
    diff_summary.update(
        dict.fromkeys(
            (
                "action_failures",
                "latency_regressions",
                "guard_regressions",
                "shape_failures",
                "missing_candidate",
                "shadow_pending",
                "shadow_errors",
            ),
            0,
        )
    )

    receipt = {
        "schema_version": 1,
        "kind": "tether.deployment_proof",
        "passed": True,
        "export_dir": str(tmp_path / "export"),
        "output_dir": str(proof_dir),
        "profile": {"name": "ci", "thresholds": {}},
        "server": {"log_tail": []},
        "security": {"enabled": True},
        "safety_stress": {
            "enabled": True,
            "source": "embodiment",
            "checks": [{"name": "guard_importable", "status": "pass"}],
        },
        "trace": {
            "record_dir": str(tmp_path / "traces"),
            "files": [{"path": "trace.jsonl"}],
        },
        "checks": [{"name": "server_health_ready", "status": "pass"}],
        "act_samples": act_samples,
        "latency": latency,
        "policy_diff": {
            "enabled": True,
            "report": {"kind": "tether.policy_diff", "summary": diff_summary},
        },
        "export_manifest": {"files": [{"name": "model.onnx", "sha256": "abc"}]},
    }

    write_deploy_proof_packet(receipt, proof_dir)
    return proof_dir
def test_dep_check_catches_wrong_lerobot(monkeypatch):
    """Reject the known-bad lerobot 0.4.x stack up front.

    ``_require_monolithic_deps()`` must raise an ``ImportError`` that names
    both the installed and the required lerobot version, instead of letting
    torch.export fail downstream.
    """
    from tether.exporters import monolithic
    import transformers

    monkeypatch.setattr(transformers, "__version__", "5.3.0")

    # Register empty placeholder modules under each of these names.
    placeholder_names = ("lerobot", "onnx_diagnostic", "onnxscript", "optree", "scipy")
    for name in placeholder_names:
        monkeypatch.setitem(sys.modules, name, types.ModuleType(name))

    def _fake_dist_version(dist_name: str) -> str:
        # Only lerobot reports a version; every other distribution is "missing".
        if dist_name != "lerobot":
            raise monolithic.PackageNotFoundError(dist_name)
        return "0.4.4"

    monkeypatch.setattr(monolithic, "_dist_version", _fake_dist_version)

    expected_message = r"lerobot 0\.4\.4.*lerobot==0\.5\.1"
    with pytest.raises(ImportError, match=expected_message):
        monolithic._require_monolithic_deps()
def _receipt(export_dir: str, *, p95: float, hz: int = 20) -> dict:
    """Build a minimal passing deployment-proof receipt for certificate tests.

    Args:
        export_dir: Value stored under ``"export_dir"``.
        p95: Roundtrip of the third sample, also used as the p95, p99, and
            max roundtrip latency.
        hz: Control rate; stored in the profile thresholds and used to derive
            ``period_ms`` as ``1000.0 / hz``.

    Returns:
        The receipt as a plain dict.
    """
    sample_roundtrips = (10.0, 20.0, p95)

    profile = {
        "name": "warehouse-safe",
        "thresholds": {"control_hz": hz, "max_jitter_p95_minus_p50_ms": 50},
    }
    control_budget = {
        "control_hz": float(hz),
        "period_ms": 1000.0 / hz,
        "missed_samples": 0,
    }
    latency = {
        "samples": 3,
        "roundtrip_ms": {"p50_ms": 20.0, "p95_ms": p95, "p99_ms": p95, "max_ms": p95},
        "jitter": {"p95_minus_p50_ms": 5.0},
        "control_budget": control_budget,
        "deadline_misses": 0,
        "act_errors": 0,
    }

    receipt = {
        "schema_version": 1,
        "kind": "tether.deployment_proof",
        "passed": True,
        "export_dir": export_dir,
    }
    receipt["profile"] = profile
    receipt["act_samples"] = [{"roundtrip_ms": ms} for ms in sample_roundtrips]
    receipt["latency"] = latency
    return receipt
def test_missing_metrics_render_placeholders():
    """A certificate with empty sections still renders, using placeholders."""
    empty_sections = {name: {} for name in ("source", "control_budget", "latency")}
    sparse_cert = {
        "kind": "tether.realtime_serving_certificate",
        "decision": "FAIL",
        "target": "",
        **empty_sections,
    }

    rendered = format_realtime_certificates_markdown_table([sparse_cert])

    # Em dash stands in for missing latency metrics; "unknown" is the
    # fallback model label.
    for placeholder in ("—", "unknown"):
        assert placeholder in rendered
def test_publish_without_readme(tmp_path):
    """Publishing with ``readme=None`` writes the doc and reports no README update."""
    results_doc = tmp_path / "results.md"
    cert = _write_cert(tmp_path, "certA", "/c/exports/smolvla-base", 12.0)

    outcome = publish([cert], out=results_doc, readme=None)

    assert outcome["readme_updated"] is False
    assert results_doc.exists()
def _receipt(tmp_path: Path, *, roundtrip_p95: float = 40.0) -> dict:
    """Build a passing deployment-proof receipt rooted at ``tmp_path``.

    Args:
        tmp_path: Base directory used to derive the export, proof, and trace
            paths stored in the receipt (nothing is written to disk here).
        roundtrip_p95: Value used for the p95, p99, and max of both the
            ``roundtrip_ms`` and ``warm_roundtrip_ms`` latency blocks.

    Returns:
        The receipt as a plain dict.
    """

    def _action_execution() -> dict:
        # Built fresh on every call so the samples never share mutable state.
        return {
            "executed_horizon": 3,
            "adaptive_reason": "low_speed_transition",
            "phase_transition_indices": [2],
            "cache_status": "rtc_carry_hit",
        }

    def _percentiles() -> dict:
        return {
            "p50_ms": 40.0,
            "p95_ms": roundtrip_p95,
            "p99_ms": roundtrip_p95,
            "max_ms": roundtrip_p95,
        }

    trace_dir = tmp_path / "traces"
    trajectories = (
        [[0.0, 0.0], [0.04, 0.02], [0.08, 0.04]],
        [[0.09, 0.04], [0.13, 0.06], [0.17, 0.08]],
    )
    act_samples = [
        {
            "sample": index,
            "roundtrip_ms": 40.0,
            "actions": actions,
            "action_execution": _action_execution(),
        }
        for index, actions in enumerate(trajectories, start=1)
    ]

    latency = {
        "samples": 2,
        "roundtrip_ms": _percentiles(),
        "warm_roundtrip_ms": _percentiles(),
        "jitter": {"p95_minus_p50_ms": 0.0},
        "control_budget": {"control_hz": 20.0, "period_ms": 50.0, "missed_samples": 0},
        "deadline_misses": 0,
        "act_errors": 0,
        "guard_violations": 0,
    }

    diff_summary = {"verdict": "pass"}
    diff_summary.update(
        dict.fromkeys(
            (
                "action_failures",
                "latency_regressions",
                "guard_regressions",
                "shape_failures",
                "missing_candidate",
                "shadow_pending",
                "shadow_errors",
            ),
            0,
        )
    )

    return {
        "schema_version": 1,
        "kind": "tether.deployment_proof",
        "timestamp": "2026-06-20T00:00:00.000Z",
        "passed": True,
        "export_dir": str(tmp_path / "export"),
        "output_dir": str(tmp_path / "proof"),
        "profile": {"name": "ci", "thresholds": {}},
        "server": {"log_tail": []},
        "security": {"enabled": True},
        "safety_stress": {
            "enabled": True,
            "source": "embodiment",
            "checks": [{"name": "guard_importable", "status": "pass"}],
        },
        "trace": {
            "record_dir": str(trace_dir),
            "files": [{"path": str(trace_dir / "trace.jsonl"), "size_bytes": 10}],
        },
        "checks": [{"name": "server_health_ready", "status": "pass"}],
        "act_samples": act_samples,
        "latency": latency,
        "policy_diff": {
            "enabled": True,
            "report": {"kind": "tether.policy_diff", "summary": diff_summary},
        },
        "export_manifest": {"files": [{"name": "model.onnx", "sha256": "abc"}]},
    }
"shadow_mode": "background", + "shadow_actions": [[0.11, 0.21]], + "shadow_latency_ms": 12.0, + }, + ) + writer.write_footer({"total_requests": 1}) + writer.close() + return writer.filepath + + +def test_release_assurance_promotes_with_realtime_execution_cert(tmp_path: Path) -> None: + packet = _packet(tmp_path) + + report = build_release_assurance( + packet=packet, + realtime=True, + control_hz=20.0, + execution_cert=True, + require_phase_aware_horizon=True, + ) + + assert report["kind"] == "tether.release_assurance" + assert report["decision"] == "PROMOTE" + assert report["realtime_certificate"]["decision"] == "PASS" + assert report["blocked_by"] == {"components": [], "signals": []} + assert any(signal["name"] == "chunk_boundary_delta" for signal in report["risk_signals"]) + + +def test_release_assurance_holds_on_realtime_failure(tmp_path: Path) -> None: + packet = _packet(tmp_path, roundtrip_p95=80.0) + + report = build_release_assurance( + packet=packet, + realtime=True, + control_hz=20.0, + ) + + assert report["decision"] == "HOLD" + assert "realtime_certificate" in report["blocked_by"]["components"] + assert "roundtrip_p95_within_budget" in report["blocked_by"]["signals"] + + +def test_release_assurance_rolls_back_active_candidate_on_failure(tmp_path: Path) -> None: + packet = _packet(tmp_path, roundtrip_p95=80.0) + + report = build_release_assurance( + packet=packet, + realtime=True, + control_hz=20.0, + candidate_active=True, + ) + + assert report["decision"] == "ROLLBACK" + + +def test_release_assurance_shadow_trace_defaults_to_lab_profile(tmp_path: Path) -> None: + packet = _packet(tmp_path) + (packet / "promotion-decision.json").write_text('{"stale": true}\n', encoding="utf-8") + trace = _shadow_trace(tmp_path) + + report = build_release_assurance(packet=packet, shadow_trace=trace) + + assert report["decision"] == "PROMOTE" + assert report["profile"]["name"] == "lab-shadow" + assert report["shadow_rollout"]["profile"] == "lab-shadow" + + +def 
def test_guard_violations_block_promotion() -> None:
    """A failing ``guard_violations`` signal must prevent PROMOTE.

    Regression: a runtime ActionGuard clamp (guard_violations > 0) is a
    hard-safety signal. It has to change the verdict even when the promotion
    gate passed, rather than being listed in ``blocked_by`` while the verdict
    stays PROMOTE (the prior bug).
    """
    from tether.release_assurance import _blocking_safety_signals, _final_decision

    def _guard_signal(status):
        return [{"name": "guard_violations", "status": status}]

    assert _blocking_safety_signals(_guard_signal("fail")) == ["guard_violations"]
    assert _blocking_safety_signals(_guard_signal("pass")) == []

    passing_gate = {"decision": "PROMOTE"}
    # (candidate_active, safety_blocked, expected verdict)
    scenarios = (
        (False, True, "HOLD"),  # passing gate + clamping
        (True, True, "ROLLBACK"),  # active candidate + clamping
        (False, False, "PROMOTE"),  # no hard-safety signal: the gate decides
    )
    for active, blocked, expected in scenarios:
        verdict = _final_decision(
            promotion=passing_gate,
            realtime=None,
            shadow=None,
            candidate_active=active,
            safety_blocked=blocked,
        )
        assert verdict == expected