diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..167cb44 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,62 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + branches: [main] + +permissions: + contents: read + +jobs: + test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.9", "3.10", "3.11", "3.12"] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e ".[dev]" + + - name: Run tests + run: pytest tests/ -v + + build: + runs-on: ubuntu-latest + needs: test + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install build tools + run: python -m pip install --upgrade pip build + + - name: Build distributions + run: python -m build + + - name: Verify wheel installs cleanly + run: | + pip install dist/*.whl + python -c "from engine import JumpDiffusionEngine; print('import OK')" + + - name: Upload distributions + uses: actions/upload-artifact@v4 + with: + name: dist + path: dist/ diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6d793b7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,23 @@ +# Build / distribution artifacts +*.egg-info/ +build/ +dist/ + +# Python cache +__pycache__/ +*.py[cod] +*.pyo + +# Test / coverage artifacts +.pytest_cache/ +.coverage +htmlcov/ + +# Virtual environments +.venv/ +venv/ +env/ + +# Editor / OS +.DS_Store +*.swp diff --git a/CITATION.cff b/CITATION.cff index 1087cf2..34c2db8 100644 --- a/CITATION.cff +++ b/CITATION.cff @@ -1,6 +1,7 @@ cff-version: 1.2.0 message: "If you use this software, please cite it as below." title: "Jump Diffusion Engine" +version: "0.1.0" abstract: "A universal framework for multistable stochastic control." type: software authors: diff --git a/README.md b/README.md index 326a53f..0dbb632 100644 --- a/README.md +++ b/README.md @@ -1,47 +1,88 @@ -# Jump-Diffusion-Engine – Perfectly Balanced Stochastic Control -**One set of inputs. Continuous, self-regulating outputs.** +# Jump-Diffusion-Engine – Stochastic Stability Analysis +**Simulate, analyse, and steer noisy nonlinear systems.** -A universal stability port for any dynamic system with a Source (Λ(t)), a Medium (Δ), and a Sink (f(Δ)). +A simulation framework for systems with a Source (Λ(t)), a Medium (Δ), and a nonlinear Sink (f(Δ)) +subject to continuous diffusion and discrete jump noise. ## Core Equation `dΔ = [Λ(t) − f(Δ)] dt + σ dW + J dN` -- `Λ(t) = ε₀ + A·sin(ωt)` — the source, steady and singing -- `f(Δ) = kΔ + gΔ²/(K²+Δ²)` — the sink, linear then saturating -- `Δ* : Λ = f(Δ), f′(Δ*) > 0` — the bowl, the stable held place +- `Λ(t) = ε₀ + A·sin(ωt)` — the source (constant or time-varying) +- `f(Δ) = kΔ + gΔ²/(K²+Δ²)` — the nonlinear sink (linear + saturating) +- `Δ* : Λ = f(Δ), f′(Δ*) > 0` — a stable equilibrium (basin centre) -Use `engine.py` to **force** any stochastic system into its stable basin, then **verify** its resilience: +Use `engine.py` to **analyse** stochastic systems and **steer** trajectories toward stable basins: | # | Action | Method | What it does | |:-|:---|:---|:---| -| 1 | **FORCE** it into the bowl | `seat_and_release()` | Applies a transient push, then **releases control to zero**. The basin holds it forever. | -| 2 | **BREATHE** with the noise | `adaptive_k()` | Dynamically stiffens when calm, relaxes when volatile. Prevents numerical blow-up while maximizing rejection. | -| 3 | **MAP** where the bowls are | `find_fixed_points()` | Finds all stable (`f′>0`) and unstable equilibria before you force it. | -| 4 | **MEASURE** how deep the bowl is | `basin_depth()` | Quantifies the energy barrier—how hard you'd have to push to knock it out. | -| 5 | **PREDICT** escape risk | `escape_probability()` | Empirical escape rate over Monte Carlo trials. Should be near-zero after seating. | -| 6 | **VISUALIZE** the long-term cloud | `stationary_density()` | The normalized PDF \( p(\Delta) \propto e^{-2V/\sigma^2} \)—where it lives once forced. | +| 1 | **Seat** into a stable basin | `seat_and_release()` | Applies a transient corrective push, then releases control. Basin strength determines how well it holds. | +| 2 | **Adapt** to volatility | `adaptive_k()` | Updates the reversion coefficient based on recent variance. | +| 3 | **Map** equilibria | `find_fixed_points()` | Finds stable (`f′>0`) and unstable fixed points for a given Λ. | +| 4 | **Measure** basin depth | `basin_depth()` | Quantifies the potential-energy barrier around each basin. | +| 5 | **Estimate** escape risk | `escape_probability()` | Empirical escape rate via Monte Carlo trials. | +| 6 | **Visualise** the stationary distribution | `stationary_density()` | Computes the Boltzmann-weighted PDF p(Δ) ∝ e^{−2V/σ²}. | ## Installation -This project requires Python 3 and the libraries used by `engine.py`: +**Standard install (recommended)** -- `numpy` -- `scipy` -- `matplotlib` +```bash +pip install -e . +``` + +This installs the package and all dependencies from the root of the repository. -Install them with pip: +**Manual dependency install** + +If you prefer not to use the package install, install dependencies directly: ```bash pip install numpy scipy matplotlib ``` -Then import the engine in your Python code: +Then add the repository root to your Python path and import: ```python from engine import JumpDiffusionEngine ``` +> **Note:** A legacy `packaging/pyproject.toml` is also present in the repository. +> For standard `pip install -e .` use the root `pyproject.toml`; the `packaging/` +> directory is kept for historical reference and may be removed in a future release. + +## Quick Start + +```python +import numpy as np +from engine import JumpDiffusionEngine + +# Define a source: constant with a small oscillation +def lambda_func(t): + return 0.5 + 0.1 * np.sin(2 * np.pi * 0.1 * t) + +eng = JumpDiffusionEngine(lambda_func, sigma=0.3, jump_rate=0.05, seed=42) + +# 1. Find stable equilibria +fps = eng.find_fixed_points(lambda_val=0.5) +print("Fixed points:", fps) + +# 2. Simulate a few trajectories +results = eng.simulate(t_max=20.0, x0=0.0, n_realizations=3) +eng.plot_trajectories(results) + +# 3. Steer into a stable basin, then release control +result = eng.seat_and_release(t_max=15.0, x0=3.0, lambda_val=0.5) +print(f"Released at step {result['release_idx']}, seated at Δ* ≈ {result['x_star']:.3f}") + +# 4. Estimate escape probability from the basin +p_escape = eng.escape_probability(threshold=2.0, t_max=10.0, n_trials=200) +print(f"Empirical escape probability: {p_escape:.3f}") + +# 5. Stationary density +x, p = eng.stationary_density(lambda_val=0.5) +``` + ## Use Cases Use this on any system that needs to maintain a steady state while being bombarded by both constant noise and sudden, unpredictable shocks. diff --git a/__pycache__/engine.cpython-312.pyc b/__pycache__/engine.cpython-312.pyc new file mode 100644 index 0000000..217d16f Binary files /dev/null and b/__pycache__/engine.cpython-312.pyc differ diff --git a/engine.py b/engine.py index 5814fd5..c17a850 100644 --- a/engine.py +++ b/engine.py @@ -469,7 +469,10 @@ def seat_and_release(self, t_max: float, x0: float = 0.0, def stationary_density(self, lambda_val: float, x_range: Tuple[float, float] = (-10, 10), n_points: int = 2000): x = np.linspace(x_range[0], x_range[1], n_points) V = self.potential(x, lambda_val) - p = np.exp(-2 * V / (self.sigma ** 2 + 1e-12)) + # Shift V by its minimum before exponentiation to avoid numerical overflow. + # The constant shift cancels exactly in the normalisation. + V_shifted = V - np.min(V) + p = np.exp(-2 * V_shifted / (self.sigma ** 2 + 1e-12)) norm = np.trapezoid(p, x) if hasattr(np, 'trapezoid') else np.trapz(p, x) p /= (norm + 1e-12) return x, p @@ -567,10 +570,47 @@ def _plot_sweep(self, results: List[Dict], param_name: str, metrics: List[str]): plt.tight_layout() plt.show() - # Plotting helpers (plot_trajectories, plot_potential, basin_analysis) unchanged def plot_trajectories(self, results: List[Dict], title: str = "Jump-Diffusion Trajectories", show_energy: bool = False): - # ... (standard implementation as before) - pass # Replace with full from previous if needed + """Plot simulated trajectories from simulate(). + + Parameters + ---------- + results : list of dicts returned by simulate() + title : figure title + show_energy : if True and energy was recorded, add a second panel + """ + has_energy = show_energy and any('energy' in r for r in results) + n_panels = 2 if has_energy else 1 + fig, axes = plt.subplots(n_panels, 1, figsize=(10, 4 * n_panels), squeeze=False) + ax_traj = axes[0, 0] + + for i, r in enumerate(results): + label = f"Run {i + 1}" if len(results) > 1 else None + ax_traj.plot(r['t'], r['x'], lw=0.8, alpha=0.7, label=label) + + ax_traj.set_xlabel("Time") + ax_traj.set_ylabel("Δ") + ax_traj.set_title(title) + ax_traj.grid(True, alpha=0.3) + if len(results) > 1: + ax_traj.legend(fontsize=8) + + if has_energy: + ax_en = axes[1, 0] + for i, r in enumerate(results): + if 'energy' in r: + label = f"Run {i + 1}" if len(results) > 1 else None + ax_en.plot(r['t'], r['energy'], lw=0.8, alpha=0.7, label=label) + ax_en.set_xlabel("Time") + ax_en.set_ylabel("Energy V(Δ)") + ax_en.set_title("Potential Energy") + ax_en.grid(True, alpha=0.3) + if len(results) > 1: + ax_en.legend(fontsize=8) + + plt.tight_layout() + plt.show() + return fig # Quick test / usage if __name__ == "__main__": diff --git a/jump_diffusion_engine.egg-info/PKG-INFO b/jump_diffusion_engine.egg-info/PKG-INFO new file mode 100644 index 0000000..b845ecf --- /dev/null +++ b/jump_diffusion_engine.egg-info/PKG-INFO @@ -0,0 +1,197 @@ +Metadata-Version: 2.4 +Name: jump-diffusion-engine +Version: 0.1.0 +Summary: Jump diffusion simulation engine for stochastic control systems. +Author: Sarah Marin +License-Expression: Apache-2.0 +Requires-Python: >=3.9 +Description-Content-Type: text/markdown +License-File: LICENSE +Requires-Dist: numpy<3.0,>=1.23 +Requires-Dist: scipy<2.0,>=1.10 +Requires-Dist: matplotlib<4.0,>=3.7 +Provides-Extra: dev +Requires-Dist: pytest>=7.0; extra == "dev" +Dynamic: license-file + +# Jump-Diffusion-Engine – Stochastic Stability Analysis +**Simulate, analyse, and steer noisy nonlinear systems.** + +A simulation framework for systems with a Source (Λ(t)), a Medium (Δ), and a nonlinear Sink (f(Δ)) +subject to continuous diffusion and discrete jump noise. + +## Core Equation + +`dΔ = [Λ(t) − f(Δ)] dt + σ dW + J dN` + +- `Λ(t) = ε₀ + A·sin(ωt)` — the source (constant or time-varying) +- `f(Δ) = kΔ + gΔ²/(K²+Δ²)` — the nonlinear sink (linear + saturating) +- `Δ* : Λ = f(Δ), f′(Δ*) > 0` — a stable equilibrium (basin centre) + +Use `engine.py` to **analyse** stochastic systems and **steer** trajectories toward stable basins: + +| # | Action | Method | What it does | +|:-|:---|:---|:---| +| 1 | **Seat** into a stable basin | `seat_and_release()` | Applies a transient corrective push, then releases control. Basin strength determines how well it holds. | +| 2 | **Adapt** to volatility | `adaptive_k()` | Updates the reversion coefficient based on recent variance. | +| 3 | **Map** equilibria | `find_fixed_points()` | Finds stable (`f′>0`) and unstable fixed points for a given Λ. | +| 4 | **Measure** basin depth | `basin_depth()` | Quantifies the potential-energy barrier around each basin. | +| 5 | **Estimate** escape risk | `escape_probability()` | Empirical escape rate via Monte Carlo trials. | +| 6 | **Visualise** the stationary distribution | `stationary_density()` | Computes the Boltzmann-weighted PDF p(Δ) ∝ e^{−2V/σ²}. | + +## Installation + +**Standard install (recommended)** + +```bash +pip install -e . +``` + +This installs the package and all dependencies from the root of the repository. + +**Manual dependency install** + +If you prefer not to use the package install, install dependencies directly: + +```bash +pip install numpy scipy matplotlib +``` + +Then add the repository root to your Python path and import: + +```python +from engine import JumpDiffusionEngine +``` + +> **Note:** A legacy `packaging/pyproject.toml` is also present in the repository. +> For standard `pip install -e .` use the root `pyproject.toml`; the `packaging/` +> directory is kept for historical reference and may be removed in a future release. + +## Quick Start + +```python +import numpy as np +from engine import JumpDiffusionEngine + +# Define a source: constant with a small oscillation +def lambda_func(t): + return 0.5 + 0.1 * np.sin(2 * np.pi * 0.1 * t) + +eng = JumpDiffusionEngine(lambda_func, sigma=0.3, jump_rate=0.05, seed=42) + +# 1. Find stable equilibria +fps = eng.find_fixed_points(lambda_val=0.5) +print("Fixed points:", fps) + +# 2. Simulate a few trajectories +results = eng.simulate(t_max=20.0, x0=0.0, n_realizations=3) +eng.plot_trajectories(results) + +# 3. Steer into a stable basin, then release control +result = eng.seat_and_release(t_max=15.0, x0=3.0, lambda_val=0.5) +print(f"Released at step {result['release_idx']}, seated at Δ* ≈ {result['x_star']:.3f}") + +# 4. Estimate escape probability from the basin +p_escape = eng.escape_probability(threshold=2.0, t_max=10.0, n_trials=200) +print(f"Empirical escape probability: {p_escape:.3f}") + +# 5. Stationary density +x, p = eng.stationary_density(lambda_val=0.5) +``` + +## Use Cases + +Use this on any system that needs to maintain a steady state while being bombarded by both constant noise and sudden, unpredictable shocks. + +--- + +### 1. Renewable Energy Grid Management + +Power grids are dissipative systems because electricity is used up as soon as it is made. + +- **The Source (`Λ`):** The fluctuating energy from wind turbines or solar panels. +- **The Sink (`f`):** The battery storage and consumer demand that drains the energy. +- **The Jumps (`J`):** Sudden lightning strikes or a power plant going offline. +- **Use Case:** Using your code to ensure the grid frequency stays in the bowl (for example, `60 Hz`) and does not crash during a sudden surge. + +--- + +### 2. Biological Homeostasis (Synthetic Biology) + +In genetic engineering, scientists create circuits inside cells to produce insulin or other chemicals. + +- **The Source (`Λ`):** The nutrients fed to the cell. +- **The Sink (`f`):** The metabolic rate at which the cell uses those nutrients. +- **The Jumps (`J`):** Sudden changes in temperature or pH levels. +- **Use Case:** Programming a cell to keep its internal chemical levels stable even if the environment becomes noisy or shocked. + +--- + +### 3. Automated Financial Trading (Hedge Funds) + +Markets are the definition of jump-diffusion. + +- **The Source (`Λ`):** The underlying growth or drift of an asset. +- **The Sink (`f`):** The mean-reverting force (investors selling when the price is too high). +- **The Jumps (`J`):** A flash crash or a sudden news event. +- **Use Case:** A trading bot that identifies the bowl (the fair value) and does not panic-sell during a Poisson shock, knowing the sink force will pull the price back. + +--- + +### 4. Spacecraft Orientation & Satellite Control + +Satellites must point their antennas precisely at Earth while floating in noisy space. + +- **The Source (`Λ`):** The thrusters or reaction wheels. +- **The Sink (`f`):** The friction of the gyroscopes or the magnetic pull of Earth. +- **The Jumps (`J`):** Tiny meteoroid impacts or solar flares. +- **Use Case:** An autopilot system that keeps the satellite centered in its orientation bowl regardless of constant vibrations or sudden bumps. + +--- + +### 5. Supply Chain & Inventory Control + +A warehouse needs to keep enough stock in the bowl without overflowing or running out. + +- **The Source (`Λ`):** The incoming shipments of goods. +- **The Sink (`f`):** The customer orders draining the inventory. +- **The Jumps (`J`):** A sudden viral trend causing a massive spike in orders. +- **Use Case:** An AI manager that uses your restoring-force logic to automatically adjust order rates so the warehouse does not stay empty after a shock. + +--- + +### 6. Climate Modeling (Resilience Thresholds) + +Ecologists use these models to see whether a forest or ocean can survive climate change. + +- **The Source (`Λ`):** Rainfall and sunlight. +- **The Sink (`f`):** Evaporation and consumption by animals. +- **The Jumps (`J`):** Forest fires or extreme heatwaves. +- **Use Case:** Determining the tipping point—how big a jump (`J`) it takes to push the system out of its bowl so that it can never recover. + +--- + +### 7. AI Training & Safe Learning + +When training a robot to walk, you do not want it to explore so far that it breaks itself. + +- **The Source (`Λ`):** The robot's drive to move forward. +- **The Sink (`f`):** A penalty in the code that gets stronger as the robot gets closer to falling. +- **The Jumps (`J`):** A person pushing the robot or an uneven floor. +- **Use Case:** Ensuring the robot's brain always stays within a safe operational basin. + +```bibtex +@software{SarahMarin_JumpDiffusionEngine_2026, + author = {Sarah Marin}, + title = {JumpDiffusionEngine: A Universal Framework for Multistable Stochastic Control}, + year = {2026}, + publisher = {GitHub}, + url = {https://github.com/beanapologist/Jump-Diffusion-Engine}, + note = {Versioned releases archived on Zenodo}, + doi = {10.5281/zenodo.XXXXXXX} +} +``` + +## License + +This project is licensed under the **Apache License 2.0**. See the `LICENSE` file for details. diff --git a/jump_diffusion_engine.egg-info/SOURCES.txt b/jump_diffusion_engine.egg-info/SOURCES.txt new file mode 100644 index 0000000..88e3137 --- /dev/null +++ b/jump_diffusion_engine.egg-info/SOURCES.txt @@ -0,0 +1,10 @@ +LICENSE +README.md +engine.py +pyproject.toml +jump_diffusion_engine.egg-info/PKG-INFO +jump_diffusion_engine.egg-info/SOURCES.txt +jump_diffusion_engine.egg-info/dependency_links.txt +jump_diffusion_engine.egg-info/requires.txt +jump_diffusion_engine.egg-info/top_level.txt +tests/test_engine.py \ No newline at end of file diff --git a/jump_diffusion_engine.egg-info/dependency_links.txt b/jump_diffusion_engine.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/jump_diffusion_engine.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/jump_diffusion_engine.egg-info/requires.txt b/jump_diffusion_engine.egg-info/requires.txt new file mode 100644 index 0000000..12dbac7 --- /dev/null +++ b/jump_diffusion_engine.egg-info/requires.txt @@ -0,0 +1,6 @@ +numpy<3.0,>=1.23 +scipy<2.0,>=1.10 +matplotlib<4.0,>=3.7 + +[dev] +pytest>=7.0 diff --git a/jump_diffusion_engine.egg-info/top_level.txt b/jump_diffusion_engine.egg-info/top_level.txt new file mode 100644 index 0000000..45a160d --- /dev/null +++ b/jump_diffusion_engine.egg-info/top_level.txt @@ -0,0 +1 @@ +engine diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..8521c24 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,26 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "jump-diffusion-engine" +version = "0.1.0" +description = "Jump diffusion simulation engine for stochastic control systems." +readme = "README.md" +requires-python = ">=3.9" +license = "Apache-2.0" +authors = [{ name = "Sarah Marin" }] +dependencies = [ + "numpy>=1.23,<3.0", + "scipy>=1.10,<2.0", + "matplotlib>=3.7,<4.0", +] + +[project.optional-dependencies] +dev = ["pytest>=7.0"] + +[tool.setuptools] +py-modules = ["engine"] + +[tool.pytest.ini_options] +testpaths = ["tests"] diff --git a/release/CHANGELOG.md b/release/CHANGELOG.md index db90dc6..a2bfb77 100644 --- a/release/CHANGELOG.md +++ b/release/CHANGELOG.md @@ -8,9 +8,25 @@ and this project follows [Semantic Versioning](https://semver.org/spec/v2.0.0.ht ## [Unreleased] ### Added -- Initial release scaffolding files in `release/`. +- Root-level `pyproject.toml` for standard `pip install -e .` workflow. +- `tests/test_engine.py` — automated test suite covering `find_fixed_points`, + `escape_probability`, `stationary_density`, `seat_and_release`, `simulate`, + and `plot_trajectories`. +- `.github/workflows/ci.yml` — GitHub Actions CI running tests on Python 3.9–3.12. + +### Fixed +- Implemented `plot_trajectories()`, which was previously a stub (`pass`). + +### Changed +- README revised to use more precise, technically defensible language. +- `CITATION.cff` now includes the `version` field (`0.1.0`). ## [0.1.0] - TBD ### Added - Initial public release of Jump-Diffusion-Engine. +- Core SDE simulator (`simulate`), fixed-point analysis (`find_fixed_points`, + `basin_depth`, `potential`), escape probability estimation (`escape_probability`), + stationary density (`stationary_density`), and basin-seating control + (`seat_and_release`, `identify_boundary`). +- Packaging files under `packaging/` and root `pyproject.toml`. diff --git a/tests/__pycache__/test_engine.cpython-312-pytest-9.1.0.pyc b/tests/__pycache__/test_engine.cpython-312-pytest-9.1.0.pyc new file mode 100644 index 0000000..8f48966 Binary files /dev/null and b/tests/__pycache__/test_engine.cpython-312-pytest-9.1.0.pyc differ diff --git a/tests/test_engine.py b/tests/test_engine.py new file mode 100644 index 0000000..9084a5d --- /dev/null +++ b/tests/test_engine.py @@ -0,0 +1,253 @@ +"""Tests for the JumpDiffusionEngine public API. + +All stochastic tests use a fixed seed (42) or rely on statistical invariants +that hold with very high probability, to avoid brittle flakiness. +""" +import numpy as np +import pytest +import sys +import os + +# Allow importing engine from the repo root when running with pytest from any cwd +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from engine import JumpDiffusionEngine + + +# --------------------------------------------------------------------------- +# Shared fixture +# --------------------------------------------------------------------------- + +@pytest.fixture +def engine(): + """Standard engine with a constant lambda and deterministic seed.""" + return JumpDiffusionEngine( + lambda_func=lambda t: 0.5, + sigma=0.3, + jump_rate=0.0, # no jumps — keeps tests deterministic + dt=0.01, + seed=42, + k=0.8, + g=0.5, + K=2.0, + ) + + +# --------------------------------------------------------------------------- +# find_fixed_points +# --------------------------------------------------------------------------- + +class TestFindFixedPoints: + def test_returns_at_least_one_stable_point(self, engine): + fps = engine.find_fixed_points(lambda_val=0.5) + assert len(fps) > 0, "Expected at least one fixed point" + + def test_stable_points_have_positive_f_prime(self, engine): + fps = engine.find_fixed_points(lambda_val=0.5) + stable = [fp for fp in fps if fp['stable']] + assert len(stable) > 0, "Expected at least one stable fixed point" + for fp in stable: + assert fp['f_prime'] > 0, "Stable fixed point must have f' > 0" + + def test_fixed_point_satisfies_f_equals_lambda(self, engine): + lambda_val = 0.5 + fps = engine.find_fixed_points(lambda_val=lambda_val) + for fp in fps: + residual = abs(engine.f_func(fp['x_star']) - lambda_val) + assert residual < 1e-4, f"Fixed point residual too large: {residual}" + + def test_result_structure(self, engine): + fps = engine.find_fixed_points(lambda_val=0.5) + for fp in fps: + assert 'x_star' in fp + assert 'stable' in fp + assert 'f_prime' in fp + assert 'lambda_val' in fp + + def test_no_fixed_points_returns_empty(self, engine): + # With a very large lambda_val that exceeds the sink's maximum, + # there should be no fixed points. + fps = engine.find_fixed_points(lambda_val=1e6) + assert fps == [] + + def test_different_lambda_shifts_fixed_point(self, engine): + fps_low = engine.find_fixed_points(lambda_val=0.1) + fps_high = engine.find_fixed_points(lambda_val=0.9) + x_low = sorted(fp['x_star'] for fp in fps_low if fp['stable']) + x_high = sorted(fp['x_star'] for fp in fps_high if fp['stable']) + # Higher lambda → fixed point shifts to higher x + if x_low and x_high: + assert x_high[0] > x_low[0] + + +# --------------------------------------------------------------------------- +# escape_probability +# --------------------------------------------------------------------------- + +class TestEscapeProbability: + def test_returns_value_in_unit_interval(self, engine): + p = engine.escape_probability(threshold=5.0, t_max=10.0, n_trials=20) + assert 0.0 <= p <= 1.0 + + def test_large_threshold_gives_low_escape(self, engine): + # With a very large threshold, very few (if any) trials should escape. + p = engine.escape_probability(threshold=50.0, t_max=5.0, n_trials=30) + assert p < 0.5, f"Expected low escape probability with large threshold, got {p}" + + def test_tiny_threshold_gives_high_escape(self, engine): + # With a near-zero threshold, almost every trial should "escape". + p = engine.escape_probability(threshold=1e-6, t_max=5.0, n_trials=30) + assert p > 0.5, f"Expected high escape probability with tiny threshold, got {p}" + + def test_explicit_x0_and_x_star(self, engine): + fps = engine.find_fixed_points(0.5) + x_star = fps[0]['x_star'] if fps else 0.0 + p = engine.escape_probability( + threshold=5.0, t_max=5.0, x0=x_star, x_star=x_star, n_trials=20 + ) + assert 0.0 <= p <= 1.0 + + +# --------------------------------------------------------------------------- +# stationary_density +# --------------------------------------------------------------------------- + +class TestStationaryDensity: + def test_returns_arrays_of_matching_length(self, engine): + x, p = engine.stationary_density(lambda_val=0.5) + assert len(x) == len(p) + assert len(x) > 0 + + def test_density_is_normalised(self, engine): + x, p = engine.stationary_density(lambda_val=0.5) + # Numerical integral should be approximately 1. + integral = (np.trapezoid(p, x) if hasattr(np, 'trapezoid') else + np.trapz(p, x)) + assert abs(integral - 1.0) < 0.05, f"Density not normalised: integral={integral}" + + def test_density_is_non_negative(self, engine): + x, p = engine.stationary_density(lambda_val=0.5) + assert np.all(p >= 0), "Density contains negative values" + + def test_peak_near_stable_fixed_point(self, engine): + lambda_val = 0.5 + fps = engine.find_fixed_points(lambda_val) + stable = [fp for fp in fps if fp['stable']] + if not stable: + pytest.skip("No stable fixed point to compare against") + x_star = stable[0]['x_star'] + x, p = engine.stationary_density(lambda_val=lambda_val) + peak_x = x[np.argmax(p)] + assert abs(peak_x - x_star) < 1.5, ( + f"Density peak ({peak_x:.3f}) far from stable equilibrium ({x_star:.3f})" + ) + + +# --------------------------------------------------------------------------- +# seat_and_release +# --------------------------------------------------------------------------- + +class TestSeatAndRelease: + def test_returns_expected_keys(self, engine): + result = engine.seat_and_release(t_max=5.0, x0=3.0, lambda_val=0.5) + for key in ('t', 'x', 'control', 'boundary', 'x_star', 'settle_tol', + 'release_idx', 'released'): + assert key in result, f"Missing key: {key}" + + def test_trajectory_length_matches_time(self, engine): + t_max = 5.0 + result = engine.seat_and_release(t_max=t_max, x0=3.0, lambda_val=0.5) + expected_steps = int(t_max / engine.dt) + 1 + assert len(result['t']) == expected_steps + assert len(result['x']) == expected_steps + + def test_control_is_zero_after_release(self, engine): + result = engine.seat_and_release(t_max=5.0, x0=3.0, lambda_val=0.5) + if result['released'] and result['release_idx'] is not None: + post_control = result['control'][result['release_idx']:] + assert np.allclose(post_control, 0.0), "Control was non-zero after release" + + def test_raises_without_stable_bowl(self): + """seat_and_release should raise when no stable bowl exists.""" + eng = JumpDiffusionEngine( + lambda_func=lambda t: 1e6, # extreme lambda → no fixed points + sigma=0.3, jump_rate=0.0, dt=0.01, seed=42, + ) + with pytest.raises(ValueError, match="No stable bowl"): + eng.seat_and_release(t_max=1.0) + + def test_trajectory_stays_bounded(self, engine): + # After seating, x should not wander to extreme values. + result = engine.seat_and_release(t_max=10.0, x0=1.0, lambda_val=0.5, + dwell=50) + assert np.all(np.abs(result['x']) < 30), "Trajectory left reasonable bounds" + + +# --------------------------------------------------------------------------- +# simulate (smoke / integration test) +# --------------------------------------------------------------------------- + +class TestSimulate: + def test_single_realization_structure(self, engine): + results = engine.simulate(t_max=1.0, x0=0.0, n_realizations=1) + assert len(results) == 1 + r = results[0] + assert 't' in r and 'x' in r + assert len(r['t']) == len(r['x']) + + def test_multiple_realizations(self, engine): + n = 5 + results = engine.simulate(t_max=1.0, x0=0.0, n_realizations=n) + assert len(results) == n + + def test_seeded_reproducibility(self): + """Same seed → identical trajectory.""" + def lf(t): + return 0.5 + + eng1 = JumpDiffusionEngine(lf, sigma=0.3, jump_rate=0.1, dt=0.01, seed=7) + eng2 = JumpDiffusionEngine(lf, sigma=0.3, jump_rate=0.1, dt=0.01, seed=7) + r1 = eng1.simulate(t_max=1.0, x0=0.0, n_realizations=1)[0] + r2 = eng2.simulate(t_max=1.0, x0=0.0, n_realizations=1)[0] + np.testing.assert_array_equal(r1['x'], r2['x']) + + def test_trajectory_mean_reverting(self, engine): + """With no jumps and strong mean reversion, x should stay bounded.""" + results = engine.simulate(t_max=20.0, x0=5.0, n_realizations=3, + record_energy=False) + for r in results: + assert np.all(np.abs(r['x']) < 50), "Trajectory diverged unexpectedly" + + def test_energy_recorded_when_requested(self, engine): + results = engine.simulate(t_max=1.0, x0=0.0, n_realizations=1, + record_energy=True) + assert 'energy' in results[0] + assert len(results[0]['energy']) == len(results[0]['x']) + + def test_energy_not_recorded_when_skipped(self, engine): + results = engine.simulate(t_max=1.0, x0=0.0, n_realizations=1, + record_energy=False) + assert 'energy' not in results[0] + + +# --------------------------------------------------------------------------- +# plot_trajectories (non-display smoke test) +# --------------------------------------------------------------------------- + +class TestPlotTrajectories: + def test_returns_figure(self, engine, monkeypatch): + import matplotlib.pyplot as plt + # Suppress display in CI + monkeypatch.setattr(plt, "show", lambda: None) + results = engine.simulate(t_max=1.0, x0=0.0, n_realizations=2, + record_energy=True) + fig = engine.plot_trajectories(results, show_energy=True) + assert fig is not None + + def test_accepts_single_realization(self, engine, monkeypatch): + import matplotlib.pyplot as plt + monkeypatch.setattr(plt, "show", lambda: None) + results = engine.simulate(t_max=1.0, x0=0.0, n_realizations=1, + record_energy=False) + fig = engine.plot_trajectories(results) + assert fig is not None