NFIBrokerage / spear

A sharp EventStoreDB v20+ client backed by Mint :yum:
https://hex.pm/packages/spear
Apache License 2.0
85 stars 14 forks source link

Add (dis)connect hook options, document pooling #80

Closed kristofka closed 1 year ago

kristofka commented 1 year ago

This is a WIP, POC, based on routing pools. The main difference wrt the post is that we hold a reference to a counter in the ets table.

The changes in spear/connection.ex and spear/connection/configuration.ex would allow a user to implement their Pool.

For large events/batch of events, using the pool is much faster :

iex(1)> Spear.Pool.Supervisor.start_link(connection_args: [connection_string: "esdb://localhost:2113"], num_members: 100)
{:ok, #PID<0.339.0>}
iex(2)> {:ok, conn} = Spear.Connection.start_link(connection_string: "esdb://localhost:2113")
{:ok, #PID<0.448.0>}
iex(3)> streams = for n <- 1..100, do: "stream #{n}"
["stream 1", "stream 2", "stream 3", "stream 4", "stream 5", "stream 6",
 "stream 7", "stream 8", "stream 9", "stream 10", "stream 11", "stream 12",
 "stream 13", "stream 14", "stream 15", "stream 16", "stream 17", "stream 18",
 "stream 19", "stream 20", "stream 21", "stream 22", "stream 23", "stream 24",
 "stream 25", "stream 26", "stream 27", "stream 28", "stream 29", "stream 30",
 "stream 31", "stream 32", "stream 33", "stream 34", "stream 35", "stream 36",
 "stream 37", "stream 38", "stream 39", "stream 40", "stream 41", "stream 42",
 "stream 43", "stream 44", "stream 45", "stream 46", "stream 47", "stream 48",
 "stream 49", "stream 50", ...]
iex(4)> gen_events = fn -> Enum.to_list(1..10000) |> Enum.map(fn n -> Spear.Event.new("test_event", %{title: "event" <> (:uuid.get_v4() |> :uuid.uuid_to_string() |> to_string())}, id: (:uuid.get_v4() |> :uuid.uuid_to_string() |> to_string() )  ) end) end
warning: variable "n" is unused (if the variable is not meant to be used, prefix it with an underscore)
  iex:4

#Function<43.3316493/0 in :erl_eval.expr/6>
iex(5)> gen_events = fn -> Enum.to_list(1..10000) |> Enum.map(fn n -> Spear.Event.new("test_event", %{title: "event" <> (:uuid.get_v4() |> :uuid.uuid_to_string() |> to_string())}, id: (:uuid.get_v4() |> :uuid.uuid_to_string() |> to_string() )  ) end) end
warning: variable "n" is unused (if the variable is not meant to be used, prefix it with an underscore)
  iex:5

#Function<43.3316493/0 in :erl_eval.expr/6>
iex(6)>  single_conn = fn -> Enum.map(streams, fn s -> Task.async(fn -> gen_events.() |> Spear.append(conn, s, timeout: 50_000) end) end) |> Task.await_many(:infinity) end        #Function<43.3316493/0 in :erl_eval.expr/6>
iex(7)> multi_conn = fn -> Enum.map(streams, fn s -> Task.async(fn -> {:ok, next_conn } = Spear.Pool.Supervisor.get_conn(); gen_events.() |>  Spear.append( next_conn, s, timeout: 50_000) end) end) |> Task.await_many(:infinity) end
#Function<43.3316493/0 in :erl_eval.expr/6>
iex(8)> Benchee.run(
...(8)> %{
...(8)> "single_conn" => single_conn,
...(8)> "multi_conn" => multi_conn},
...(8)> time: 2)
Operating System: macOS
CPU Information: Intel(R) Core(TM) i7-6700K CPU @ 4.00GHz
Number of Available Cores: 8
Available memory: 16 GB
Elixir 1.14.2
Erlang 25.1.1

Benchmark suite executing with the following configuration:
warmup: 2 s
time: 2 s
memory time: 0 ns
reduction time: 0 ns
parallel: 1
inputs: none specified
Estimated total run time: 8 s

Benchmarking multi_conn ...
Benchmarking single_conn ...

Name                  ips        average  deviation         median         99th %
multi_conn         0.0407        24.59 s     ±0.00%        24.59 s        24.59 s
single_conn        0.0222        45.13 s     ±0.00%        45.13 s        45.13 s

Comparison:
multi_conn         0.0407
single_conn        0.0222 - 1.84x slower +20.54 s

For short writes (eg 5 to 10 small events) getting a conn from the pool adds 7-10 pc to the execution time.

Regarding reads, using the pool is generally faster, and it allows running much more reads in parallel without hitting the :too_many_concurrent_connections from Mint.HTTP2 (which are 100 reqs on a single conn, by default).

iex(11)> Benchee.run(
...(11)> %{   
...(11)> "read_single_conn" => fn -> Enum.map(streams, fn s -> Task.async(fn -> Spear.stream!(conn, s, chunk_size: 10) |> Enum.count() end) end) |> Task.await_many(:infinity) end, 
"read_multi_conn" => fn -> Enum.map(streams, fn s -> Task.async(fn -> {:ok, next_conn } = Spear.Pool.Supervisor.get_conn();Spear.stream!( next_conn, s, chunk_size: 10) |> Enum.count() end) end) |> Task.await_many(:infinity) end},
...(11)> time: 2)

Operating System: macOS                                                                                                                                                            CPU Information: Intel(R) Core(TM) i7-6700K CPU @ 4.00GHz
Number of Available Cores: 8
Available memory: 16 GB
Elixir 1.14.2
Erlang 25.1.1

Benchmark suite executing with the following configuration:
warmup: 2 s
time: 2 s
memory time: 0 ns
reduction time: 0 ns
parallel: 1
inputs: none specified
Estimated total run time: 8 s

Benchmarking read_multi_conn ...
Benchmarking read_single_conn ...

Name                       ips        average  deviation         median         99th %
read_multi_conn        0.00822       2.03 min     ±0.00%       2.03 min       2.03 min
read_single_conn       0.00546       3.05 min     ±0.00%       3.05 min       3.05 min

Comparison:
read_multi_conn        0.00822
read_single_conn       0.00546 - 1.50x slower +1.02 min

Given that there are so little changes to the core code, and that the provided pool could be seen as a basic example, I don't think there would be any downside to this feature. But I might have missed something.

Let me know what you think.

the-mikedavis commented 1 year ago

This looks cool! 😀

I'm away at the moment but I should be able to give this a proper look in the coming week

kristofka commented 1 year ago

If you're interested, I'll add some documentation and tests.