diff --git a/README.md b/README.md index b9545d4..306e536 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,7 @@ star helps others find it. ## Status -42 runnable examples · 38 README GIFs · 118 smoke / regression tests · +43 runnable examples · 38 README GIFs · 120 smoke / regression tests · 5 Gymnasium-style adapters · CI green on Python 3.10, 3.11, and 3.12. See `docs/status.md` for the implementation snapshot, `docs/plan.md` for the diff --git a/docs/status.md b/docs/status.md index feba38c..1eda2bb 100644 --- a/docs/status.md +++ b/docs/status.md @@ -5,10 +5,10 @@ see what exists, what is verified, and what should come next. ## Snapshot -- Runnable examples: 42 +- Runnable examples: 43 - Learning-path roadmap examples: 20 - README GIFs: 38 -- Smoke and regression tests: 118 (105 example/adapter/static + 13 planning) +- Smoke and regression tests: 120 (107 example/adapter/static + 13 planning) - Colab notebooks: 5 - Core dependencies: `numpy`, `matplotlib` - Contributor extra: `pip install -e ".[dev]"` diff --git a/examples/README.md b/examples/README.md index 10620bc..369ed0b 100644 --- a/examples/README.md +++ b/examples/README.md @@ -31,6 +31,7 @@ Run any example headless with its `--no-render` flag when available. | `navigation/31_options_with_interrupts.py` | `python examples/navigation/31_options_with_interrupts.py` | option β / interrupt -> dock_and_charge -> resume go_to_goal | | `navigation/34_human_correction_replanning.py` | `python examples/navigation/34_human_correction_replanning.py` | plan shortcut -> human correction -> cost update -> replan | | `navigation/38_monte_carlo_localization.py` | `python examples/navigation/38_monte_carlo_localization.py` | particle filter -> kidnapped -> inject particles -> recover | +| `navigation/40_potential_field_escape.py` | `python examples/navigation/40_potential_field_escape.py` | gradient descent -> local minimum -> boundary follow -> goal | ## Manipulation diff --git a/examples/navigation/40_potential_field_escape.py b/examples/navigation/40_potential_field_escape.py new file mode 100644 index 0000000..f3e69b9 --- /dev/null +++ b/examples/navigation/40_potential_field_escape.py @@ -0,0 +1,309 @@ +"""Drive an artificial potential field to the goal, get trapped, and escape the minimum. + +Artificial potential fields (Khatib, 1986) are the one-line local planner: pull +the robot down the gradient of an attractive potential toward the goal while +pushing it up a repulsive potential away from obstacles, and step along the net +force. It is elegant and it has one famous failure — a **local minimum**, a spot +where attraction and repulsion cancel but the goal is not reached, so the robot +stalls forever in front of an obstacle. + +This example is that failure and its classic recovery in one loop. An obstacle +sits squarely on the straight line from start to goal, so plain gradient descent +walks the robot into the stagnation point in front of it and stops. The world +notices the lack of progress and reports a ``local_minimum`` failure; the agent +reacts the way real potential-field planners do — it switches to **boundary +following**, sliding tangentially around the obstacle (perpendicular to the +repulsive gradient) until the goal-ward path reopens, then resumes gradient +descent and reaches the goal. + +Run with ``--no-escape`` to disable the recovery and watch plain potential-field +descent stall at the minimum until timeout. + +Success: the robot reaches the goal radius. +Failure: local_minimum (recoverable - net force vanished before the goal; the +trigger for boundary following), collision (terminal), timeout (terminal). + +References: + * O. Khatib, "Real-Time Obstacle Avoidance for Manipulators and Mobile Robots," + ICRA 1985 / IJRR 1986 - the attractive/repulsive potential formulation. + * Y. Koren and J. Borenstein, "Potential Field Methods and Their Inherent + Limitations for Mobile Robot Navigation," ICRA 1991 - the local-minimum trap. +""" + +from __future__ import annotations + +import argparse +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import numpy as np + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from pir.core.random import make_rng +from pir.core.types import Failure, StepResult, Trace + + +@dataclass(frozen=True) +class Obstacle: + center: tuple[float, float] + radius: float + + +@dataclass +class APFConfig: + world_size: float = 10.0 + start: tuple[float, float] = (1.0, 5.0) + goal: tuple[float, float] = (9.0, 5.0) + goal_radius: float = 0.35 + step_size: float = 0.25 + k_att: float = 0.12 + k_rep: float = 1.4 + influence: float = 2.6 # repulsion reaches this far past the obstacle surface + improve_eps: float = 0.05 # net closing of the goal distance that counts as progress + stuck_patience: int = 6 # steps without net progress before local_minimum fires + max_steps: int = 120 + obstacles: tuple[Obstacle, ...] = (Obstacle(center=(5.0, 5.0), radius=1.1),) + + +class PotentialFieldWorld: + """Continuous 2D world; reports a local_minimum failure when progress stalls.""" + + def __init__(self, *, seed: int | None = 0, config: APFConfig | None = None) -> None: + self.config = config or APFConfig() + self.seed = seed + self.reset(seed=seed) + + def reset(self, seed: int | None = None) -> dict[str, Any]: + if seed is not None: + self.seed = seed + self.rng = make_rng(self.seed) + self.pos = np.asarray(self.config.start, dtype=float).copy() + self.goal = np.asarray(self.config.goal, dtype=float) + self.time = 0 + self._stalled = 0 + self._best_dist = float(np.linalg.norm(self.goal - self.pos)) + return self.observe() + + def observe(self) -> dict[str, Any]: + return { + "time": self.time, + "position": self.pos.copy(), + "goal": self.goal.copy(), + "obstacles": [(np.asarray(o.center, dtype=float), o.radius) for o in self.config.obstacles], + "distance_to_goal": float(np.linalg.norm(self.goal - self.pos)), + } + + def step(self, action: dict[str, Any]) -> StepResult: + self.time += 1 + velocity = np.asarray(action.get("velocity", np.zeros(2)), dtype=float) + before = self.pos.copy() + self.pos = np.clip(self.pos + velocity, 0.0, self.config.world_size) + progress = float(np.linalg.norm(self.pos - before)) + + info: dict[str, Any] = { + "time": self.time, + "position": self.pos.copy(), + "distance_to_goal": float(np.linalg.norm(self.goal - self.pos)), + "progress": progress, + "success": False, + } + + for center, radius in [(np.asarray(o.center, dtype=float), o.radius) for o in self.config.obstacles]: + if np.linalg.norm(self.pos - center) < radius: + info["failure"] = Failure("collision", "robot entered an obstacle", False) + return StepResult(self.observe(), -1.0, True, info) + + if info["distance_to_goal"] <= self.config.goal_radius: + info["success"] = True + return StepResult(self.observe(), 1.0, True, info) + + # Stalled away from the goal: the potential-field local minimum. Detected + # by a lack of *net* progress (the robot oscillates in place around the + # stagnation point), not by a single small step. + if info["distance_to_goal"] < self._best_dist - self.config.improve_eps: + self._best_dist = info["distance_to_goal"] + self._stalled = 0 + else: + self._stalled += 1 + if self._stalled >= self.config.stuck_patience: + info["failure"] = Failure( + "local_minimum", "net force vanished before reaching the goal", True + ) + self._stalled = 0 # give the recovery a fresh window before re-firing + + if self.time >= self.config.max_steps: + info["failure"] = Failure("timeout", "ran out of steps", False) + return StepResult(self.observe(), -0.5, True, info) + + return StepResult(self.observe(), -0.01, False, info) + + +class PotentialFieldAgent: + """Gradient descent on the potential; boundary-follows out of a local minimum.""" + + def __init__(self, *, config: APFConfig | None = None, escape: bool = True) -> None: + self.config = config or APFConfig() + self.escape_enabled = escape + self.reset() + + def reset(self) -> None: + self.escape_steps_left = 0 + self.escape_sign = 1.0 + + def act(self, obs: dict[str, Any]) -> dict[str, Any]: + pos = np.asarray(obs["position"], dtype=float) + goal = np.asarray(obs["goal"], dtype=float) + attractive = self.config.k_att * (goal - pos) + repulsive, nearest_gradient = self._repulsive_force(pos, obs["obstacles"]) + net = attractive + repulsive + + if self.escape_steps_left > 0 and nearest_gradient is not None: + # Boundary following: move tangent to the repulsive gradient (around the + # obstacle) with a slight goal-ward bias, instead of down the net force. + tangent = np.array([-nearest_gradient[1], nearest_gradient[0]]) * self.escape_sign + direction = tangent + 0.35 * (goal - pos) / (np.linalg.norm(goal - pos) + 1e-9) + self.escape_steps_left -= 1 + velocity = self._cap(direction) + else: + velocity = self._cap(net) + + self.last_net_norm = float(np.linalg.norm(net)) + return {"velocity": velocity} + + def update(self, obs: dict[str, Any], reward: float, info: dict[str, Any]) -> None: + failure = info.get("failure") + if ( + self.escape_enabled + and isinstance(failure, Failure) + and failure.kind == "local_minimum" + and self.escape_steps_left == 0 + ): + # Enter boundary-following for enough steps to clear the obstacle, and + # commit to the side that points more toward the goal. + pos = np.asarray(info["position"], dtype=float) + goal = np.asarray(obs["goal"], dtype=float) + _, gradient = self._repulsive_force(pos, obs["obstacles"]) + self.escape_steps_left = 14 + if gradient is not None: + tangent = np.array([-gradient[1], gradient[0]]) + self.escape_sign = 1.0 if np.dot(tangent, goal - pos) >= 0 else -1.0 + info["escaping"] = self.escape_steps_left > 0 + info["net_force"] = getattr(self, "last_net_norm", 0.0) + + def _repulsive_force(self, pos: np.ndarray, obstacles: Any) -> tuple[np.ndarray, np.ndarray | None]: + total = np.zeros(2) + nearest_gradient: np.ndarray | None = None + nearest_d = np.inf + for center, radius in obstacles: + center = np.asarray(center, dtype=float) + offset = pos - center + surface_d = float(np.linalg.norm(offset)) - radius + if surface_d < self.config.influence and surface_d > 1e-6: + direction = offset / (np.linalg.norm(offset) + 1e-9) + magnitude = self.config.k_rep * (1.0 / surface_d - 1.0 / self.config.influence) / (surface_d**2) + total += magnitude * direction + if surface_d < nearest_d: + nearest_d = surface_d + nearest_gradient = direction + return total, nearest_gradient + + def _cap(self, vector: np.ndarray) -> np.ndarray: + """Move along the force, but no faster than step_size — so the robot truly + slows to a halt as the net force vanishes at a local minimum.""" + norm = float(np.linalg.norm(vector)) + if norm < 1e-9: + return np.zeros(2) + return vector * (min(norm, self.config.step_size) / norm) + + +def run( + seed: int = 0, + render: bool = True, + max_steps: int | None = None, + escape: bool = True, +) -> Trace: + config = APFConfig() + if max_steps is not None: + config.max_steps = max_steps + world = PotentialFieldWorld(seed=seed, config=config) + agent = PotentialFieldAgent(config=config, escape=escape) + obs = world.reset(seed=seed) + agent.reset() + trace = Trace() + + for _ in range(config.max_steps): + action = agent.act(obs) + result = world.step(action) + obs, reward, done, info = result.as_tuple() + agent.update(obs, reward, info) + trace.append(obs, action, reward, info) + + if render: + _render(world, info) + + if done: + break + + return trace + + +def _render(world: PotentialFieldWorld, info: dict[str, Any]) -> None: + import matplotlib.pyplot as plt + from matplotlib.patches import Circle + + if not hasattr(world, "_fig") or world._fig is None: + plt.ion() + world._fig, world._ax = plt.subplots(figsize=(5, 5)) + ax = world._ax + ax.clear() + ax.set_title("Potential field: descend, trap, escape, reach goal") + ax.set_xlim(0, world.config.world_size) + ax.set_ylim(0, world.config.world_size) + ax.set_aspect("equal", adjustable="box") + for o in world.config.obstacles: + ax.add_patch(Circle(o.center, o.radius, color="0.3", alpha=0.5)) + ax.add_patch(Circle(o.center, o.radius + world.config.influence, color="0.7", alpha=0.12)) + ax.plot(*world.goal, marker="*", color="tab:green", markersize=16) + ax.plot(*world.pos, marker="o", color="tab:red" if info.get("escaping") else "tab:blue", markersize=9) + ax.text(0.02, 0.97, f"d={info['distance_to_goal']:.2f} |F|={info.get('net_force', 0):.3f}" + f"{' ESCAPING' if info.get('escaping') else ''}", transform=ax.transAxes, va="top") + world._fig.canvas.draw_idle() + plt.pause(0.03) + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument("--seed", type=int, default=0) + parser.add_argument("--max-steps", type=int, default=120) + parser.add_argument("--no-render", action="store_true") + parser.add_argument("--no-escape", action="store_true", help="plain APF (stalls at the minimum)") + args = parser.parse_args() + + trace = run( + seed=args.seed, + render=not args.no_render, + max_steps=args.max_steps, + escape=not args.no_escape, + ) + final = trace.infos[-1] + failures = sorted({f.kind for f in trace.failures()}) + print( + f"reached={final.get('success', False)} steps={len(trace.actions)} " + f"final_dist={final.get('distance_to_goal', float('nan')):.2f} " + f"failures={failures} escape={not args.no_escape}" + ) + + if not args.no_render: + import matplotlib.pyplot as plt + + plt.ioff() + plt.show() + + +if __name__ == "__main__": + main() diff --git a/examples/navigation/README.md b/examples/navigation/README.md index 48c81a2..b4d7563 100644 --- a/examples/navigation/README.md +++ b/examples/navigation/README.md @@ -655,3 +655,47 @@ predict -> weight by range+bearing -> w_fast/w_slow drop ? inject : resample -> - Lower `num_particles` and watch augmented recovery get less reliable (fewer injected particles land on the sharp likelihood peak). - Raise `bearing_noise` toward range-only sensing and watch ambiguity return. + +## `40_potential_field_escape.py` + +### What this teaches + +Artificial potential fields (Khatib, 1986) are the one-line local planner: step +down the gradient of an attractive potential toward the goal plus a repulsive +potential away from obstacles. Their famous failure is the **local minimum** — a +spot where attraction and repulsion cancel short of the goal, so the robot +stalls. Here an obstacle sits on the straight line to the goal, so plain descent +walks into the stagnation point and stops; the world reports a `local_minimum` +failure, and the agent recovers the way real planners do — **boundary +following**, sliding tangentially around the obstacle until the goal-ward path +reopens. Run with `--no-escape` to watch plain descent stall until timeout. + +### Run + +```bash +python examples/navigation/40_potential_field_escape.py +python examples/navigation/40_potential_field_escape.py --no-escape # stalls +``` + +### Key loop + +```text +attractive + repulsive force -> step -> no net progress ? boundary-follow : descend -> goal +``` + +### Simplifications + +- continuous 2D point robot, circular obstacles, Khatib repulsion within an + influence radius +- the local minimum is detected by a lack of *net* progress toward the goal + (the robot oscillates in place), not by a single small step +- escape is a fixed-length tangential slide committed to the goal-ward side +- no dynamics or sensing noise, so the trap is a clean textbook stagnation + +### Things to try + +- Toggle `--no-escape` and compare the `local_minimum` count (plain descent fires + it every step; the escape fires it once and recovers). +- Move the obstacle off the start–goal line and watch the trap weaken. +- Add a second obstacle to build a concave pocket and a deeper minimum. +- Raise `k_rep` and watch the stagnation point sit further from the obstacle. diff --git a/tests/test_examples_smoke.py b/tests/test_examples_smoke.py index 955b85b..be6ee98 100644 --- a/tests/test_examples_smoke.py +++ b/tests/test_examples_smoke.py @@ -879,3 +879,34 @@ def test_ungrounded_language_only_violates_affordances_and_times_out() -> None: # Grounding the same scores in affordances turns the plan executable. assert grounded.infos[-1]["success"] is True assert len(grounded.actions) < len(ungrounded.actions) + + +def test_potential_field_escapes_local_minimum() -> None: + module = load_example("examples/navigation/40_potential_field_escape.py") + + trace = module.run(seed=0, render=False, escape=True) + + # The head-on obstacle traps gradient descent; the agent detects it and + # boundary-follows out, then reaches the goal. + assert trace.infos[-1]["success"] is True + assert any(f.kind == "local_minimum" for f in trace.failures()) + assert any(info.get("escaping") for info in trace.infos) + assert not any(f.kind == "collision" for f in trace.failures()) + + +def test_plain_potential_field_stalls_in_the_minimum() -> None: + module = load_example("examples/navigation/40_potential_field_escape.py") + + plain = module.run(seed=0, render=False, escape=False) + escaped = module.run(seed=0, render=False, escape=True) + + # Without the escape, descent stalls at the minimum and never recovers: + # local_minimum fires repeatedly and the run times out short of the goal. + assert plain.infos[-1]["success"] is False + assert sum(1 for f in plain.failures() if f.kind == "local_minimum") >= 3 + assert any(f.kind == "timeout" for f in plain.failures()) + assert not any(info.get("escaping") for info in plain.infos) + + # The escape recovers on the same scene and ends much closer to the goal. + assert escaped.infos[-1]["success"] is True + assert escaped.infos[-1]["distance_to_goal"] < plain.infos[-1]["distance_to_goal"]