Open · m1ome opened this issue 3 years ago
Distribution storage is unbounded, so yes, you could saturate all memory. I believe there is a warning about this in the docs. If this is of particular concern to you, you can set up a recurring job with a poller or something else to execute the scrape at some safety interval, or monitor the ETS table and initiate a scrape if it exceeds whatever threshold you want.
There is a small risk of inaccurate reporting for 2 intervals going that route. The risk can be lowered by running it at a faster interval, e.g. 1s or 500ms. Under the hood, a scrape just performs an aggregation and updates the values in ETS before exporting, so a fast interval would only shift whatever was in progress during an overlap by your interval length. For most Prometheus use cases, that should still be close enough.
If you're using the batteries-included library, there is a metric you can add to the scrape that reports how long your scrapes take to complete. I'd recommend monitoring that and lowering the interval until the overlaps are minimized.
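For illustration, a minimal recurring-scrape job along those lines might look roughly like the sketch below. The module name, interval, and the :my_reporter name are placeholders, and it leans on the TelemetryMetricsPrometheus.Core.scrape/1 call that also appears in the code further down this thread; treat it as a sketch under those assumptions, not a drop-in implementation.

defmodule MyApp.PeriodicFlusher do
  use GenServer

  # Placeholder values: tune the interval and use your actual reporter name.
  @interval :timer.seconds(15)
  @reporter_name :my_reporter

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  @impl GenServer
  def init(nil) do
    schedule()
    {:ok, nil}
  end

  @impl GenServer
  def handle_info(:flush, state) do
    # Per the discussion above, a scrape aggregates the buffered distribution
    # samples and updates the values in ETS, which keeps the table bounded
    # even if Prometheus stops scraping this node.
    _ = TelemetryMetricsPrometheus.Core.scrape(@reporter_name)
    schedule()
    {:noreply, state}
  end

  defp schedule, do: Process.send_after(self(), :flush, @interval)
end

Tuning @interval down toward 1s or 500ms, as suggested above, shrinks the window in which in-progress values can be shifted.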
I've been wary of adding default aggregation intervals for distributions due to some sharp edges for anyone concerned about high accuracy. Capping the ETS table size could also lead to dropped data points, which is also undesirable.
If an optional scheduled aggregation solution were submitted as a PR, with docs warning about those tradeoffs when enabling it, I think that would be acceptable.
I did something like this in PromEx to solve this exact problem: https://github.com/akoutmos/prom_ex/blob/master/lib/prom_ex/ets_cron_flusher.ex
I can pull some of that work directly into this library if you think it would add value?
Here is my solution: basically, this GenServer keeps an eye on the memory size of the Prometheus dist ETS table and scrapes it if it grows too big.
This came straight from my project code, so it is split up the way I prefer to split GenServers, but you can easily change it if you want.
defmodule Common.Timer do
  @moduledoc false

  # Thin wrapper around the :erlang timer functions so the rest of the code reads nicely.
  @spec start(pid, term, non_neg_integer) :: reference
  def start(pid, msg, timeout), do: :erlang.start_timer(timeout, pid, msg)

  @spec cancel(reference) :: non_neg_integer | false
  def cancel(timer), do: :erlang.cancel_timer(timer)
end

defmodule Common.Monitor.PrometheusFlusher.State do
  @moduledoc false

  alias Common.Timer

  use TypedStruct

  # How often the flusher wakes up to check the table size.
  @check_timeout :timer.seconds(30)

  typedstruct enforce: true do
    field :max_memory, pos_integer()
    field :timer, reference()
  end

  @spec new!(pos_integer, pid) :: t()
  def new!(max_memory, from) do
    timer = Timer.start(from, :timeout, @check_timeout)
    struct!(__MODULE__, max_memory: max_memory, timer: timer)
  end

  @spec restart_timer(t(), pid) :: t()
  def restart_timer(state, from),
    do: %{state | timer: Timer.start(from, :timeout, @check_timeout)}
end

defmodule Common.Monitor.PrometheusFlusher.Impl do
  @moduledoc false

  alias Common.Monitor.PrometheusFlusher.State
  alias TelemetryMetricsPrometheus, as: Prometheus

  require Logger

  @reporter_name :prometheus_metrics
  @default_max_memory_in_mb 512

  @spec init(pos_integer | nil, pid) :: State.t()
  def init(nil, from), do: State.new!(from_megabytes_to_bytes(@default_max_memory_in_mb), from)
  def init(max_memory, from), do: State.new!(from_megabytes_to_bytes(max_memory), from)

  # Checks the dist table size, forces a scrape if it is over the limit,
  # and re-arms the timer for the next check.
  @spec flush(pid, State.t()) :: State.t()
  def flush(from, %{max_memory: max_memory} = state) do
    with memory when is_number(memory) <- get_table_memory(), do: maybe_flush(memory, max_memory)
    State.restart_timer(state, from)
  end

  defp maybe_flush(table_memory, max_memory) when table_memory > max_memory do
    Logger.warn("Prometheus table too big, forcing scrape!")
    Prometheus.Core.scrape(@reporter_name)
  end

  defp maybe_flush(_, _) do
    Logger.warn("Table in memory range, doing nothing!")
    :noop
  end

  # :ets.info/2 reports memory in words, so convert it to bytes.
  defp get_table_memory do
    %{dist_table_id: table} = Prometheus.Core.Registry.config(@reporter_name)

    with memory when is_number(memory) <- :ets.info(table, :memory),
         do: memory * :erlang.system_info(:wordsize)
  end

  defp from_megabytes_to_bytes(megabytes), do: megabytes * 1024 * 1024
end

defmodule Common.Monitor.PrometheusFlusher.Server do
  @moduledoc false

  alias Common.Monitor.PrometheusFlusher.Impl

  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl GenServer
  def init(opts) do
    max_memory = Keyword.get(opts, :max_memory_in_mb)
    {:ok, Impl.init(max_memory, self())}
  end

  # Only the currently armed timer triggers a flush; stale timers and any
  # other stray messages are ignored.
  @impl GenServer
  def handle_info({:timeout, timer, _}, %{timer: timer} = state),
    do: {:noreply, Impl.flush(self(), state)}

  def handle_info({:timeout, _, _}, state), do: {:noreply, state}
  def handle_info(_, state), do: {:noreply, state}
end

defmodule Common.Monitor.PrometheusFlusher do
  @moduledoc false

  alias Common.Monitor.PrometheusFlusher.Server

  @spec child_spec(keyword) :: Supervisor.child_spec()
  defdelegate child_spec(opts), to: Server
end
Start it manually with Common.Monitor.PrometheusFlusher.Server.start_link().
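If you want it supervised instead, something like the sketch below would work in an application supervisor. The MyApp module names and the 512 MB limit are just examples; the child tuple goes through the child_spec/1 delegated above.

defmodule MyApp.Application do
  use Application

  @impl Application
  def start(_type, _args) do
    children = [
      # ... the TelemetryMetricsPrometheus reporter and the rest of the tree ...
      {Common.Monitor.PrometheusFlusher, max_memory_in_mb: 512}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end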
Just to make sure: is this a possible DoS if your Prometheus is not scraping a node and you have distribution metrics on board? It seems so, am I wrong?