API Reference¶
Configuration¶
User-facing configuration models for rampa.
All scenario, executor, threshold, and option configuration is defined here as pydantic models with validation. These models are the user’s primary interface for configuring load tests.
>>> import rampa.config
-
rampa.config.parse_duration(value)¶rampa.config.parse_duration(value)¶
Parse a human-readable duration string into a timedelta.
Supports
h,m,s, andmscomponents. At least one component must be present.- Parameters:
value (
str|datetime.timedelta) – Duration string (e.g."30s","1m30s","2h") or an existing timedelta.- Returns:
Parsed duration.
- Return type:
- Raises:
ValueError– If the string cannot be parsed.>>> parse_duration("30s") –datetime.timedelta(seconds=30) –>>> parse_duration("1m30s") –datetime.timedelta(seconds=90) –>>> parse_duration("2h") –datetime.timedelta(seconds=7200) –>>> parse_duration("500ms") –datetime.timedelta(microseconds=500000) –>>> parse_duration("1h30m15s") –datetime.timedelta(seconds=5415) –>>> parse_duration(datetime.timedelta(seconds=10)) –datetime.timedelta(seconds=10) –
-
rampa.config.Duration¶rampa.config.Duration¶
A timedelta that can be constructed from a human-readable string.
alias of
Annotated[timedelta,BeforeValidator(func=parse_duration, json_schema_input_type=PydanticUndefined)]
-
class rampa.config.Stage¶class rampa.config.Stage¶
Bases:
BaseModelA single ramp stage with a target VU or rate count and duration.
>>> import datetime >>> Stage( ... duration=datetime.timedelta(seconds=30), target=100, ... ).target 100 >>> Stage( ... duration=datetime.timedelta(minutes=1), target=0, ... ).duration datetime.timedelta(seconds=60)
-
class rampa.config.ScenarioConfig¶class rampa.config.ScenarioConfig¶
Bases:
BaseModelConfiguration for a single test scenario.
Each scenario maps to an executor that controls how iterations are scheduled. Executor-specific fields are validated by the executor, not by this model.
>>> import datetime >>> cfg = ScenarioConfig( ... executor="constant-vus", ... vus=10, ... duration=datetime.timedelta(seconds=30), ... ) >>> cfg.executor 'constant-vus' >>> cfg.vus 10
-
class rampa.config.Options¶class rampa.config.Options¶
Bases:
BaseModelShortcut execution options.
These provide a simpler alternative to explicit
scenariosfor common cases. When both shortcuts andscenariosare provided, validation rejects the config.>>> import datetime >>> Options( ... vus=10, duration=datetime.timedelta(seconds=30), ... ).vus 10
-
class rampa.config.Config¶class rampa.config.Config¶
Bases:
BaseModelTop-level test configuration.
Either
scenariosor shortcut options (vus,duration, etc.) may be provided, but not both.>>> import datetime >>> cfg = Config( ... scenarios={"smoke": ScenarioConfig( ... executor="constant-vus", ... vus=1, ... duration=datetime.timedelta(seconds=10), ... )}, ... ) >>> "smoke" in cfg.scenarios True >>> cfg.scenarios["smoke"].executor 'constant-vus'
Engine¶
Headless engine and run controller for rampa.
The engine owns execution state and cleanup. Frontends own presentation, JSON shape, and process exit behavior. This separation enables CLI, TUI, MCP, pytest, and GitHub Action integration without reaching into internals.
>>> import rampa.engine
-
class rampa.engine.EngineOptions¶class rampa.engine.EngineOptions¶
Bases:
objectConfiguration for engine behavior.
>>> opts = EngineOptions() >>> opts.metric_flush_interval 0.05
-
class rampa.engine.RunController¶class rampa.engine.RunController¶
Bases:
objectControl surface for a running test.
Returned by
Engine.start(). Provideswait(),stop(),snapshot(), andevents()for frontend consumption.>>> import rampa.engine
-
class rampa.engine.Engine¶class rampa.engine.Engine¶
Bases:
objectHeadless load testing engine.
Constructs per-run state, starts scenarios, and returns a controller.
- Parameters:
plan (
TestPlan) – Resolved test plan from the loader.options (
EngineOptions|None) – Optional engine configuration.rampa.engine (
>>> import)
Events¶
Typed engine events and run status for rampa.
Events are observational — engine correctness does not depend on an event consumer being active. Frontends subscribe to events for live updates.
>>> import rampa.events
-
class rampa.events.RunStatus¶class rampa.events.RunStatus¶
Bases:
StrEnumFinal status of a completed test run.
>>> RunStatus.PASSED.value 'passed' >>> RunStatus.THRESHOLD_FAILED.value 'threshold_failed'
-
class rampa.events.EngineEvent¶class rampa.events.EngineEvent¶
Bases:
objectBase class for all engine events.
>>> e = EngineEvent(run_id="abc", timestamp_ns=0) >>> e.run_id 'abc'
-
class rampa.events.PhaseEvent¶class rampa.events.PhaseEvent¶
Bases:
EngineEventLifecycle phase transition.
>>> e = PhaseEvent(run_id="abc", timestamp_ns=0, phase="setup") >>> e.phase 'setup'
-
class rampa.events.PauseEvent¶class rampa.events.PauseEvent¶
Bases:
EngineEventEmitted when execution is paused.
>>> e = PauseEvent(run_id="x", timestamp_ns=0) >>> e.run_id 'x'
-
class rampa.events.ResumeEvent¶class rampa.events.ResumeEvent¶
Bases:
EngineEventEmitted when execution resumes after a pause.
>>> e = ResumeEvent(run_id="x", timestamp_ns=0, paused_seconds=1.5) >>> e.paused_seconds 1.5
-
class rampa.events.SnapshotEvent¶class rampa.events.SnapshotEvent¶
Bases:
EngineEventPeriodic metric snapshot.
>>> from rampa.metrics import MetricSnapshot >>> snap = MetricSnapshot(timestamp=0, duration=1.0, values={}) >>> e = SnapshotEvent(run_id="x", timestamp_ns=0, snapshot=snap) >>> e.snapshot.duration 1.0
-
class rampa.events.ThresholdEvent¶class rampa.events.ThresholdEvent¶
Bases:
EngineEventThreshold evaluation results.
>>> e = ThresholdEvent(run_id="x", timestamp_ns=0, results=[]) >>> len(e.results) 0
-
class rampa.events.LiveThresholdEvent¶class rampa.events.LiveThresholdEvent¶
Bases:
EngineEventMid-run threshold evaluation result.
Emitted periodically during execution by the MetricEngine.
>>> e = LiveThresholdEvent( ... run_id="x", timestamp_ns=0, results=[], will_abort=False, ... ) >>> e.will_abort False
-
class rampa.events.RunResult¶class rampa.events.RunResult¶
Bases:
objectResult of a completed test run.
>>> r = RunResult( ... run_id="abc", ... status=RunStatus.PASSED, ... snapshot=None, ... threshold_results=[], ... ) >>> r.status <RunStatus.PASSED: 'passed'>
-
rampa.events.serialize_event(event)¶rampa.events.serialize_event(event)¶
Serialize an engine event to a JSON-compatible dict.
Adds a
typekey with the event class name. All dataclass fields are included viadataclasses.asdict().- Parameters:
event (
EngineEvent) – The event to serialize.- Returns:
dict[str, Any] – JSON-serializable dictionary.
>>> e = PhaseEvent(run_id=”abc”, timestamp_ns=0, phase=”setup”)
>>> d = serialize_event(e)
>>> d[“type”]
’PhaseEvent’
>>> d[“phase”]
’setup’
- Return type:
Worker¶
Worker object passed to user scenario functions.
The Worker provides a scoped API surface for one iteration. It holds references to the sample queue for metric emission and provides check and custom metric helpers.
>>> import rampa.worker
-
class rampa.worker.ExecutionInfo¶class rampa.worker.ExecutionInfo¶
Bases:
objectExecution context for the current worker iteration.
>>> info = ExecutionInfo(worker_id=1, scenario="load", iteration=0) >>> info.worker_id 1
-
class rampa.worker.Worker¶class rampa.worker.Worker¶
Bases:
objectScoped API surface for user code within a single iteration.
Workers are created per-iteration by the executor. They provide check(), counter(), gauge(), and trend() methods that emit metric samples to the engine’s sample queue.
- Parameters:
sample_queue (
queue.SimpleQueue[Sample|None]) – Queue for emitting metric samples.execution (
ExecutionInfo) – Current execution context.setup_data (
Any) – Data returned from the setup() function.
Examples
>>> import queue as q >>> sq: q.SimpleQueue[Sample | None] = q.SimpleQueue() >>> w = Worker( ... sample_queue=sq, ... execution=ExecutionInfo(worker_id=1, scenario="s", iteration=0), ... ) >>> w.execution.worker_id 1
HTTP Client¶
HTTP client with automatic metric emission for rampa.
Wraps aiohttp.ClientSession and emits timing metrics for every request:
http_reqs, http_req_duration, http_req_failed, data transfer
counters, and per-phase timing (blocked, connecting, sending, waiting,
receiving).
>>> import rampa.http
-
class rampa.http.Response¶class rampa.http.Response¶
Bases:
objectWrapper around an HTTP response with metric-relevant fields.
>>> r = Response(status=200, headers={}, body=b"ok", url="http://x") >>> r.status 200
-
rampa.http._estimate_request_size(kwargs)¶rampa.http._estimate_request_size(kwargs)¶
Estimate the size of an outgoing request body in bytes.
-
class rampa.http._TraceTimings¶class rampa.http._TraceTimings¶
Bases:
objectPer-request timing accumulator for aiohttp trace signals.
Accepts and ignores
trace_request_ctxso it can be used as an aiohttptrace_config_ctx_factory.>>> t = _TraceTimings() >>> t.request_start 0
-
rampa.http._build_trace_config(client)¶rampa.http._build_trace_config(client)¶
Build an aiohttp TraceConfig that records per-phase timings.
The timings are stored on the trace request context object (a
_TraceTimingsinstance) and read back by the caller after the request completes.>>> import rampa.http
- Parameters:
client (
Any)- Return type:
TraceConfig
-
class rampa.http.HttpClient¶class rampa.http.HttpClient¶
Bases:
objectHTTP client that auto-emits metrics for every request.
- Parameters:
sample_queue (
queue.SimpleQueue[Sample|None]) – Queue for emitting metric samples.tags (
dict[str,str]) – Base tags added to every sample (e.g. scenario name).q (
>>> import queue as)sq (
>>>)HttpClient(sq (
>>> client =){"scenario" (
"test"}))client._tags["scenario"] (
>>>)'test'
Metrics¶
Metric registry, sinks, and aggregation engine for rampa.
The metric engine runs in a dedicated thread, draining samples from a
queue.SimpleQueue and updating per-metric sinks. It periodically emits
MetricSnapshot objects for outputs and threshold evaluation.
>>> import rampa.metrics
-
class rampa.metrics.Metric¶class rampa.metrics.Metric¶
Bases:
objectA registered metric with name, type, and value type.
>>> m = Metric(name="http_reqs", metric_type=MetricType.COUNTER) >>> m.metric_type <MetricType.COUNTER: 'counter'>
-
class rampa.metrics.CounterSink¶class rampa.metrics.CounterSink¶
Bases:
objectAccumulates a running total and computes rate.
>>> s = CounterSink() >>> s.add(1.0) >>> s.add(2.0) >>> s.format(10.0) {'count': 3.0, 'rate': 0.3}
-
class rampa.metrics.GaugeSink¶class rampa.metrics.GaugeSink¶
Bases:
objectTracks the latest value plus min and max.
>>> s = GaugeSink() >>> s.add(10.0) >>> s.add(5.0) >>> s.add(20.0) >>> s.format(1.0) {'value': 20.0, 'min': 5.0, 'max': 20.0}
-
class rampa.metrics.RateSink¶class rampa.metrics.RateSink¶
Bases:
objectTracks the fraction of non-zero (truthy) values.
>>> s = RateSink() >>> s.add(1.0) >>> s.add(0.0) >>> s.add(1.0) >>> fmt = s.format(1.0) >>> fmt["passes"] 2.0 >>> fmt["fails"] 1.0
-
class rampa.metrics.TrendSink¶class rampa.metrics.TrendSink¶
Bases:
objectStores all values for percentile computation.
Uses linear interpolation matching NumPy’s default algorithm.
>>> s = TrendSink() >>> for v in [10.0, 20.0, 30.0, 40.0, 50.0]: ... s.add(v) >>> fmt = s.format(1.0) >>> fmt["min"] 10.0 >>> fmt["max"] 50.0 >>> fmt["avg"] 30.0 >>> fmt["med"] 30.0 >>> fmt["count"] 5.0
-
class rampa.metrics.SinkProtocol¶class rampa.metrics.SinkProtocol¶
Bases:
ProtocolStructural protocol for metric sink implementations.
Enables future Rust PyO3 sinks to satisfy the interface without appearing in a union type.
>>> class DummySink: ... def add(self, value: float) -> None: ... ... def format(self, duration: float) -> dict[str, float]: ... return {} >>> hasattr(DummySink, "add") and hasattr(DummySink, "format") True
-
class rampa.metrics.HdrTrendSink¶class rampa.metrics.HdrTrendSink¶
Bases:
objectTrend sink backed by a Rust HDR histogram.
Fixed ~20KB memory regardless of sample count. O(1) insert, O(1) percentile queries. Requires the
rampa._coreRust extension.Values are stored as integer microseconds internally. The
add()method accepts float milliseconds (matching the Python TrendSink interface) and converts automatically.>>> try: ... from rampa._core import HdrHistogram ... s = HdrTrendSink() ... for v in [10.0, 20.0, 30.0, 40.0, 50.0]: ... s.add(v) ... fmt = s.format(1.0) ... fmt["count"] ... except ImportError: ... 5.0 5.0
-
rampa.metrics.create_sink(metric_type)¶rampa.metrics.create_sink(metric_type)¶
Create a sink matching the metric type.
Uses Rust HDR histogram for trend sinks when available, falling back to the Python TrendSink.
- Parameters:
metric_type (
MetricType) – The type of metric.- Returns:
Sink – A new sink instance.
>>> type(create_sink(MetricType.COUNTER)).__name__
’CounterSink’
>>> type(create_sink(MetricType.TREND)).__name__ in (‘TrendSink’, ‘HdrTrendSink’)
True
- Return type:
Sink
-
class rampa.metrics.MetricRegistry¶class rampa.metrics.MetricRegistry¶
Bases:
objectThread-safe registry that enforces name-type consistency.
>>> reg = MetricRegistry() >>> reg.get_or_create("reqs", MetricType.COUNTER).metric_type <MetricType.COUNTER: 'counter'> >>> reg.get_or_create("reqs", MetricType.COUNTER).name 'reqs'
-
class rampa.metrics.MetricSnapshot¶class rampa.metrics.MetricSnapshot¶
Bases:
objectFrozen snapshot of aggregated metric values at a point in time.
>>> snap = MetricSnapshot( ... timestamp=0, ... duration=10.0, ... values={"reqs": {"count": 100.0, "rate": 10.0}}, ... ) >>> snap.values["reqs"]["rate"] 10.0
-
rampa.metrics.register_builtins(registry)¶rampa.metrics.register_builtins(registry)¶
Register all built-in metrics.
- Parameters:
registry (
MetricRegistry) – The registry to populate.MetricRegistry() (
>>> reg =)register_builtins(reg) (
>>>)None (
>>> reg.get_sink("checks")is not)True
None
True
- Return type:
-
class rampa.metrics.MetricEngine¶class rampa.metrics.MetricEngine¶
Bases:
objectBackground thread that drains samples and updates sinks.
- Parameters:
registry (
MetricRegistry) – Metric registry with sinks.sample_queue (
queue.SimpleQueue[Sample|None]) – Queue of samples. SendNoneto signal shutdown.flush_interval (
float) – Seconds between snapshot emissions.
Examples
>>> eng = MetricEngine( ... registry=MetricRegistry(), ... sample_queue=queue.SimpleQueue(), ... ) >>> eng.flush_interval 0.05
Thresholds¶
Threshold expression parser and evaluator for rampa.
Thresholds define pass/fail criteria over metric aggregates. They are the policy layer — checks measure facts, thresholds decide final status.
>>> import rampa.thresholds
-
class rampa.thresholds.ThresholdExpression¶class rampa.thresholds.ThresholdExpression¶
Bases:
objectA parsed threshold expression.
>>> expr = parse_threshold("p(95)<500") >>> expr.aggregation 'p(95)' >>> expr.operator '<' >>> expr.value 500.0
-
rampa.thresholds.parse_threshold(expr)¶rampa.thresholds.parse_threshold(expr)¶
Parse a threshold expression string.
- Parameters:
expr (
str) – Expression like"p(95)<500","rate<0.01","avg>200".- Returns:
Parsed expression.
- Return type:
- Raises:
ValueError– If the expression is malformed.>>> parse_threshold("count>=100").aggregation–'count'–>>> parse_threshold("med<400").value–400.0–>>> parse_threshold("p(99.9)<=1000").aggregation–'p(99.9)'–>>> parse_threshold("p(99.9)<=1000").aggregation_value–99.9–
-
rampa.thresholds.parse_submetric(name)¶rampa.thresholds.parse_submetric(name)¶
Parse a metric name that may include tag filters.
- Parameters:
name (
str) – Metric name, optionally with tag filters like"http_req_duration{status:200}".- Returns:
tuple[str, dict[str, str]] – The base metric name and a dict of tag filters.
>>> parse_submetric(“http_req_duration{status (200}”))
(‘http_req_duration’, {‘status’ (‘200’}))
>>> parse_submetric(“http_reqs”)
(‘http_reqs’, {})
>>> parse_submetric(“dur{method (GET,name:login}”))
(‘dur’, {‘method’ (‘GET’, ‘name’: ‘login’}))
- Return type:
-
class rampa.thresholds.Threshold¶class rampa.thresholds.Threshold¶
Bases:
objectA configured threshold with expression and abort behavior.
>>> t = Threshold( ... source="p(95)<500", ... expression=parse_threshold("p(95)<500"), ... ) >>> t.abort_on_fail False
-
class rampa.thresholds.ThresholdResult¶class rampa.thresholds.ThresholdResult¶
Bases:
objectResult of evaluating one threshold.
>>> r = ThresholdResult(source="p(95)<500", passed=True, lhs=450.0, rhs=500.0) >>> r.passed True
-
rampa.thresholds._sink_key(aggregation)¶rampa.thresholds._sink_key(aggregation)¶
Map threshold aggregation to sink format key.
-
rampa.thresholds.evaluate_threshold(threshold, sink, duration)¶rampa.thresholds.evaluate_threshold(threshold, sink, duration)¶
Evaluate a single threshold against a sink.
- Parameters:
- Returns:
ThresholdResult – Whether the threshold passed.
>>> from rampa.metrics import TrendSink
>>> s = TrendSink()
>>> for v in range(100)
… s.add(float(v))
>>> t = Threshold(
… source=”p(95)<200”,
… expression=parse_threshold(“p(95)<200”),
… )
>>> evaluate_threshold(t, s, 1.0).passed
True
- Return type:
-
rampa.thresholds.evaluate_thresholds(metric_thresholds, sinks, duration, sub_sinks=None)¶rampa.thresholds.evaluate_thresholds(metric_thresholds, sinks, duration, sub_sinks=None)¶
Evaluate all thresholds against their metric sinks.
- Parameters:
metric_thresholds (
dict[str,list[Threshold]]) – Metric name → list of thresholds for that metric. Names may include a tag filter suffix like"dur{status:200}".duration (
float) – Elapsed test duration in seconds.sub_sinks (
dict[...,Sink] |None) – Optional submetric (base_name, frozenset tags) → sink mapping for tag-filtered threshold evaluation.
- Returns:
list[ThresholdResult] – Results for each threshold.
>>> from rampa.metrics import CounterSink
>>> sink = CounterSink()
>>> sink.add(50.0)
>>> mt = {“reqs” ([Threshold()
… source=”count>=100”,
… expression=parse_threshold(“count>=100”),
… )]}
>>> results = evaluate_thresholds(mt, {“reqs” (sink}, 1.0))
>>> results[0].passed
False
- Return type:
Loader¶
Test script loader and @scenario decorator for rampa.
Discovers scenario functions, setup, and teardown from user Python modules.
>>> import rampa.loader
-
rampa.loader.scenario(name=None, **kwargs)¶rampa.loader.scenario(name=None, **kwargs)¶
Mark an async function as a rampa scenario.
- Parameters:
- Returns:
Callable – The decorated function with scenario metadata attached.
>>> @scenario(executor=”constant-vus”, vus=5, duration=”10s”)
… async def load_test(worker (object) -> None:)
… pass
>>> hasattr(load_test, “_rampa_scenario_config”)
True
- Return type:
-
class rampa.loader.TestPlan¶class rampa.loader.TestPlan¶
Bases:
objectResolved test plan ready for execution.
>>> plan = TestPlan(scenarios={}, config=Config()) >>> plan.setup_fn is None True
-
rampa.loader.load_test(path)¶rampa.loader.load_test(path)¶
Load a test module and extract the test plan.
- Parameters:
path (
str) – Path to the Python test script.- Returns:
Resolved test plan with scenarios, setup, teardown, and config.
- Return type:
- Raises:
FileNotFoundError– If the script file does not exist.ValueError– If no scenarios are found.