Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,5 @@ go

# setuptools_scm
_version.py

dss_bench_out
48 changes: 48 additions & 0 deletions monitoring/dss_bench/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# dss_bench

Benchmark DSS performance as a function of deployment parameters.
Comment thread
the-glu marked this conversation as resolved.

For every **context** variant x **test**, it: cleans the local ecosystem,
Comment thread
the-glu marked this conversation as resolved.
`make start-locally` with the right env, runs the test against all deployed
DSS at once, and records q/s + latency percentiles. Output is one plot per
Comment thread
the-glu marked this conversation as resolved.
test, q/s and latency vs context.

## Layout

- `config.py` - global settings (node count, image, datastore type, duration, processes).
- `environment.py` - wraps `make start-locally` / `down-locally`, resolves DSS targets.
- `auth.py` - mints Dummy OAuth tokens (audience = DSS hostname).
- `driver.py` - runs `processes` processes per DSS for `duration`, times each call.
- `measure.py` - aggregates into total q/s + median/p95 latency.
- `plot.py` - one PNG per test (q/s + latency, one line per comparison arm).
- `arms.py` - optional comparison arms (image vs image, or datastore vs datastore).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't arms be a property of BenchTest? I would expect someone to describe them as arms of a test.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmm no, the arms are global for all benchtest, not specfic to a particular test.

An Arm apply on the global context (eg. two different image), not two variants of the test.

- `run.py` - CLI matrix runner.
- `contexts/` - one file per context.
from 0 to 100 ms).
- `tests/` - one file per test.

## Run

```bash
make image # build the DSS/monitoring images once
Comment thread
the-glu marked this conversation as resolved.
uv run python -m monitoring.dss_bench.run --processes 8 --duration 30
```

## Compare (optional)

Two arms, overlaid on every plot. Omit both flags for no comparison.

```bash
# two PRs / images
uv run python -m monitoring.dss_bench.run --compare-images interuss/dss:pr-A interuss/dss:pr-B
# two datastores
uv run python -m monitoring.dss_bench.run --compare-dbs crdb raft
```

The bench runs on the host: DSS nodes are reached at `http://localhost:80NN`
and tokens carry the matching `dss<j>.uss<i>.localutm` audience, so no extra
container or network is needed.

## Add a context or test

Drop a new file in `contexts/` (subclass `Context`) or `tests/` (subclass `BenchTest`).
34 changes: 34 additions & 0 deletions monitoring/dss_bench/arms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Optional comparison arms: run the whole matrix under several configs and
overlay them on the plots. Default is a single arm (no comparison)."""

import dataclasses
from dataclasses import dataclass, field

from monitoring.dss_bench.config import GlobalConfig


@dataclass
class Arm:
label: str
overrides: dict = field(default_factory=dict)

def apply(self, cfg: GlobalConfig) -> GlobalConfig:
return dataclasses.replace(cfg, **self.overrides)


def single(cfg: GlobalConfig) -> list[Arm]:
return [Arm(label="baseline")]


def compare_images(img_a: str, img_b: str) -> list[Arm]:
return [
Arm(label=img_a, overrides={"dss_image": img_a}),
Arm(label=img_b, overrides={"dss_image": img_b}),
]


def compare_datastores(db_a: str, db_b: str) -> list[Arm]:
return [
Arm(label=db_a, overrides={"db_type": db_a}),
Arm(label=db_b, overrides={"db_type": db_b}),
]
16 changes: 16 additions & 0 deletions monitoring/dss_bench/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Mint a JWT from the local Dummy OAuth server."""

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not reuse an AuthAdapter and only hardcode the AuthSpec?


import requests


def issue_token(endpoint: str, sub: str, audience: str, scopes: list[str]) -> str:
params = {
"grant_type": "client_credentials",
"scope": " ".join(scopes),
"intended_audience": audience,
"issuer": "dummy",
"sub": sub,
}
resp = requests.get(endpoint, params=params, timeout=10)
resp.raise_for_status()
return resp.json()["access_token"]
22 changes: 22 additions & 0 deletions monitoring/dss_bench/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Global, parameterizable settings for a benchmark run."""

from dataclasses import dataclass


@dataclass

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For classes we would likely want to (de)serialize, ImplicitDict is the preferred choice and it seems nearly certain users will want to adjust these values.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but it's adjusted through flags and that the internal class to store them, not something that is to be shared/deserialized ?

class GlobalConfig:
# Local ecosystem sizing (consumed by `make start-locally`).
num_uss: int = 3
num_nodes: int = 3
dss_image: str = "interuss/dss:v0.22.0"
db_type: str = "crdb" # crdb | ybdb | raft
intra_netem: str = "delay 600us 40us 25% distribution normal loss 0.0005%"
inter_netem: str = "delay 36ms 40ms 50% distribution paretonormal loss 0.25% 15%"

# Load profile.
duration_s: float = 120.0
processes: int = 4 # parallel processes calling action(), PER DSS

# Dummy OAuth reachable from the host.
oauth_token_endpoint: str = "http://localhost:8085/token"
oauth_sub: str = "uss1"
23 changes: 23 additions & 0 deletions monitoring/dss_bench/contexts/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Auto-discover Context subclasses defined in this package."""

import importlib
import inspect
import pkgutil

from monitoring.dss_bench.contexts.base import Context


def discover() -> dict[str, type[Context]]:
found: dict[str, type[Context]] = {}
for info in pkgutil.iter_modules(__path__):
if info.name == "base":
continue
module = importlib.import_module(f"{__name__}.{info.name}")
for _, obj in inspect.getmembers(module, inspect.isclass):
if (
issubclass(obj, Context)
and obj is not Context
and obj.__module__ == module.__name__
):
found[obj.name] = obj
return found
22 changes: 22 additions & 0 deletions monitoring/dss_bench/contexts/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Base class for experiments that sweep across a range of values. A Sweep yields contexts for each value in the range, and contexts are sets of environment variables that are used in `make start-locally` before a measurement."""

from dataclasses import dataclass


@dataclass
class Context:
label: str
"""Label of the characteristic of interest in this context, e.g. '50ms'"""
env: dict[str, str]
"""Additional environment variable values for `make start-locally`"""


class Sweep:
name: str = "base"
"""<description of what this is, and especially how its content should differ from variable_label (why not just use variable_label?)>"""

variable_label: str = "context"
"""Label of the value being swept, useful for labeling the axis in which the contexts are displayed"""

def variants(self) -> list[Context]:
raise NotImplementedError
61 changes: 61 additions & 0 deletions monitoring/dss_bench/contexts/by_latency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Sweep inter-USS network latency from 0 to 100 ms in 10 ms steps.
Jitter is 5% of the base delay; distribution/loss keep the deployment default.
"""

import re

from monitoring.dss_bench.contexts.base import Context, Variant


class ByLatency(Context):
name = "inter_latency"

axis_label = "inter-USS netem delay"

def __init__(
self,
delays: list[str] | None = None,
jitter_frac: float = 0.05,
correlation: str = "50%",
distribution: str = "paretonormal",
loss: str = "0.25% 15%",
):
self.delays = delays or [
"0ms",
"10ms",
"20ms",
"30ms",
"40ms",
"50ms",
"60ms",
"70ms",
"80ms",
"90ms",
"100ms",
]
self.jitter_frac = jitter_frac
self.correlation = correlation
self.distribution = distribution
self.loss = loss

def _line(self, delay: str) -> str:

# netem rejects `distribution` when base delay is 0; keep only loss there.
m = re.match(r"([\d.]+)\s*([a-z]*)", delay)
value = float(m.group(1)) if m else 0.0

if not m or value == 0:
return f"loss {self.loss}"

jitter = f"{value * self.jitter_frac:g}{m.group(2)}"

return (
f"delay {delay} {jitter} {self.correlation} "
f"distribution {self.distribution} loss {self.loss}"
)

def variants(self) -> list[Variant]:
return [
Variant(label=d, env={"INTER_USS_NETEM_CONF": self._line(d)})
for d in self.delays
]
76 changes: 76 additions & 0 deletions monitoring/dss_bench/driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""Run a BenchTest with `cfg.processes` processes PER DSS, in parallel across
all DSS, for `cfg.duration_s`. Each process is one sequential caller; q/s
emerges from the number of processes."""

import time
from multiprocessing import Process, Queue

import requests

from monitoring.dss_bench.auth import issue_token
from monitoring.dss_bench.config import GlobalConfig
from monitoring.dss_bench.tests.base import BenchTest


def _worker(
test: BenchTest,
target: tuple[str, str],
cfg: GlobalConfig,
q: Queue,
) -> None:
base_url, audience = target
token = issue_token(cfg.oauth_token_endpoint, cfg.oauth_sub, audience, test.scopes)
session = requests.Session()
session.headers["Authorization"] = f"Bearer {token}"

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we should leverage the existing auth infrastructure rather than writing from scratch


try:
test.setup(session, base_url)
except Exception:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will prevent even the user from cancelling execution with KeyboardInterrupt; it seems like we should be much narrower in the exceptions we catch. What exceptions would we want to accept and continue for here? Wouldn't we expect the setup to work, and want to stop a test as probably invalid if the setup wasn't successful?

@the-glu the-glu Jun 29, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't prevent the user to cancel execution, Exception is not a subclass of KeyboardException.

>>> import time
>>> try:
...     time.sleep(200)
... except Exception as e:
...     print("Catched")
...     
^CTraceback (most recent call last):
  File "<python-input-2>", line 2, in <module>
    time.sleep(200)
    ~~~~~~~~~~^^^^^
KeyboardInterrupt

However yes, letting the test run when setup fail is probably wrong, I switched to an early return.

I let the teardown catched however: failing is probably less an issue, especially since datastore are reset everytime. It that ok?

pass

latencies_ms: list[float] = []
error_latencies_ms: list[float] = []
done = 0
end = time.monotonic() + cfg.duration_s
while time.monotonic() < end:
t0 = time.monotonic()
try:
test.action(session, base_url)
latencies_ms.append((time.monotonic() - t0) * 1000.0)
done += 1
except Exception:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like an overbroad catch; could we just use query_and_describe to catch the right exceptions in the right circumstances and then check whether the query succeeded?

@the-glu the-glu Jun 29, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could restrict the catch, but the idea is to be large to catch others potential errors (like wrong data returned, etc.).

query_and_describe also do much more that simple queries (including potential retries), and in the testing case I don't think we want to do that? Idea is to do simple queries (like others loadtest), not to have the "full" query framework.

# Keep how long the failed call took (e.g. a ~10s timeout) instead
# of dropping it: discarding slow failures biases percentiles down.
error_latencies_ms.append((time.monotonic() - t0) * 1000.0)

try:
test.teardown(session, base_url)
except Exception:
pass

q.put((base_url, latencies_ms, error_latencies_ms))


def run_test(
test: BenchTest, targets: list[tuple[str, str]], cfg: GlobalConfig

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard to figure out what "targets" is, requiring tracing though the code; let's just make a simple data structure so it's super clear:

@dataclass
class Target:
    base_url: str
    audience: str
Suggested change
test: BenchTest, targets: list[tuple[str, str]], cfg: GlobalConfig
test: BenchTest, targets: list[Target], cfg: GlobalConfig

...but, it doesn't seem like carrying audience is even necessary since it's a function of the base URL (using an AuthAdapter/UTMClientSession will take care of this automatically).

) -> dict[str, dict]:
"""Return {base_url: {"latencies": [...ms], "error_latencies": [...ms]}}."""
q: Queue = Queue()
procs = []
for target in targets:
for _ in range(cfg.processes):
p = Process(target=_worker, args=(test, target, cfg, q))
p.start()
procs.append(p)

results: dict[str, dict] = {
url: {"latencies": [], "error_latencies": []} for url, _ in targets
}
for _ in procs:
url, lat, err_lat = q.get()
results[url]["latencies"].extend(lat)
results[url]["error_latencies"].extend(err_lat)
for p in procs:
p.join()

return results
55 changes: 55 additions & 0 deletions monitoring/dss_bench/environment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Drive `make start-locally` / `make down-locally` and resolve DSS targets.

Each DSS node is published on the host at port 80<NN> where NN is the
2-digit global node index, and validates JWTs whose audience equals its
hostname dss<j>.uss<i>.localutm. We therefore hit http://localhost:80NN
Comment thread
the-glu marked this conversation as resolved.
while minting tokens for that audience.
"""

import os
import subprocess
from pathlib import Path

from monitoring.dss_bench.config import GlobalConfig

REPO_ROOT = Path(__file__).resolve().parents[2]


def _env(cfg: GlobalConfig, extra: dict[str, str]) -> dict[str, str]:
env = dict(os.environ)
env.update(
{
"NUM_USS": str(cfg.num_uss),
"NUM_NODES": str(cfg.num_nodes),
"DSS_IMAGE": cfg.dss_image,
"DB_TYPE": cfg.db_type,
"INTRA_USS_NETEM_CONF": cfg.intra_netem,
"INTER_USS_NETEM_CONF": cfg.inter_netem,
}
)
env.update(extra)
return env


def up(cfg: GlobalConfig, extra: dict[str, str]) -> None:
subprocess.run(
["make", "start-locally"], cwd=REPO_ROOT, env=_env(cfg, extra), check=True
)


def down(cfg: GlobalConfig) -> None:
subprocess.run(
["make", "clean-locally"], cwd=REPO_ROOT, env=_env(cfg, {}), check=False
)


def dss_targets(cfg: GlobalConfig) -> list[tuple[str, str]]:
"""Return (base_url, jwt_audience) for every deployed DSS node."""
targets = []
for i in range(1, cfg.num_uss + 1):
for j in range(1, cfg.num_nodes + 1):
node_idx = (i - 1) * cfg.num_nodes + j
url = f"http://localhost:80{node_idx:02d}"
audience = f"dss{j}.uss{i}.localutm"
targets.append((url, audience))
return targets
Loading
Loading