API Reference

Configuration

User-facing configuration models for rampa.

All scenario, executor, threshold, and option configuration is defined here as pydantic models with validation. These models are the user’s primary interface for configuring load tests.

>>> import rampa.config
rampa.config.parse_duration(value)

Parse a human-readable duration string into a timedelta.

Supports h, m, s, and ms components. At least one component must be present.
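The component grammar described above can be sketched with a single regular expression. This is a hypothetical stand-in, not rampa's implementation: the name `parse_duration_sketch`, the integer-only components, and the fixed h, m, s, ms ordering are assumptions made for illustration.

```python
import re
from datetime import timedelta
from typing import Union

# Assumed grammar: optional integer components in h, m, s, ms order.
# The (?!s) lookahead keeps "500ms" from being read as 500 minutes.
_DURATION_RE = re.compile(
    r"^(?:(\d+)h)?(?:(\d+)m(?!s))?(?:(\d+)s)?(?:(\d+)ms)?$"
)


def parse_duration_sketch(value: Union[str, timedelta]) -> timedelta:
    """Illustrative parser; follows the documented behaviour only loosely."""
    if isinstance(value, timedelta):
        return value  # existing timedeltas pass through unchanged
    match = _DURATION_RE.match(value.strip())
    if match is None or not any(match.groups()):
        # Either the text is not a duration or no component was present.
        raise ValueError(f"invalid duration: {value!r}")
    hours, minutes, seconds, millis = (int(g or 0) for g in match.groups())
    return timedelta(
        hours=hours, minutes=minutes, seconds=seconds, milliseconds=millis
    )


print(parse_duration_sketch("1m30s"))  # 0:01:30
```

An empty string matches the pattern with every group unset, which is why the sketch checks `any(match.groups())` to enforce the "at least one component" rule.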

Parameters:

value (str | datetime.timedelta) – Duration string (e.g. "30s", "1m30s", "2h") or an existing timedelta.

Returns:

Parsed duration.

Return type:

datetime.timedelta

Raises:
rampa.config.Duration

A timedelta that can be constructed from a human-readable string.

alias of Annotated[timedelta, BeforeValidator(func=parse_duration, json_schema_input_type=PydanticUndefined)]

class rampa.config.Stage

Bases: BaseModel

A single ramp stage with a target VU or rate count and duration.

>>> import datetime
>>> Stage(
...     duration=datetime.timedelta(seconds=30), target=100,
... ).target
100
>>> Stage(
...     duration=datetime.timedelta(minutes=1), target=0,
... ).duration
datetime.timedelta(seconds=60)
class rampa.config.ScenarioConfig

Bases: BaseModel

Configuration for a single test scenario.

Each scenario maps to an executor that controls how iterations are scheduled. Executor-specific fields are validated by the executor, not by this model.

>>> import datetime
>>> cfg = ScenarioConfig(
...     executor="constant-vus",
...     vus=10,
...     duration=datetime.timedelta(seconds=30),
... )
>>> cfg.executor
'constant-vus'
>>> cfg.vus
10
class rampa.config.Options

Bases: BaseModel

Shortcut execution options.

These provide a simpler alternative to explicit scenarios for common cases. When both shortcuts and scenarios are provided, validation rejects the config.

>>> import datetime
>>> Options(
...     vus=10, duration=datetime.timedelta(seconds=30),
... ).vus
10
class rampa.config.Config

Bases: BaseModel

Top-level test configuration.

Either scenarios or shortcut options (vus, duration, etc.) may be provided, but not both.

>>> import datetime
>>> cfg = Config(
...     scenarios={"smoke": ScenarioConfig(
...         executor="constant-vus",
...         vus=1,
...         duration=datetime.timedelta(seconds=10),
...     )},
... )
>>> "smoke" in cfg.scenarios
True
>>> cfg.scenarios["smoke"].executor
'constant-vus'
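
The either-or rule between scenarios and shortcut options can be illustrated with a plain dataclass. The sketch below is a stand-in rather than the pydantic model itself; the class name `ConfigSketch`, the reduced field set, and the choice of `ValueError` are assumptions.

```python
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Any, Dict, Optional


@dataclass
class ConfigSketch:
    """Hypothetical stand-in for the mutual-exclusion rule."""

    scenarios: Dict[str, Any] = field(default_factory=dict)
    vus: Optional[int] = None
    duration: Optional[timedelta] = None

    def __post_init__(self) -> None:
        has_shortcuts = self.vus is not None or self.duration is not None
        if self.scenarios and has_shortcuts:
            raise ValueError("provide scenarios or shortcut options, not both")


# Shortcuts only: accepted.
shortcut_cfg = ConfigSketch(vus=10, duration=timedelta(seconds=30))
# Scenarios only: accepted.
scenario_cfg = ConfigSketch(scenarios={"smoke": {"executor": "constant-vus"}})
```

Constructing `ConfigSketch` with both `scenarios` and `vus` raises `ValueError` in this sketch.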

Engine

Headless engine and run controller for rampa.

The engine owns execution state and cleanup. Frontends own presentation, JSON shape, and process exit behavior. This separation enables CLI, TUI, MCP, pytest, and GitHub Action integration without reaching into internals.

>>> import rampa.engine
class rampa.engine.EngineOptions

Bases: object

Configuration for engine behavior.

>>> opts = EngineOptions()
>>> opts.metric_flush_interval
0.05
class rampa.engine.RunController

Bases: object

Control surface for a running test.

Returned by Engine.start(). Provides wait(), stop(), snapshot(), and events() for frontend consumption.

>>> import rampa.engine
class rampa.engine.Engine

Bases: object

Headless load testing engine.

Constructs per-run state, starts scenarios, and returns a controller.

Parameters:
  • plan (TestPlan) – Resolved test plan from the loader.

  • options (EngineOptions | None) – Optional engine configuration.

>>> import rampa.engine

Events

Typed engine events and run status for rampa.

Events are observational — engine correctness does not depend on an event consumer being active. Frontends subscribe to events for live updates.

>>> import rampa.events
class rampa.events.RunStatus

Bases: StrEnum

Final status of a completed test run.

>>> RunStatus.PASSED.value
'passed'
>>> RunStatus.THRESHOLD_FAILED.value
'threshold_failed'
rampa.events._make_run_id()
Return type:

str

class rampa.events.EngineEvent

Bases: object

Base class for all engine events.

>>> e = EngineEvent(run_id="abc", timestamp_ns=0)
>>> e.run_id
'abc'
class rampa.events.PhaseEvent

Bases: EngineEvent

Lifecycle phase transition.

>>> e = PhaseEvent(run_id="abc", timestamp_ns=0, phase="setup")
>>> e.phase
'setup'
class rampa.events.PauseEvent

Bases: EngineEvent

Emitted when execution is paused.

>>> e = PauseEvent(run_id="x", timestamp_ns=0)
>>> e.run_id
'x'
class rampa.events.ResumeEvent

Bases: EngineEvent

Emitted when execution resumes after a pause.

>>> e = ResumeEvent(run_id="x", timestamp_ns=0, paused_seconds=1.5)
>>> e.paused_seconds
1.5
class rampa.events.SnapshotEvent

Bases: EngineEvent

Periodic metric snapshot.

>>> from rampa.metrics import MetricSnapshot
>>> snap = MetricSnapshot(timestamp=0, duration=1.0, values={})
>>> e = SnapshotEvent(run_id="x", timestamp_ns=0, snapshot=snap)
>>> e.snapshot.duration
1.0
class rampa.events.ThresholdEvent

Bases: EngineEvent

Threshold evaluation results.

>>> e = ThresholdEvent(run_id="x", timestamp_ns=0, results=[])
>>> len(e.results)
0
class rampa.events.LiveThresholdEvent

Bases: EngineEvent

Mid-run threshold evaluation result.

Emitted periodically during execution by the MetricEngine.

>>> e = LiveThresholdEvent(
...     run_id="x", timestamp_ns=0, results=[], will_abort=False,
... )
>>> e.will_abort
False
class rampa.events.RunResult

Bases: object

Result of a completed test run.

>>> r = RunResult(
...     run_id="abc",
...     status=RunStatus.PASSED,
...     snapshot=None,
...     threshold_results=[],
... )
>>> r.status
<RunStatus.PASSED: 'passed'>
rampa.events.serialize_event(event)

Serialize an engine event to a JSON-compatible dict.

Adds a type key with the event class name. All dataclass fields are included via dataclasses.asdict().

Parameters:

event (EngineEvent) – The event to serialize.

Returns:

JSON-serializable dictionary.

Return type:

dict[str, Any]

>>> e = PhaseEvent(run_id="abc", timestamp_ns=0, phase="setup")
>>> d = serialize_event(e)
>>> d["type"]
'PhaseEvent'
>>> d["phase"]
'setup'
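
The serialization rule (all dataclass fields plus a type key holding the class name) can be sketched with the standard library alone. `DemoPhaseEvent` and `serialize_sketch` are hypothetical names used only for this illustration; they are not part of rampa.

```python
import dataclasses
import json
from typing import Any, Dict


@dataclasses.dataclass(frozen=True)
class DemoPhaseEvent:
    """Hypothetical stand-in for an engine event dataclass."""

    run_id: str
    timestamp_ns: int
    phase: str


def serialize_sketch(event: Any) -> Dict[str, Any]:
    # asdict() copies every dataclass field, recursing into nested dataclasses.
    payload = dataclasses.asdict(event)
    # The class name is stored under "type" so consumers can dispatch on it.
    payload["type"] = type(event).__name__
    return payload


event = DemoPhaseEvent(run_id="abc", timestamp_ns=0, phase="setup")
line = json.dumps(serialize_sketch(event))  # one JSON object per event
```

A frontend that streams events as JSON lines could write `line` directly, since the payload contains only plain strings and numbers.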

Worker

Worker object passed to user scenario functions.

The Worker provides a scoped API surface for one iteration. It holds references to the sample queue for metric emission and provides check and custom metric helpers.

>>> import rampa.worker
class rampa.worker.ExecutionInfo

Bases: object

Execution context for the current worker iteration.

>>> info = ExecutionInfo(worker_id=1, scenario="load", iteration=0)
>>> info.worker_id
1
class rampa.worker.Worker

Bases: object

Scoped API surface for user code within a single iteration.

Workers are created per-iteration by the executor. They provide check(), counter(), gauge(), and trend() methods that emit metric samples to the engine’s sample queue.

Parameters:
  • sample_queue (queue.SimpleQueue[Sample | None]) – Queue for emitting metric samples.

  • execution (ExecutionInfo) – Current execution context.

  • setup_data (Any) – Data returned from the setup() function.

Examples

>>> import queue as q
>>> sq: q.SimpleQueue[Sample | None] = q.SimpleQueue()
>>> w = Worker(
...     sample_queue=sq,
...     execution=ExecutionInfo(worker_id=1, scenario="s", iteration=0),
... )
>>> w.execution.worker_id
1

HTTP Client

HTTP client with automatic metric emission for rampa.

Wraps aiohttp.ClientSession and emits timing metrics for every request: http_reqs, http_req_duration, http_req_failed, data transfer counters, and per-phase timing (blocked, connecting, sending, waiting, receiving).

>>> import rampa.http
class rampa.http.Response

Bases: object

Wrapper around an HTTP response with metric-relevant fields.

>>> r = Response(status=200, headers={}, body=b"ok", url="http://x")
>>> r.status
200
rampa.http._estimate_request_size(kwargs)

Estimate the size of an outgoing request body in bytes.

Parameters:

kwargs (dict[str, Any]) – Request keyword arguments.

Returns:

Estimated body size in bytes.

Return type:

int

>>> _estimate_request_size({"data": "hello"})
5
>>> _estimate_request_size({"data": b"bytes"})
5
>>> _estimate_request_size({})
0
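
A body-size estimate of this kind might look like the sketch below. It is a guess at the approach, not the private helper itself: only the "data" keyword is inspected, and UTF-8 is assumed for strings.

```python
from typing import Any, Dict


def estimate_body_size_sketch(kwargs: Dict[str, Any]) -> int:
    """Hypothetical estimator covering only the "data" keyword."""
    data = kwargs.get("data")
    if data is None:
        return 0
    if isinstance(data, (bytes, bytearray)):
        return len(data)
    if isinstance(data, str):
        return len(data.encode("utf-8"))  # assumed encoding
    return 0  # other payload types are not estimated in this sketch


ascii_size = estimate_body_size_sketch({"data": "hello"})
bytes_size = estimate_body_size_sketch({"data": b"bytes"})
empty_size = estimate_body_size_sketch({})
```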

class rampa.http._TraceTimings

Bases: object

Per-request timing accumulator for aiohttp trace signals.

Accepts and ignores trace_request_ctx so it can be used as an aiohttp trace_config_ctx_factory.

>>> t = _TraceTimings()
>>> t.request_start
0
rampa.http._build_trace_config(client)

Build an aiohttp TraceConfig that records per-phase timings.

The timings are stored on the trace request context object (a _TraceTimings instance) and read back by the caller after the request completes.

>>> import rampa.http
Parameters:

client (Any)

Return type:

TraceConfig

class rampa.http.HttpClient

Bases: object

HTTP client that auto-emits metrics for every request.

Parameters:
  • sample_queue (queue.SimpleQueue[Sample | None]) – Queue for emitting metric samples.

  • tags (dict[str, str]) – Base tags added to every sample (e.g. scenario name).

>>> import queue as q
>>> sq = q.SimpleQueue()
>>> client = HttpClient(sq, {"scenario": "test"})
>>> client._tags["scenario"]
'test'

Metrics

Metric registry, sinks, and aggregation engine for rampa.

The metric engine runs in a dedicated thread, draining samples from a queue.SimpleQueue and updating per-metric sinks. It periodically emits MetricSnapshot objects for outputs and threshold evaluation.

>>> import rampa.metrics
class rampa.metrics.Metric

Bases: object

A registered metric with name, type, and value type.

>>> m = Metric(name="http_reqs", metric_type=MetricType.COUNTER)
>>> m.metric_type
<MetricType.COUNTER: 'counter'>
class rampa.metrics.CounterSink

Bases: object

Accumulates a running total and computes rate.

>>> s = CounterSink()
>>> s.add(1.0)
>>> s.add(2.0)
>>> s.format(10.0)
{'count': 3.0, 'rate': 0.3}
class rampa.metrics.GaugeSink

Bases: object

Tracks the latest value plus min and max.

>>> s = GaugeSink()
>>> s.add(10.0)
>>> s.add(5.0)
>>> s.add(20.0)
>>> s.format(1.0)
{'value': 20.0, 'min': 5.0, 'max': 20.0}
class rampa.metrics.RateSink

Bases: object

Tracks the fraction of non-zero (truthy) values.

>>> s = RateSink()
>>> s.add(1.0)
>>> s.add(0.0)
>>> s.add(1.0)
>>> fmt = s.format(1.0)
>>> fmt["passes"]
2.0
>>> fmt["fails"]
1.0
class rampa.metrics.TrendSink

Bases: object

Stores all values for percentile computation.

Uses linear interpolation matching NumPy’s default algorithm.

>>> s = TrendSink()
>>> for v in [10.0, 20.0, 30.0, 40.0, 50.0]:
...     s.add(v)
>>> fmt = s.format(1.0)
>>> fmt["min"]
10.0
>>> fmt["max"]
50.0
>>> fmt["avg"]
30.0
>>> fmt["med"]
30.0
>>> fmt["count"]
5.0
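
Linear-interpolation percentiles of the kind NumPy computes by default can be sketched as follows. The function name `percentile_linear` is hypothetical and the code is an independent illustration, not the sink's source.

```python
import math
from typing import Sequence


def percentile_linear(values: Sequence[float], p: float) -> float:
    """Percentile with linear interpolation between the closest ranks."""
    if not values:
        raise ValueError("no samples")
    ordered = sorted(values)
    rank = (len(ordered) - 1) * p / 100.0  # fractional index into ordered
    lower = math.floor(rank)
    upper = math.ceil(rank)
    fraction = rank - lower
    return ordered[lower] + (ordered[upper] - ordered[lower]) * fraction


samples = [10.0, 20.0, 30.0, 40.0, 50.0]
median = percentile_linear(samples, 50)  # rank 2.0, so exactly 30.0
p95 = percentile_linear(samples, 95)     # rank 3.8, between 40.0 and 50.0
```

Because every value is kept and sorted on demand, memory grows with the sample count, which is the trade-off a histogram-backed sink avoids.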
class rampa.metrics.SinkProtocol

Bases: Protocol

Structural protocol for metric sink implementations.

Enables future Rust PyO3 sinks to satisfy the interface without appearing in a union type.

>>> class DummySink:
...     def add(self, value: float) -> None: ...
...     def format(self, duration: float) -> dict[str, float]:
...         return {}
>>> hasattr(DummySink, "add") and hasattr(DummySink, "format")
True
rampa.metrics.Sink

Type alias for metric sink implementations.

class rampa.metrics.HdrTrendSink

Bases: object

Trend sink backed by a Rust HDR histogram.

Memory use is fixed at roughly 20 KB regardless of sample count, with O(1) inserts and O(1) percentile queries. Requires the rampa._core Rust extension.

Values are stored as integer microseconds internally. The add() method accepts float milliseconds (matching the Python TrendSink interface) and converts automatically.

>>> try:
...     from rampa._core import HdrHistogram
...     s = HdrTrendSink()
...     for v in [10.0, 20.0, 30.0, 40.0, 50.0]:
...         s.add(v)
...     fmt = s.format(1.0)
...     fmt["count"]
... except ImportError:
...     5.0
5.0
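
The unit conversion described above (float milliseconds in, integer microseconds stored) can be sketched as below. The helper names and the round-to-nearest choice are assumptions; the Rust-backed sink may round or clamp differently.

```python
def ms_to_micros(value_ms: float) -> int:
    """Convert float milliseconds to integer microseconds (rounding assumed)."""
    return int(round(value_ms * 1000.0))


def micros_to_ms(value_us: int) -> float:
    """Convert stored integer microseconds back to float milliseconds."""
    return value_us / 1000.0


stored = ms_to_micros(12.5)        # 12500
restored = micros_to_ms(stored)    # 12.5
too_small = ms_to_micros(0.0004)   # below one microsecond, stored as 0
```

Under this scheme, any precision finer than one microsecond is lost on insert.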
rampa.metrics.create_sink(metric_type)

Create a sink matching the metric type.

Uses Rust HDR histogram for trend sinks when available, falling back to the Python TrendSink.

Parameters:

metric_type (MetricType) – The type of metric.

Returns:

A new sink instance.

Return type:

Sink

>>> type(create_sink(MetricType.COUNTER)).__name__
'CounterSink'
>>> type(create_sink(MetricType.TREND)).__name__ in ('TrendSink', 'HdrTrendSink')
True

rampa.metrics.SubmetricKey

Identifies a tag-filtered view of a base metric.

The first element is the base metric name, the second is a frozen set of (tag_key, tag_value) pairs used for matching.

alias of tuple[str, frozenset[tuple[str, str]]]
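
A key of this shape can be built and matched against sample tags as in the sketch below. The helper names are hypothetical, and the subset rule in `matches` is an assumption about how tag filtering works, not a documented guarantee.

```python
from typing import Dict, FrozenSet, Tuple

SubmetricKeySketch = Tuple[str, FrozenSet[Tuple[str, str]]]


def make_key(base: str, tags: Dict[str, str]) -> SubmetricKeySketch:
    # frozenset makes the key hashable and independent of tag order.
    return base, frozenset(tags.items())


def matches(
    key: SubmetricKeySketch, metric: str, sample_tags: Dict[str, str]
) -> bool:
    base, required = key
    # Assumed rule: every required pair must appear among the sample's tags.
    return metric == base and required <= set(sample_tags.items())


key = make_key("http_req_duration", {"status": "200"})
hit = matches(key, "http_req_duration", {"status": "200", "method": "GET"})
miss = matches(key, "http_req_duration", {"status": "500"})
```

Because the key is hashable, it can index a dictionary of sub-sinks directly.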

class rampa.metrics.MetricRegistry

Bases: object

Thread-safe registry that enforces name-type consistency.

>>> reg = MetricRegistry()
>>> reg.get_or_create("reqs", MetricType.COUNTER).metric_type
<MetricType.COUNTER: 'counter'>
>>> reg.get_or_create("reqs", MetricType.COUNTER).name
'reqs'
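
Name-type consistency under concurrent access can be sketched with a lock-guarded dictionary. `RegistrySketch` is a hypothetical stand-in that stores (name, type) tuples rather than Metric objects, and raising `ValueError` on a type clash is an assumption.

```python
import threading
from typing import Dict, Tuple


class RegistrySketch:
    """Hypothetical registry; stores (name, type) pairs, not Metric objects."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._metrics: Dict[str, Tuple[str, str]] = {}

    def get_or_create(self, name: str, metric_type: str) -> Tuple[str, str]:
        with self._lock:  # serialize lookups and inserts across threads
            existing = self._metrics.get(name)
            if existing is None:
                existing = (name, metric_type)
                self._metrics[name] = existing
            elif existing[1] != metric_type:
                # Assumed behaviour: re-registering with another type fails.
                raise ValueError(
                    f"{name!r} already registered as {existing[1]}"
                )
            return existing


reg = RegistrySketch()
first = reg.get_or_create("reqs", "counter")
second = reg.get_or_create("reqs", "counter")  # same entry is returned
```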
class rampa.metrics.MetricSnapshot

Bases: object

Frozen snapshot of aggregated metric values at a point in time.

>>> snap = MetricSnapshot(
...     timestamp=0,
...     duration=10.0,
...     values={"reqs": {"count": 100.0, "rate": 10.0}},
... )
>>> snap.values["reqs"]["rate"]
10.0
rampa.metrics.register_builtins(registry)

Register all built-in metrics.

Parameters:

registry (MetricRegistry) – The registry to populate.

Return type:

None

>>> reg = MetricRegistry()
>>> register_builtins(reg)
>>> reg.get_sink("checks") is not None
True

class rampa.metrics.MetricEngine

Bases: object

Background thread that drains samples and updates sinks.

Parameters:
  • registry (MetricRegistry) – Metric registry with sinks.

  • sample_queue (queue.SimpleQueue[Sample | None]) – Queue of samples. Send None to signal shutdown.

  • flush_interval (float) – Seconds between snapshot emissions.

Examples

>>> import queue
>>> eng = MetricEngine(
...     registry=MetricRegistry(),
...     sample_queue=queue.SimpleQueue(),
... )
>>> eng.flush_interval
0.05
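
The drain-until-None pattern described for the engine can be sketched with the standard library. This is an illustration only: `Sample` here is a hypothetical (name, value) tuple, the `drain` function sums values instead of updating typed sinks, and periodic snapshot emission is omitted.

```python
import queue
import threading
from typing import Dict, Optional, Tuple

Sample = Tuple[str, float]  # hypothetical (metric name, value) pair


def drain(
    samples: "queue.SimpleQueue[Optional[Sample]]", totals: Dict[str, float]
) -> None:
    """Consume samples until the None sentinel arrives."""
    while True:
        item = samples.get()  # blocks until a sample or the sentinel is queued
        if item is None:
            return
        name, value = item
        totals[name] = totals.get(name, 0.0) + value


samples: "queue.SimpleQueue[Optional[Sample]]" = queue.SimpleQueue()
totals: Dict[str, float] = {}
worker = threading.Thread(target=drain, args=(samples, totals), daemon=True)
worker.start()

for _ in range(3):
    samples.put(("http_reqs", 1.0))
samples.put(None)  # signal shutdown
worker.join(timeout=5.0)
```

Only the drain thread writes to `totals`, and the caller reads it after `join()`, so no extra locking is needed in this sketch.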

Thresholds

Threshold expression parser and evaluator for rampa.

Thresholds define pass/fail criteria over metric aggregates. They are the policy layer — checks measure facts, thresholds decide final status.

>>> import rampa.thresholds
class rampa.thresholds.ThresholdExpression

Bases: object

A parsed threshold expression.

>>> expr = parse_threshold("p(95)<500")
>>> expr.aggregation
'p(95)'
>>> expr.operator
'<'
>>> expr.value
500.0
rampa.thresholds.parse_threshold(expr)

Parse a threshold expression string.

Parameters:

expr (str) – Expression like "p(95)<500", "rate<0.01", "avg>200".

Returns:

Parsed expression.

Return type:

ThresholdExpression

Raises:

ValueError – If the expression is malformed.

>>> parse_threshold("count>=100").aggregation
'count'
>>> parse_threshold("med<400").value
400.0
>>> parse_threshold("p(99.9)<=1000").aggregation
'p(99.9)'
>>> parse_threshold("p(99.9)<=1000").aggregation_value
99.9
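
Expressions of the form shown above can be parsed with one regular expression. The sketch below is an assumption-laden illustration: `ParsedThreshold` and `parse_threshold_sketch` are hypothetical names, and only the four comparison operators that appear in this reference are accepted.

```python
import re
from typing import NamedTuple, Optional


class ParsedThreshold(NamedTuple):
    aggregation: str
    aggregation_value: Optional[float]  # percentile argument, if any
    operator: str
    value: float


# Assumed grammar: "<aggregation><operator><number>" with optional spaces.
_THRESHOLD_RE = re.compile(
    r"^\s*(p\((\d+(?:\.\d+)?)\)|[a-z_]+)\s*(<=|>=|<|>)\s*(-?\d+(?:\.\d+)?)\s*$"
)


def parse_threshold_sketch(expr: str) -> ParsedThreshold:
    match = _THRESHOLD_RE.match(expr)
    if match is None:
        raise ValueError(f"malformed threshold expression: {expr!r}")
    aggregation, pct, operator, number = match.groups()
    return ParsedThreshold(
        aggregation=aggregation,
        aggregation_value=float(pct) if pct is not None else None,
        operator=operator,
        value=float(number),
    )


parsed = parse_threshold_sketch("p(95)<500")
```

The two-character operators are listed before the one-character ones so that "<=" is not split into "<" followed by a stray "=".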

rampa.thresholds.parse_submetric(name)

Parse a metric name that may include tag filters.

Parameters:

name (str) – Metric name, optionally with tag filters like "http_req_duration{status:200}".

Returns:

The base metric name and a dict of tag filters.

Return type:

tuple[str, dict[str, str]]

>>> parse_submetric("http_req_duration{status:200}")
('http_req_duration', {'status': '200'})
>>> parse_submetric("http_reqs")
('http_reqs', {})
>>> parse_submetric("dur{method:GET,name:login}")
('dur', {'method': 'GET', 'name': 'login'})
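
Splitting a name such as "dur{method:GET,name:login}" into its base and tag filters needs only string partitioning. The function below is a hypothetical sketch; its error handling (a `ValueError` for a missing brace or colon) is an assumption.

```python
from typing import Dict, Tuple


def parse_submetric_sketch(name: str) -> Tuple[str, Dict[str, str]]:
    base, brace, rest = name.partition("{")
    if not brace:
        return name, {}  # plain metric name, no tag filters
    if not rest.endswith("}"):
        raise ValueError(f"unterminated tag filter in {name!r}")
    tags: Dict[str, str] = {}
    for pair in rest[:-1].split(","):
        key, colon, value = pair.partition(":")
        if not colon:
            raise ValueError(f"expected key:value, got {pair!r}")
        tags[key.strip()] = value.strip()
    return base, tags


base, tags = parse_submetric_sketch("dur{method:GET,name:login}")
```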

class rampa.thresholds.Threshold

Bases: object

A configured threshold with expression and abort behavior.

>>> t = Threshold(
...     source="p(95)<500",
...     expression=parse_threshold("p(95)<500"),
... )
>>> t.abort_on_fail
False
class rampa.thresholds.ThresholdResult

Bases: object

Result of evaluating one threshold.

>>> r = ThresholdResult(source="p(95)<500", passed=True, lhs=450.0, rhs=500.0)
>>> r.passed
True
rampa.thresholds._sink_key(aggregation)

Map threshold aggregation to sink format key.

Parameters:

aggregation (str)

Return type:

str

rampa.thresholds.evaluate_threshold(threshold, sink, duration)

Evaluate a single threshold against a sink.

Parameters:
  • threshold (Threshold) – The threshold to evaluate.

  • sink (Sink) – The metric sink containing aggregated values.

  • duration (float) – Elapsed test duration in seconds.

Returns:

Whether the threshold passed.

Return type:

ThresholdResult

>>> from rampa.metrics import TrendSink
>>> s = TrendSink()
>>> for v in range(100):
...     s.add(float(v))
>>> t = Threshold(
...     source="p(95)<200",
...     expression=parse_threshold("p(95)<200"),
... )
>>> evaluate_threshold(t, s, 1.0).passed
True
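
At its core, evaluation compares one aggregate (the left-hand side) with the threshold value (the right-hand side). The sketch below shows that comparison in isolation; `ResultSketch`, `evaluate_sketch`, and the aggregate values are hypothetical, and the mapping from aggregation name to sink key is skipped.

```python
import operator
from typing import Callable, Dict, NamedTuple


class ResultSketch(NamedTuple):
    passed: bool
    lhs: float
    rhs: float


_COMPARATORS: Dict[str, Callable[[float, float], bool]] = {
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
}


def evaluate_sketch(
    aggregates: Dict[str, float], aggregation: str, op: str, rhs: float
) -> ResultSketch:
    """Compare one aggregate (lhs) with the threshold value (rhs)."""
    lhs = aggregates[aggregation]
    return ResultSketch(passed=_COMPARATORS[op](lhs, rhs), lhs=lhs, rhs=rhs)


aggregates = {"p(95)": 94.05, "avg": 49.5}  # illustrative values only
result = evaluate_sketch(aggregates, "p(95)", "<", 200.0)
```

Keeping `lhs` and `rhs` on the result lets a frontend report by how much a threshold was missed, not only whether it failed.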

rampa.thresholds.evaluate_thresholds(metric_thresholds, sinks, duration, sub_sinks=None)

Evaluate all thresholds against their metric sinks.

Parameters:
  • metric_thresholds (dict[str, list[Threshold]]) – Metric name → list of thresholds for that metric. Names may include a tag filter suffix like "dur{status:200}".

  • sinks (dict[str, Sink]) – Base metric name → sink mapping.

  • duration (float) – Elapsed test duration in seconds.

  • sub_sinks (dict[..., Sink] | None) – Optional submetric (base_name, frozenset tags) → sink mapping for tag-filtered threshold evaluation.

Returns:

Results for each threshold.

Return type:

list[ThresholdResult]

>>> from rampa.metrics import CounterSink
>>> sink = CounterSink()
>>> sink.add(50.0)
>>> mt = {"reqs": [Threshold(
...     source="count>=100",
...     expression=parse_threshold("count>=100"),
... )]}
>>> results = evaluate_thresholds(mt, {"reqs": sink}, 1.0)
>>> results[0].passed
False

Loader

Test script loader and @scenario decorator for rampa.

Discovers scenario functions, setup, and teardown from user Python modules.

>>> import rampa.loader
rampa.loader.scenario(name=None, **kwargs)

Mark an async function as a rampa scenario.

Parameters:
  • name (str | None) – Scenario name. Defaults to the function name.

  • **kwargs (Any) – ScenarioConfig fields (executor, vus, duration, etc.).

Returns:

The decorated function with scenario metadata attached.

Return type:

Callable[[Callable[..., Any]], Callable[..., Any]]

>>> @scenario(executor="constant-vus", vus=5, duration="10s")
... async def load_test(worker: object) -> None:
...     pass
>>> hasattr(load_test, "_rampa_scenario_config")
True
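
A decorator factory that attaches metadata to a function and returns it unchanged can be sketched as below. `scenario_sketch` and the attribute `_sketch_scenario_config` are invented for this illustration; what the real decorator stores under its own attribute is not documented here.

```python
from typing import Any, Callable, Dict, Optional


def scenario_sketch(
    name: Optional[str] = None, **kwargs: Any
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical decorator factory that tags a function with settings."""

    def decorate(fn: Callable[..., Any]) -> Callable[..., Any]:
        config: Dict[str, Any] = dict(kwargs)
        config["name"] = name or fn.__name__  # default to the function name
        # The attribute name below is invented for this sketch.
        fn._sketch_scenario_config = config  # type: ignore[attr-defined]
        return fn  # the original coroutine function is returned unchanged

    return decorate


@scenario_sketch(executor="constant-vus", vus=5, duration="10s")
async def load_test(worker: object) -> None:
    pass
```

Because the function is not wrapped, a loader can discover scenarios by scanning a module for callables that carry the attribute.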

class rampa.loader.TestPlan

Bases: object

Resolved test plan ready for execution.

>>> plan = TestPlan(scenarios={}, config=Config())
>>> plan.setup_fn is None
True
rampa.loader.load_test(path)

Load a test module and extract the test plan.

Parameters:

path (str) – Path to the Python test script.

Returns:

Resolved test plan with scenarios, setup, teardown, and config.

Return type:

TestPlan

Raises: