Hornet
Motivation
Stress testing is the process of deliberately putting a system under intense load to test its stability. It can validate that a system withstands the expected load, determine its limits, and check how well it handles errors.
Recently I wanted to stress test the project I’m working on, so I checked the existing load/stress testing libraries in the Elixir community. I found a couple of abandoned projects and one relatively well-maintained one: Chaperon.
My scenario was to stress test my system at a constant rate of operations per second (o/s) over many (12-24) hours. Chaperon provides this feature with the SpreadAsync module. According to its docs, it does exactly what I want: “Action that calls a function with a given rate over a given interval of time (ms)”. But after checking its source code, I found a flaw: it starts a new process for each execution. For example, if you want to run a function at 3000 o/s over 20 hours, Chaperon will start 3000 * 60 * 60 * 20 = 216_000_000 processes. If you run multiple stress tests, or a single test with a high o/s rate, the VM just crashes. Even if it does not crash, RAM usage grows over time and may eventually exhaust your machine's memory (I think this happens because Chaperon stores the result of each execution).
I decided to create a new library that handles long-running stress tests more gracefully by spawning an optimal number of processes. I called this library Hornet. In this post, I’ll describe its design.
High-level design
Let’s examine the basic entities of Hornet:
- Worker - periodically executes a given function. A single stress test usually runs several workers.
- RateCounter - periodically calculates the current rate in o/s.
- Scheduler - periodically checks whether the current rate matches the required rate and, if it does not, increases the number of workers.
Worker
Hornet accepts three required parameters:
- rate - the required rate (o/s)
- func - the anonymous function to be executed
- id - a unique id used for internal process names
Initially, the execution period of a single process is process_period (100 ms by default), i.e. each process executes the given function every process_period ms. Scheduler calculates the starting number of processes using the following logic:
process_rate = rate (o/s) of a single process = 1_000 / process_period
- if rate / process_rate <= 1, a single process can maintain the required rate, so Hornet starts a single process with a period of 1_000 / rate ms
- if rate / process_rate > 1, a single process cannot maintain the required rate, so Hornet starts rate / process_rate workers, each with a period of process_period ms
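As a worked example of the two cases, here is a minimal standalone sketch of the calculation. The module name `WorkerCountSketch` is hypothetical and not part of Hornet's API. With the default 100 ms period one process delivers 10 o/s, so 5 o/s needs a single worker running every 200 ms, while 3000 o/s needs 300 workers running every 100 ms.

```elixir
defmodule WorkerCountSketch do
  # Hypothetical standalone version of the calculation described above.
  # Returns {execution_period_ms, number_of_workers} for a target rate (o/s).
  def calculate(rate, process_period \\ 100) do
    # o/s delivered by a single worker at process_period
    process_rate = 1_000 / process_period

    if rate / process_rate <= 1 do
      # one worker is enough; stretch its period to hit the rate
      {round(1_000 / rate), 1}
    else
      # several workers, each running every process_period ms
      {process_period, round(rate / process_rate)}
    end
  end
end

IO.inspect(WorkerCountSketch.calculate(5))
# => {200, 1}
IO.inspect(WorkerCountSketch.calculate(3_000))
# => {100, 300}
```

Note that `round/1` means a rate that does not divide evenly is only approximated, which is one reason the Scheduler later allows an error margin.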
RateCounter
RateCounter keeps track of the current rate by storing a counter. Every worker increments this counter after executing func. Every rate_period ms, RateCounter calculates the current rate as counter * 1_000 / rate_period (o/s) and resets the counter.
Scheduler
The given function may take longer to execute than process_period, for example, when it performs heavy calculations. In that case the workers cannot reach the required rate, so Scheduler stops the running workers, increases process_period, and starts new workers. Since the number of workers is rate / process_rate, a longer period means more workers, each executing the function less often.
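To see why a longer period helps, here is a rough back-of-the-envelope sketch. It assumes, as the Worker code below does, that each worker schedules its next run only after func returns, so a worker's real cycle lasts exec_ms + process_period. The module and function names are hypothetical. With a function taking 120 ms and a target of 3000 o/s, widening the period from 100 to 150 to 200 ms raises the worker count from 300 to 450 to 600, and the estimated throughput from about 1364 to 1667 to 1875 o/s, approaching the target as the period keeps growing.

```elixir
defmodule PeriodStepSketch do
  # Hypothetical helper: workers needed for `rate` o/s when each worker is
  # meant to run once every `period` ms, i.e. rate / (1_000 / period).
  def workers_for(rate, period), do: round(rate * period / 1_000)

  # Rough throughput estimate, assuming func takes `exec_ms` and a worker
  # schedules its next run only after func returns, so one cycle lasts
  # exec_ms + period.
  def estimated_rate(rate, period, exec_ms) do
    workers_for(rate, period) * 1_000 / (exec_ms + period)
  end
end

for period <- [100, 150, 200] do
  workers = PeriodStepSketch.workers_for(3_000, period)
  estimate = Float.round(PeriodStepSketch.estimated_rate(3_000, period, 120), 1)
  IO.puts("period=#{period} workers=#{workers} estimated_rate=#{estimate}")
end
```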
Implementation details
To better describe the implementation details, let’s examine the code fragments I consider important for each entity mentioned above. I added comments to the lines that need explanation.
Worker
Worker’s job is to periodically execute the given function and to increment the counter in RateCounter after each execution.
defmodule Hornet.Worker do
  @moduledoc false

  use GenServer

  def init(params) do
    ...
    # during initialization, the first execution is scheduled
    Process.send_after(self(), :run_and_schedule, interval)
    ...
  end

  def handle_info(:run_and_schedule, state) do
    execute_and_schedule(state)

    {:noreply, state}
  end

  defp execute_and_schedule(state) do
    execute(state)

    # schedule the next execution
    Process.send_after(self(), :run_and_schedule, state.interval)
  end

  defp execute(state) do
    # execute the anonymous function
    state.func.()
    # and increment `counter` in `RateCounter` to calculate the current rate
    :ok = RateCounter.inc(state.rate_counter)
  end
end
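The fragment above omits the details of `init/1` and depends on RateCounter. Below is a minimal self-contained sketch of the same `Process.send_after/3` loop that runs as a script. The module name `WorkerSketch`, its options, and the use of an Erlang `:counters` reference in place of `RateCounter.inc/1` are assumptions made so it runs on its own.

```elixir
defmodule WorkerSketch do
  # Hypothetical stand-in for Hornet.Worker: runs `func` every `interval` ms
  # and increments a shared :counters reference (instead of RateCounter).
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = %{
      func: Keyword.fetch!(opts, :func),
      interval: Keyword.fetch!(opts, :interval),
      counter: Keyword.fetch!(opts, :counter)
    }

    # only the first run is scheduled here; each run schedules the next one
    Process.send_after(self(), :run_and_schedule, state.interval)
    {:ok, state}
  end

  @impl true
  def handle_info(:run_and_schedule, state) do
    state.func.()
    :ok = :counters.add(state.counter, 1, 1)
    # the next run is scheduled after func returns, so a slow func
    # stretches the effective period
    Process.send_after(self(), :run_and_schedule, state.interval)
    {:noreply, state}
  end
end

counter = :counters.new(1, [])
parent = self()

{:ok, _pid} =
  WorkerSketch.start_link(
    func: fn -> send(parent, :tick) end,
    interval: 10,
    counter: counter
  )

# each :tick is sent by func, so receiving four of them guarantees that the
# first three executions have already been counted
for _ <- 1..4 do
  receive do
    :tick -> :ok
  after
    1_000 -> raise "worker did not run"
  end
end

IO.puts("executions counted: #{:counters.get(counter, 1)}")
```

Because each worker keeps a single pending message rather than a registered interval timer, the number of timers grows with the number of workers but never accumulates across executions.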
Sidenote: I’m using Process.send_after/3 to schedule executions instead of :timer.send_interval/2 because, when I experimented with a large number of workers (millions), starting that many associated timers simply froze the VM.
RateCounter
RateCounter is pretty simple. It stores a counter (count), which workers increment (:inc), and periodically recalculates the current rate (:calculate_rate).
defmodule Hornet.RateCounter do
  use GenServer

  def init(params) do
    ...
    # schedule periodic rate calculation
    {:ok, timer} = :timer.send_interval(interval, :calculate_rate)
    ...
  end

  def handle_info(:calculate_rate, state) do
    # rate (o/s) = count * 1_000 / calculation interval (ms)
    rate = state.count * 1000 / state.interval
    # and `count` is reset
    new_state = %{rate: rate, count: 0, timer: state.timer, interval: state.interval}

    {:noreply, new_state}
  end

  def handle_cast(:inc, state) do
    # the counter is incremented by workers
    new_state = %{state | count: state.count + 1}

    {:noreply, new_state}
  end
end
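The public functions `inc/1` and `rate/1`, and the handler behind `rate/1`, are not shown above. The sketch below fills them in as assumptions: the names mirror the calls made from Worker and Scheduler, but the module `RateCounterSketch` and its bodies are illustrative, so the rate formula can be tried in isolation.

```elixir
defmodule RateCounterSketch do
  # Hypothetical, self-contained version of the RateCounter above.
  use GenServer

  def start_link(interval), do: GenServer.start_link(__MODULE__, interval)
  def inc(pid), do: GenServer.cast(pid, :inc)
  def rate(pid), do: GenServer.call(pid, :rate)

  @impl true
  def init(interval) do
    {:ok, _timer} = :timer.send_interval(interval, :calculate_rate)
    {:ok, %{rate: 0.0, count: 0, interval: interval}}
  end

  @impl true
  def handle_info(:calculate_rate, state) do
    # o/s = increments in this window * 1_000 / window length in ms
    rate = state.count * 1_000 / state.interval
    {:noreply, %{state | rate: rate, count: 0}}
  end

  @impl true
  def handle_cast(:inc, state), do: {:noreply, %{state | count: state.count + 1}}

  @impl true
  def handle_call(:rate, _from, state), do: {:reply, state.rate, state}
end

# a long window so the timer does not fire during the demo
{:ok, pid} = RateCounterSketch.start_link(10_000)
Enum.each(1..20, fn _ -> RateCounterSketch.inc(pid) end)
# trigger a calculation by hand instead of waiting 10 s
send(pid, :calculate_rate)
IO.inspect(RateCounterSketch.rate(pid))
# => 2.0
```

Sending `:calculate_rate` by hand works deterministically here because messages from one process to another arrive in order, so all 20 casts are handled before the calculation and the final call.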
Scheduler
The most complex logic lives in Scheduler. Its responsibilities include:
- Starting all processes (workers, counter, supervisors)
- Periodically checking the current rate and restarting the workers if it does not match the expected rate
Workers are started under a separate supervisor called Hornet.Worker.WorkerSupervisor. RateCounter and Hornet.Worker.WorkerSupervisor are supervised by the dynamic supervisor Hornet.DynamicSupervisor, so we can easily stop the current workers and start new ones.
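The replaceable-pool layout can be tried in isolation. This hypothetical sketch uses plain `Agent` processes as stand-ins for workers and a generic `Supervisor` in place of `Hornet.Worker.WorkerSupervisor`; the module name `SupervisionSketch` is also illustrative. Swapping the whole pool takes one `terminate_child` and one `start_child` call on the DynamicSupervisor.

```elixir
defmodule SupervisionSketch do
  # Hypothetical illustration: a top-level DynamicSupervisor owns a
  # replaceable child supervisor that holds the workers.

  # Starts a Supervisor with `n` placeholder workers (Agents stand in for
  # Hornet.Worker processes) as a child of `dyn_sup`.
  def start_workers(dyn_sup, n) do
    children =
      for i <- 1..n do
        Supervisor.child_spec({Agent, fn -> i end}, id: {:worker, i})
      end

    spec = %{
      id: :worker_supervisor,
      start: {Supervisor, :start_link, [children, [strategy: :one_for_one]]},
      type: :supervisor
    }

    DynamicSupervisor.start_child(dyn_sup, spec)
  end
end

{:ok, dyn_sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
{:ok, old_sup} = SupervisionSketch.start_workers(dyn_sup, 3)

# replace the whole worker pool: stop the old supervisor (and its workers),
# then start a bigger one
:ok = DynamicSupervisor.terminate_child(dyn_sup, old_sup)
{:ok, new_sup} = SupervisionSketch.start_workers(dyn_sup, 5)

IO.inspect(Supervisor.count_children(new_sup).workers)
# => 5
```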
defmodule Hornet.Scheduler do
  use GenServer

  def init(params) do
    ...
    # Hornet starts workers under a DynamicSupervisor, so they can be easily replaced
    {:ok, supervisor} = Hornet.DynamicSupervisor.start_link()

    # here we start RateCounter
    {:ok, rate_counter} =
      DynamicSupervisor.start_child(supervisor, %{
        id: RateCounter,
        start: {RateCounter, :start_link, [[interval: rate_period]]}
      })

    # the implementation of `start_workers` is listed below
    {pid, workers_count} = start_workers(supervisor, worker_params, rate_counter, period)

    # we periodically adjust the number of workers
    {:ok, timer} = :timer.send_interval(adjust_period, :adjust_workers)
    ...
  end

  ...

  def handle_info(:adjust_workers, state) do
    cond do
      # if the current rate is correct, we do nothing
      correct_rate?(state) ->
        {:noreply, state}

      ...

      # if the expected rate cannot be reached, we adjust workers
      true ->
        adjust_workers(state)
    end
  end

  defp adjust_workers(state) do
    # stop the current workers
    :ok = DynamicSupervisor.terminate_child(state.supervisor, state.worker_supervisor)

    # increase the execution period of a single process
    new_period = state.period + state.period_step

    # and start new workers
    {pid, workers_count} =
      start_workers(state.supervisor, state.params, state.rate_counter, new_period)

    new_state = %{
      state
      | worker_supervisor: pid,
        current_workers_count: workers_count,
        period: new_period
    }

    {:noreply, new_state}
  end

  defp correct_rate?(state) do
    # fetch the current rate from RateCounter
    current_rate = RateCounter.rate(state.rate_counter)
    expected_rate = state.params[:rate]
    # allowed deviation; the error rate is 10% (0.1) by default
    error_rate = expected_rate * state.error_rate

    abs(current_rate - expected_rate) < error_rate
  end

  defp start_workers(supervisor, params, rate_counter, period) do
    ...
    {interval, initial_workers_number} = calculate_workers_number(rate, period)
    ...

    {:ok, pid} =
      DynamicSupervisor.start_child(supervisor, %{
        id: :worker_supervisor,
        start: {WorkerSupervisor, :start_link, [params]},
        type: :supervisor
      })

    {pid, initial_workers_number}
  end

  defp calculate_workers_number(rate, period) do
    # tps is process_rate: o/s of a single worker
    tps = 1_000 / period

    if rate / tps <= 1 do
      # a single worker with a stretched period is enough
      period = round(1000 / rate)
      {period, 1}
    else
      workers = round(rate / tps)
      {period, workers}
    end
  end
end
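The decision made in `handle_info(:adjust_workers, state)` can be condensed into a pure function for experimentation. This is a simplified, hypothetical sketch (`AdjustSketch.next/4` is not part of Hornet): it ignores the `cond` branches elided above, and the `period_step` default of 50 ms is an assumption, not Hornet's actual default. The 10% error rate matches the default mentioned in `correct_rate?/1`.

```elixir
defmodule AdjustSketch do
  # Hypothetical condensed version of the Scheduler check: keep the workers
  # if the rate is within tolerance, otherwise widen the period by
  # `period_step` and recompute the worker count.
  def next(current_rate, expected_rate, period, opts \\ []) do
    error_rate = Keyword.get(opts, :error_rate, 0.1)
    # assumed default, for illustration only
    period_step = Keyword.get(opts, :period_step, 50)

    if abs(current_rate - expected_rate) < expected_rate * error_rate do
      {:keep, period}
    else
      new_period = period + period_step
      # multi-worker case only: rate / (1_000 / new_period)
      {:restart, new_period, round(expected_rate * new_period / 1_000)}
    end
  end
end

IO.inspect(AdjustSketch.next(2_950, 3_000, 100))
# => {:keep, 100}
IO.inspect(AdjustSketch.next(1_400, 3_000, 100))
# => {:restart, 150, 450}
```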
Conclusion
I hope the library will be useful for stress testing. The library is available on GitHub: https://github.com/ayrat555/hornet.