From ec677c057baa7ef64965ac630077b7d7d36c1331 Mon Sep 17 00:00:00 2001 From: rsasaki0109 Date: Fri, 5 Jun 2026 11:07:40 +0900 Subject: [PATCH] Add potential-field local-minimum escape (navigation/40) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Artificial potential fields (Khatib, 1986) are the one-line local planner and the repo had no example of their one famous failure: the local minimum. This adds it together with the classic recovery. An obstacle sits on the straight line from start to goal, so plain gradient descent walks the robot into the stagnation point in front of it and stalls. The world detects the lack of *net* progress (the robot oscillates in place around the minimum) and reports a `local_minimum` failure; the agent reacts the way real potential-field planners do — it switches to boundary following, sliding tangent to the repulsive gradient around the obstacle until the goal-ward path reopens, then resumes descent and reaches the goal. `--no-escape` keeps plain descent and it stalls until timeout. escape: local_minimum x1 -> 14 tangential steps -> goal (51 steps) no escape: local_minimum x18 + timeout, stuck 6.0 from the goal - self-contained PotentialFieldWorld (Khatib attractive/repulsive forces within an influence radius, net-progress stall detection) + a PotentialFieldAgent with boundary-following escape, and a References section - two smoke tests: escape detects the minimum, boundary-follows, and reaches the goal with no collision; plain descent stalls, fires local_minimum repeatedly, and times out while the escape reaches the goal on the same scene - examples index + navigation README section; example 42->43, tests 118->120 Verified headless (escape reaches in 51 steps, plain stalls at distance 6.0) and the matplotlib render path under Agg. Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 2 +- docs/status.md | 4 +- examples/README.md | 1 + .../navigation/40_potential_field_escape.py | 309 ++++++++++++++++++ examples/navigation/README.md | 44 +++ tests/test_examples_smoke.py | 31 ++ 6 files changed, 388 insertions(+), 3 deletions(-) create mode 100644 examples/navigation/40_potential_field_escape.py diff --git a/README.md b/README.md index b9545d4..306e536 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,7 @@ star helps others find it. ## Status -42 runnable examples · 38 README GIFs · 118 smoke / regression tests · +43 runnable examples · 38 README GIFs · 120 smoke / regression tests · 5 Gymnasium-style adapters · CI green on Python 3.10, 3.11, and 3.12. See `docs/status.md` for the implementation snapshot, `docs/plan.md` for the diff --git a/docs/status.md b/docs/status.md index feba38c..1eda2bb 100644 --- a/docs/status.md +++ b/docs/status.md @@ -5,10 +5,10 @@ see what exists, what is verified, and what should come next. ## Snapshot -- Runnable examples: 42 +- Runnable examples: 43 - Learning-path roadmap examples: 20 - README GIFs: 38 -- Smoke and regression tests: 118 (105 example/adapter/static + 13 planning) +- Smoke and regression tests: 120 (107 example/adapter/static + 13 planning) - Colab notebooks: 5 - Core dependencies: `numpy`, `matplotlib` - Contributor extra: `pip install -e ".[dev]"` diff --git a/examples/README.md b/examples/README.md index 10620bc..369ed0b 100644 --- a/examples/README.md +++ b/examples/README.md @@ -31,6 +31,7 @@ Run any example headless with its `--no-render` flag when available. | `navigation/31_options_with_interrupts.py` | `python examples/navigation/31_options_with_interrupts.py` | option β / interrupt -> dock_and_charge -> resume go_to_goal | | `navigation/34_human_correction_replanning.py` | `python examples/navigation/34_human_correction_replanning.py` | plan shortcut -> human correction -> cost update -> replan | | `navigation/38_monte_carlo_localization.py` | `python examples/navigation/38_monte_carlo_localization.py` | particle filter -> kidnapped -> inject particles -> recover | +| `navigation/40_potential_field_escape.py` | `python examples/navigation/40_potential_field_escape.py` | gradient descent -> local minimum -> boundary follow -> goal | ## Manipulation diff --git a/examples/navigation/40_potential_field_escape.py b/examples/navigation/40_potential_field_escape.py new file mode 100644 index 0000000..f3e69b9 --- /dev/null +++ b/examples/navigation/40_potential_field_escape.py @@ -0,0 +1,309 @@ +"""Drive an artificial potential field to the goal, get trapped, and escape the minimum. + +Artificial potential fields (Khatib, 1986) are the one-line local planner: pull +the robot down the gradient of an attractive potential toward the goal while +pushing it up a repulsive potential away from obstacles, and step along the net +force. It is elegant and it has one famous failure — a **local minimum**, a spot +where attraction and repulsion cancel but the goal is not reached, so the robot +stalls forever in front of an obstacle. + +This example is that failure and its classic recovery in one loop. An obstacle +sits squarely on the straight line from start to goal, so plain gradient descent +walks the robot into the stagnation point in front of it and stops. The world +notices the lack of progress and reports a ``local_minimum`` failure; the agent +reacts the way real potential-field planners do — it switches to **boundary +following**, sliding tangentially around the obstacle (perpendicular to the +repulsive gradient) until the goal-ward path reopens, then resumes gradient +descent and reaches the goal. + +Run with ``--no-escape`` to disable the recovery and watch plain potential-field +descent stall at the minimum until timeout. + +Success: the robot reaches the goal radius. +Failure: local_minimum (recoverable - net force vanished before the goal; the +trigger for boundary following), collision (terminal), timeout (terminal). + +References: + * O. Khatib, "Real-Time Obstacle Avoidance for Manipulators and Mobile Robots," + ICRA 1985 / IJRR 1986 - the attractive/repulsive potential formulation. + * Y. Koren and J. Borenstein, "Potential Field Methods and Their Inherent + Limitations for Mobile Robot Navigation," ICRA 1991 - the local-minimum trap. +""" + +from __future__ import annotations + +import argparse +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import numpy as np + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from pir.core.random import make_rng +from pir.core.types import Failure, StepResult, Trace + + +@dataclass(frozen=True) +class Obstacle: + center: tuple[float, float] + radius: float + + +@dataclass +class APFConfig: + world_size: float = 10.0 + start: tuple[float, float] = (1.0, 5.0) + goal: tuple[float, float] = (9.0, 5.0) + goal_radius: float = 0.35 + step_size: float = 0.25 + k_att: float = 0.12 + k_rep: float = 1.4 + influence: float = 2.6 # repulsion reaches this far past the obstacle surface + improve_eps: float = 0.05 # net closing of the goal distance that counts as progress + stuck_patience: int = 6 # steps without net progress before local_minimum fires + max_steps: int = 120 + obstacles: tuple[Obstacle, ...] = (Obstacle(center=(5.0, 5.0), radius=1.1),) + + +class PotentialFieldWorld: + """Continuous 2D world; reports a local_minimum failure when progress stalls.""" + + def __init__(self, *, seed: int | None = 0, config: APFConfig | None = None) -> None: + self.config = config or APFConfig() + self.seed = seed + self.reset(seed=seed) + + def reset(self, seed: int | None = None) -> dict[str, Any]: + if seed is not None: + self.seed = seed + self.rng = make_rng(self.seed) + self.pos = np.asarray(self.config.start, dtype=float).copy() + self.goal = np.asarray(self.config.goal, dtype=float) + self.time = 0 + self._stalled = 0 + self._best_dist = float(np.linalg.norm(self.goal - self.pos)) + return self.observe() + + def observe(self) -> dict[str, Any]: + return { + "time": self.time, + "position": self.pos.copy(), + "goal": self.goal.copy(), + "obstacles": [(np.asarray(o.center, dtype=float), o.radius) for o in self.config.obstacles], + "distance_to_goal": float(np.linalg.norm(self.goal - self.pos)), + } + + def step(self, action: dict[str, Any]) -> StepResult: + self.time += 1 + velocity = np.asarray(action.get("velocity", np.zeros(2)), dtype=float) + before = self.pos.copy() + self.pos = np.clip(self.pos + velocity, 0.0, self.config.world_size) + progress = float(np.linalg.norm(self.pos - before)) + + info: dict[str, Any] = { + "time": self.time, + "position": self.pos.copy(), + "distance_to_goal": float(np.linalg.norm(self.goal - self.pos)), + "progress": progress, + "success": False, + } + + for center, radius in [(np.asarray(o.center, dtype=float), o.radius) for o in self.config.obstacles]: + if np.linalg.norm(self.pos - center) < radius: + info["failure"] = Failure("collision", "robot entered an obstacle", False) + return StepResult(self.observe(), -1.0, True, info) + + if info["distance_to_goal"] <= self.config.goal_radius: + info["success"] = True + return StepResult(self.observe(), 1.0, True, info) + + # Stalled away from the goal: the potential-field local minimum. Detected + # by a lack of *net* progress (the robot oscillates in place around the + # stagnation point), not by a single small step. + if info["distance_to_goal"] < self._best_dist - self.config.improve_eps: + self._best_dist = info["distance_to_goal"] + self._stalled = 0 + else: + self._stalled += 1 + if self._stalled >= self.config.stuck_patience: + info["failure"] = Failure( + "local_minimum", "net force vanished before reaching the goal", True + ) + self._stalled = 0 # give the recovery a fresh window before re-firing + + if self.time >= self.config.max_steps: + info["failure"] = Failure("timeout", "ran out of steps", False) + return StepResult(self.observe(), -0.5, True, info) + + return StepResult(self.observe(), -0.01, False, info) + + +class PotentialFieldAgent: + """Gradient descent on the potential; boundary-follows out of a local minimum.""" + + def __init__(self, *, config: APFConfig | None = None, escape: bool = True) -> None: + self.config = config or APFConfig() + self.escape_enabled = escape + self.reset() + + def reset(self) -> None: + self.escape_steps_left = 0 + self.escape_sign = 1.0 + + def act(self, obs: dict[str, Any]) -> dict[str, Any]: + pos = np.asarray(obs["position"], dtype=float) + goal = np.asarray(obs["goal"], dtype=float) + attractive = self.config.k_att * (goal - pos) + repulsive, nearest_gradient = self._repulsive_force(pos, obs["obstacles"]) + net = attractive + repulsive + + if self.escape_steps_left > 0 and nearest_gradient is not None: + # Boundary following: move tangent to the repulsive gradient (around the + # obstacle) with a slight goal-ward bias, instead of down the net force. + tangent = np.array([-nearest_gradient[1], nearest_gradient[0]]) * self.escape_sign + direction = tangent + 0.35 * (goal - pos) / (np.linalg.norm(goal - pos) + 1e-9) + self.escape_steps_left -= 1 + velocity = self._cap(direction) + else: + velocity = self._cap(net) + + self.last_net_norm = float(np.linalg.norm(net)) + return {"velocity": velocity} + + def update(self, obs: dict[str, Any], reward: float, info: dict[str, Any]) -> None: + failure = info.get("failure") + if ( + self.escape_enabled + and isinstance(failure, Failure) + and failure.kind == "local_minimum" + and self.escape_steps_left == 0 + ): + # Enter boundary-following for enough steps to clear the obstacle, and + # commit to the side that points more toward the goal. + pos = np.asarray(info["position"], dtype=float) + goal = np.asarray(obs["goal"], dtype=float) + _, gradient = self._repulsive_force(pos, obs["obstacles"]) + self.escape_steps_left = 14 + if gradient is not None: + tangent = np.array([-gradient[1], gradient[0]]) + self.escape_sign = 1.0 if np.dot(tangent, goal - pos) >= 0 else -1.0 + info["escaping"] = self.escape_steps_left > 0 + info["net_force"] = getattr(self, "last_net_norm", 0.0) + + def _repulsive_force(self, pos: np.ndarray, obstacles: Any) -> tuple[np.ndarray, np.ndarray | None]: + total = np.zeros(2) + nearest_gradient: np.ndarray | None = None + nearest_d = np.inf + for center, radius in obstacles: + center = np.asarray(center, dtype=float) + offset = pos - center + surface_d = float(np.linalg.norm(offset)) - radius + if surface_d < self.config.influence and surface_d > 1e-6: + direction = offset / (np.linalg.norm(offset) + 1e-9) + magnitude = self.config.k_rep * (1.0 / surface_d - 1.0 / self.config.influence) / (surface_d**2) + total += magnitude * direction + if surface_d < nearest_d: + nearest_d = surface_d + nearest_gradient = direction + return total, nearest_gradient + + def _cap(self, vector: np.ndarray) -> np.ndarray: + """Move along the force, but no faster than step_size — so the robot truly + slows to a halt as the net force vanishes at a local minimum.""" + norm = float(np.linalg.norm(vector)) + if norm < 1e-9: + return np.zeros(2) + return vector * (min(norm, self.config.step_size) / norm) + + +def run( + seed: int = 0, + render: bool = True, + max_steps: int | None = None, + escape: bool = True, +) -> Trace: + config = APFConfig() + if max_steps is not None: + config.max_steps = max_steps + world = PotentialFieldWorld(seed=seed, config=config) + agent = PotentialFieldAgent(config=config, escape=escape) + obs = world.reset(seed=seed) + agent.reset() + trace = Trace() + + for _ in range(config.max_steps): + action = agent.act(obs) + result = world.step(action) + obs, reward, done, info = result.as_tuple() + agent.update(obs, reward, info) + trace.append(obs, action, reward, info) + + if render: + _render(world, info) + + if done: + break + + return trace + + +def _render(world: PotentialFieldWorld, info: dict[str, Any]) -> None: + import matplotlib.pyplot as plt + from matplotlib.patches import Circle + + if not hasattr(world, "_fig") or world._fig is None: + plt.ion() + world._fig, world._ax = plt.subplots(figsize=(5, 5)) + ax = world._ax + ax.clear() + ax.set_title("Potential field: descend, trap, escape, reach goal") + ax.set_xlim(0, world.config.world_size) + ax.set_ylim(0, world.config.world_size) + ax.set_aspect("equal", adjustable="box") + for o in world.config.obstacles: + ax.add_patch(Circle(o.center, o.radius, color="0.3", alpha=0.5)) + ax.add_patch(Circle(o.center, o.radius + world.config.influence, color="0.7", alpha=0.12)) + ax.plot(*world.goal, marker="*", color="tab:green", markersize=16) + ax.plot(*world.pos, marker="o", color="tab:red" if info.get("escaping") else "tab:blue", markersize=9) + ax.text(0.02, 0.97, f"d={info['distance_to_goal']:.2f} |F|={info.get('net_force', 0):.3f}" + f"{' ESCAPING' if info.get('escaping') else ''}", transform=ax.transAxes, va="top") + world._fig.canvas.draw_idle() + plt.pause(0.03) + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument("--seed", type=int, default=0) + parser.add_argument("--max-steps", type=int, default=120) + parser.add_argument("--no-render", action="store_true") + parser.add_argument("--no-escape", action="store_true", help="plain APF (stalls at the minimum)") + args = parser.parse_args() + + trace = run( + seed=args.seed, + render=not args.no_render, + max_steps=args.max_steps, + escape=not args.no_escape, + ) + final = trace.infos[-1] + failures = sorted({f.kind for f in trace.failures()}) + print( + f"reached={final.get('success', False)} steps={len(trace.actions)} " + f"final_dist={final.get('distance_to_goal', float('nan')):.2f} " + f"failures={failures} escape={not args.no_escape}" + ) + + if not args.no_render: + import matplotlib.pyplot as plt + + plt.ioff() + plt.show() + + +if __name__ == "__main__": + main() diff --git a/examples/navigation/README.md b/examples/navigation/README.md index 48c81a2..b4d7563 100644 --- a/examples/navigation/README.md +++ b/examples/navigation/README.md @@ -655,3 +655,47 @@ predict -> weight by range+bearing -> w_fast/w_slow drop ? inject : resample -> - Lower `num_particles` and watch augmented recovery get less reliable (fewer injected particles land on the sharp likelihood peak). - Raise `bearing_noise` toward range-only sensing and watch ambiguity return. + +## `40_potential_field_escape.py` + +### What this teaches + +Artificial potential fields (Khatib, 1986) are the one-line local planner: step +down the gradient of an attractive potential toward the goal plus a repulsive +potential away from obstacles. Their famous failure is the **local minimum** — a +spot where attraction and repulsion cancel short of the goal, so the robot +stalls. Here an obstacle sits on the straight line to the goal, so plain descent +walks into the stagnation point and stops; the world reports a `local_minimum` +failure, and the agent recovers the way real planners do — **boundary +following**, sliding tangentially around the obstacle until the goal-ward path +reopens. Run with `--no-escape` to watch plain descent stall until timeout. + +### Run + +```bash +python examples/navigation/40_potential_field_escape.py +python examples/navigation/40_potential_field_escape.py --no-escape # stalls +``` + +### Key loop + +```text +attractive + repulsive force -> step -> no net progress ? boundary-follow : descend -> goal +``` + +### Simplifications + +- continuous 2D point robot, circular obstacles, Khatib repulsion within an + influence radius +- the local minimum is detected by a lack of *net* progress toward the goal + (the robot oscillates in place), not by a single small step +- escape is a fixed-length tangential slide committed to the goal-ward side +- no dynamics or sensing noise, so the trap is a clean textbook stagnation + +### Things to try + +- Toggle `--no-escape` and compare the `local_minimum` count (plain descent fires + it every step; the escape fires it once and recovers). +- Move the obstacle off the start–goal line and watch the trap weaken. +- Add a second obstacle to build a concave pocket and a deeper minimum. +- Raise `k_rep` and watch the stagnation point sit further from the obstacle. diff --git a/tests/test_examples_smoke.py b/tests/test_examples_smoke.py index 955b85b..be6ee98 100644 --- a/tests/test_examples_smoke.py +++ b/tests/test_examples_smoke.py @@ -879,3 +879,34 @@ def test_ungrounded_language_only_violates_affordances_and_times_out() -> None: # Grounding the same scores in affordances turns the plan executable. assert grounded.infos[-1]["success"] is True assert len(grounded.actions) < len(ungrounded.actions) + + +def test_potential_field_escapes_local_minimum() -> None: + module = load_example("examples/navigation/40_potential_field_escape.py") + + trace = module.run(seed=0, render=False, escape=True) + + # The head-on obstacle traps gradient descent; the agent detects it and + # boundary-follows out, then reaches the goal. + assert trace.infos[-1]["success"] is True + assert any(f.kind == "local_minimum" for f in trace.failures()) + assert any(info.get("escaping") for info in trace.infos) + assert not any(f.kind == "collision" for f in trace.failures()) + + +def test_plain_potential_field_stalls_in_the_minimum() -> None: + module = load_example("examples/navigation/40_potential_field_escape.py") + + plain = module.run(seed=0, render=False, escape=False) + escaped = module.run(seed=0, render=False, escape=True) + + # Without the escape, descent stalls at the minimum and never recovers: + # local_minimum fires repeatedly and the run times out short of the goal. + assert plain.infos[-1]["success"] is False + assert sum(1 for f in plain.failures() if f.kind == "local_minimum") >= 3 + assert any(f.kind == "timeout" for f in plain.failures()) + assert not any(info.get("escaping") for info in plain.infos) + + # The escape recovers on the same scene and ends much closer to the goal. + assert escaped.infos[-1]["success"] is True + assert escaped.infos[-1]["distance_to_goal"] < plain.infos[-1]["distance_to_goal"]