Executors¶
Executors control how iterations are scheduled — how many virtual users run, for how long, and at what rate. rampa provides six executor types matching k6’s scheduling vocabulary.
Which executor should I use?¶
I want to… |
Executor |
Model |
|---|---|---|
Run N users for a fixed duration |
|
Closed |
Ramp users up and down |
|
Closed |
Run exactly N total iterations |
|
Closed |
Run N iterations per user |
|
Closed |
Maintain a fixed request rate |
|
Open |
Ramp request rate up and down |
|
Open |
Closed model — each VU waits for the previous iteration to finish before starting the next. The system under test controls the effective rate. Use this when you want to simulate a fixed number of concurrent users.
Open model — iterations start at a fixed rate regardless of
response time. If the system slows down, VUs pile up. Use this when
you want to measure behavior under a specific request rate. If VU
capacity is exhausted, the iteration is counted as dropped_iterations.
constant-vus¶
Run a fixed number of VUs for a duration.
@rampa.scenario(executor="constant-vus", vus=10, duration="30s")
async def default(worker: rampa.Worker) -> None:
await worker.http.get("https://api.example.com/data")
ramping-vus¶
Linearly interpolate VU count between stages.
@rampa.scenario(
executor="ramping-vus",
stages=[
rampa.Stage(duration="30s", target=50),
rampa.Stage(duration="1m", target=100),
rampa.Stage(duration="30s", target=0),
],
)
async def default(worker: rampa.Worker) -> None:
await worker.http.get("https://api.example.com/data")
per-vu-iterations¶
Each VU runs exactly N iterations independently.
@rampa.scenario(
executor="per-vu-iterations",
vus=10,
iterations=100,
)
async def default(worker: rampa.Worker) -> None:
await worker.http.get("https://api.example.com/data")
constant-arrival-rate¶
Maintain a fixed request rate. Iterations that cannot start (all VUs
busy) are counted as dropped_iterations.
@rampa.scenario(
executor="constant-arrival-rate",
rate=100,
duration="1m",
pre_allocated_vus=10,
max_vus=50,
)
async def default(worker: rampa.Worker) -> None:
await worker.http.get("https://api.example.com/data")
ramping-arrival-rate¶
Ramp the request rate linearly through stages.
@rampa.scenario(
executor="ramping-arrival-rate",
stages=[
rampa.Stage(duration="30s", target=50),
rampa.Stage(duration="1m", target=200),
rampa.Stage(duration="30s", target=0),
],
pre_allocated_vus=10,
max_vus=100,
)
async def default(worker: rampa.Worker) -> None:
await worker.http.get("https://api.example.com/data")