Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,5 @@ go

# setuptools_scm
_version.py

dss_bench_out
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ stop-locally:
down-locally:
build/dev/run_locally.sh down

.PHONY: clean-locally
clean-locally: down-locally
-docker ps -aq --filter network=interop_ecosystem_network | xargs -r docker rm -f
-docker ps -aq --filter network=dss_internal_network | xargs -r docker rm -f

.PHONY: check-monitoring
check-monitoring:
cd monitoring && make test
Expand Down
48 changes: 48 additions & 0 deletions monitoring/dss_bench/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# dss_bench

Benchmark DSS performance as a function of deployment parameters.

For every **context** variant x **test**, it: cleans the local ecosystem,
`make start-locally` with the right env, runs the test against all deployed
DSS at once, and records q/s + latency percentiles. Output is one plot per
test, q/s and latency vs context.

## Layout

- `config.py` - global settings (node count, image, datastore type, duration, processes).
- `environment.py` - wraps `make start-locally` / `down-locally`, resolves DSS targets.
- `auth.py` - mints Dummy OAuth tokens (audience = DSS hostname).
- `driver.py` - runs `processes` processes per DSS for `duration`, times each call.
- `measure.py` - aggregates into total q/s + median/p95 latency.
- `plot.py` - one PNG per test (q/s + latency, one line per comparison arm).
- `arms.py` - optional comparison arms (image vs image, or datastore vs datastore).
- `run.py` - CLI matrix runner.
- `contexts/` - one file per context.
from 0 to 100 ms).
- `tests/` - one file per test.

## Run

```bash
make image # build the DSS/monitoring images once
uv run python -m monitoring.dss_bench.run --processes 8 --duration 30
```

## Compare (optional)

Two arms, overlaid on every plot. Omit both flags for no comparison.

```bash
# two PRs / images
uv run python -m monitoring.dss_bench.run --compare-images interuss/dss:pr-A interuss/dss:pr-B
# two datastores
uv run python -m monitoring.dss_bench.run --compare-dbs crdb raft
```

The bench runs on the host: DSS nodes are reached at `http://localhost:80NN`
and tokens carry the matching `dss<j>.uss<i>.localutm` audience, so no extra
container or network is needed.

## Add a context or test

Drop a new file in `contexts/` (subclass `Context`) or `tests/` (subclass `BenchTest`).
34 changes: 34 additions & 0 deletions monitoring/dss_bench/arms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Optional comparison arms: run the whole matrix under several configs and
overlay them on the plots. Default is a single arm (no comparison)."""

import dataclasses
from dataclasses import dataclass, field

from monitoring.dss_bench.config import GlobalConfig


@dataclass
class Arm:
label: str
overrides: dict = field(default_factory=dict)

def apply(self, cfg: GlobalConfig) -> GlobalConfig:
return dataclasses.replace(cfg, **self.overrides)


def single(cfg: GlobalConfig) -> list[Arm]:
return [Arm(label="baseline")]


def compare_images(img_a: str, img_b: str) -> list[Arm]:
return [
Arm(label=img_a, overrides={"dss_image": img_a}),
Arm(label=img_b, overrides={"dss_image": img_b}),
]


def compare_datastores(db_a: str, db_b: str) -> list[Arm]:
return [
Arm(label=db_a, overrides={"db_type": db_a}),
Arm(label=db_b, overrides={"db_type": db_b}),
]
16 changes: 16 additions & 0 deletions monitoring/dss_bench/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Mint a JWT from the local Dummy OAuth server."""

import requests


def issue_token(endpoint: str, sub: str, audience: str, scopes: list[str]) -> str:
params = {
"grant_type": "client_credentials",
"scope": " ".join(scopes),
"intended_audience": audience,
"issuer": "dummy",
"sub": sub,
}
resp = requests.get(endpoint, params=params, timeout=10)
resp.raise_for_status()
return resp.json()["access_token"]
22 changes: 22 additions & 0 deletions monitoring/dss_bench/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Global, parameterizable settings for a benchmark run."""

from dataclasses import dataclass


@dataclass
class GlobalConfig:
# Local ecosystem sizing (consumed by `make start-locally`).
num_uss: int = 3
num_nodes: int = 3
dss_image: str = "interuss/dss:v0.22.0"
db_type: str = "crdb" # crdb | ybdb | raft
intra_netem: str = "delay 600us 40us 25% distribution normal loss 0.0005%"
inter_netem: str = "delay 36ms 40ms 50% distribution paretonormal loss 0.25% 15%"

# Load profile.
duration_s: float = 120.0
processes: int = 4 # parallel processes calling action(), PER DSS

# Dummy OAuth reachable from the host.
oauth_token_endpoint: str = "http://localhost:8085/token"
oauth_sub: str = "uss1"
23 changes: 23 additions & 0 deletions monitoring/dss_bench/contexts/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Auto-discover Context subclasses defined in this package."""

import importlib
import inspect
import pkgutil

from monitoring.dss_bench.contexts.base import Context


def discover() -> dict[str, type[Context]]:
found: dict[str, type[Context]] = {}
for info in pkgutil.iter_modules(__path__):
if info.name == "base":
continue
module = importlib.import_module(f"{__name__}.{info.name}")
for _, obj in inspect.getmembers(module, inspect.isclass):
if (
issubclass(obj, Context)
and obj is not Context
and obj.__module__ == module.__name__
):
found[obj.name] = obj
return found
18 changes: 18 additions & 0 deletions monitoring/dss_bench/contexts/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""Base class for context generators. A context yields environment variants
that are applied to `make start-locally` before a measurement."""

from dataclasses import dataclass


@dataclass
class Variant:
label: str # x-axis label, e.g. "50ms"
env: dict[str, str] # extra env for start-locally


class Context:
name: str = "base"
axis_label: str = "context" # descriptive x-axis label

def variants(self) -> list[Variant]:
raise NotImplementedError
61 changes: 61 additions & 0 deletions monitoring/dss_bench/contexts/by_latency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Sweep inter-USS network latency from 0 to 100 ms in 10 ms steps.
Jitter is 5% of the base delay; distribution/loss keep the deployment default.
"""

import re

from monitoring.dss_bench.contexts.base import Context, Variant


class ByLatency(Context):
name = "inter_latency"

axis_label = "inter-USS netem delay"

def __init__(
self,
delays: list[str] | None = None,
jitter_frac: float = 0.05,
correlation: str = "50%",
distribution: str = "paretonormal",
loss: str = "0.25% 15%",
):
self.delays = delays or [
"0ms",
"10ms",
"20ms",
"30ms",
"40ms",
"50ms",
"60ms",
"70ms",
"80ms",
"90ms",
"100ms",
]
self.jitter_frac = jitter_frac
self.correlation = correlation
self.distribution = distribution
self.loss = loss

def _line(self, delay: str) -> str:

# netem rejects `distribution` when base delay is 0; keep only loss there.
m = re.match(r"([\d.]+)\s*([a-z]*)", delay)
value = float(m.group(1)) if m else 0.0

if not m or value == 0:
return f"loss {self.loss}"

jitter = f"{value * self.jitter_frac:g}{m.group(2)}"

return (
f"delay {delay} {jitter} {self.correlation} "
f"distribution {self.distribution} loss {self.loss}"
)

def variants(self) -> list[Variant]:
return [
Variant(label=d, env={"INTER_USS_NETEM_CONF": self._line(d)})
for d in self.delays
]
77 changes: 77 additions & 0 deletions monitoring/dss_bench/driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""Run a BenchTest with `cfg.processes` processes PER DSS, in parallel across
all DSS, for `cfg.duration_s`. Each process is one sequential caller; q/s
emerges from the number of processes."""

import time
from multiprocessing import Process, Queue

import requests

from monitoring.dss_bench.auth import issue_token
from monitoring.dss_bench.config import GlobalConfig
from monitoring.dss_bench.tests.base import BenchTest


def _worker(
test: BenchTest,
target: tuple[str, str],
cfg: GlobalConfig,
q: Queue,
) -> None:
base_url, audience = target
token = issue_token(cfg.oauth_token_endpoint, cfg.oauth_sub, audience, test.scopes)
session = requests.Session()
session.headers["Authorization"] = f"Bearer {token}"

try:
test.setup(session, base_url)
except Exception:
pass

latencies_ms: list[float] = []
error_latencies_ms: list[float] = []
done = 0
end = time.monotonic() + cfg.duration_s
while time.monotonic() < end:
t0 = time.monotonic()
try:
test.action(session, base_url)
latencies_ms.append((time.monotonic() - t0) * 1000.0)
done += 1
except Exception:
# Keep how long the failed call took (e.g. a ~10s timeout) instead
# of dropping it: discarding slow failures biases percentiles down.
error_latencies_ms.append((time.monotonic() - t0) * 1000.0)

try:
test.teardown(session, base_url)
except Exception:
pass

q.put((base_url, latencies_ms, error_latencies_ms))


def run_test(
test: BenchTest, targets: list[tuple[str, str]], cfg: GlobalConfig
) -> dict[str, dict]:
"""Return {base_url: {"latencies": [...ms], "error_latencies": [...ms]}}."""
test.prepare(cfg, targets)
q: Queue = Queue()
procs = []
for target in targets:
for _ in range(cfg.processes):
p = Process(target=_worker, args=(test, target, cfg, q))
p.start()
procs.append(p)

results: dict[str, dict] = {
url: {"latencies": [], "error_latencies": []} for url, _ in targets
}
for _ in procs:
url, lat, err_lat = q.get()
results[url]["latencies"].extend(lat)
results[url]["error_latencies"].extend(err_lat)
for p in procs:
p.join()

return results
55 changes: 55 additions & 0 deletions monitoring/dss_bench/environment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Drive `make start-locally` / `make down-locally` and resolve DSS targets.

Each DSS node is published on the host at port 80<NN> where NN is the
2-digit global node index, and validates JWTs whose audience equals its
hostname dss<j>.uss<i>.localutm. We therefore hit http://localhost:80NN
while minting tokens for that audience.
"""

import os
import subprocess
from pathlib import Path

from monitoring.dss_bench.config import GlobalConfig

REPO_ROOT = Path(__file__).resolve().parents[2]


def _env(cfg: GlobalConfig, extra: dict[str, str]) -> dict[str, str]:
env = dict(os.environ)
env.update(
{
"NUM_USS": str(cfg.num_uss),
"NUM_NODES": str(cfg.num_nodes),
"DSS_IMAGE": cfg.dss_image,
"DB_TYPE": cfg.db_type,
"INTRA_USS_NETEM_CONF": cfg.intra_netem,
"INTER_USS_NETEM_CONF": cfg.inter_netem,
}
)
env.update(extra)
return env


def up(cfg: GlobalConfig, extra: dict[str, str]) -> None:
subprocess.run(
["make", "start-locally"], cwd=REPO_ROOT, env=_env(cfg, extra), check=True
)


def down(cfg: GlobalConfig) -> None:
subprocess.run(
["make", "clean-locally"], cwd=REPO_ROOT, env=_env(cfg, {}), check=False
)


def dss_targets(cfg: GlobalConfig) -> list[tuple[str, str]]:
"""Return (base_url, jwt_audience) for every deployed DSS node."""
targets = []
for i in range(1, cfg.num_uss + 1):
for j in range(1, cfg.num_nodes + 1):
node_idx = (i - 1) * cfg.num_nodes + j
url = f"http://localhost:80{node_idx:02d}"
audience = f"dss{j}.uss{i}.localutm"
targets.append((url, audience))
return targets
Loading
Loading