mmmries / jetstream

an Elixir client for NATS Jetstream
Apache License 2.0
34 stars 8 forks source link

Allow For Multiple Parallel Messages #41

Open mmmries opened 2 years ago

mmmries commented 2 years ago

The current implementation is essentially "single-threaded" since it requests a single message and then wait for it to arrive, then we handle that message before sending back the ACK or next_message.

My use-case at work would certainly benefit from the ability to specify a limit of how many messages to handle in parallel. Something like:

@impl Jetstream.PullConsumer
  def init(nil) do
    consumer = [
      connection_name: :gnat,
      stream_name: "my_stream",
      consumer_name: "my_stream",
      max_concurrency: 10
    ]
    {:ok, nil, consumer}
  end

This would match the same option name from Task.async_stream in the standard library.

mkaput commented 2 years ago

This is an use-case that I do not want to support in PullConsumer as it is meant to be a super simple API serving say 60-70% use-cases. See #8. Your use case basically would much more benefit from batching messages, so you could then pass these to async_stream or whatever else suits you. For this, I planned to recommend Broadway and to provide Broadway Producer and Consumer modules in this library. What do you think?

mmmries commented 2 years ago

Picking up the thread of conversation from https://github.com/mmmries/jetstream/pull/43#issuecomment-1085724070

I did a handful of benchmarks to check on the performance and overhead of starting multiple PullConsumer processes vs starting a single PullConsumer that delegates the work to separate processes.

Benchmark Process and Results # Basic Assumptions The purpose of these tests is to check for library overhead or bottlenecks. We aren't trying to simulate a realistic workload, we are setting up a simplistic scenario where nats won't be under heavy load so that we can stress test our own library in specific ways. All of these benchmarks were run on my MacBook Pro (13-inch, M1, 2020) with `nats-server: v2.6.6` installed view homebrew. I shut down and started nats between each benchmark run and then created the stream + consumer with memory-only retention to get a comparable nats performance for each run. I also disabled all of the debug logging in our library by adding a `config/config.exs` file with the following content. ```elixir config/config.exs import Config config :logger, backends: [:console], compile_time_purge_matching: [ [level_lower_than: :error] ] ``` For each run, I would start nats server with `nats-server -js`, then start the jetstream project with `iex -S mix` and copy-paste in the test scripts below. ## Single Threaded Scenario (ie current master branch) ``` {:ok, _} = Gnat.ConnectionSupervisor.start_link(%{name: :gnat, connection_settings: [%{}]}) {:ok, stream} = Jetstream.API.Stream.create(:gnat, %Jetstream.API.Stream{name: "TEST", storage: :memory, subjects: ["test"]}) Enum.each(1..100_000, fn(i) -> Gnat.pub(:gnat, "test", Jason.encode!(%{num: i, body: "foobar"})) end) {:ok, consumer} = Jetstream.API.Consumer.create(:gnat, %Jetstream.API.Consumer{stream_name: "TEST", durable_name: "TEST"}) defmodule ConsumeTest do use Jetstream.PullConsumer def start_link(arg) do Jetstream.PullConsumer.start_link(__MODULE__, arg) end @impl true def init(_arg) do {:ok, {nil, nil}, connection_name: :gnat, stream_name: "TEST", consumer_name: "TEST"} end @impl true def handle_message(message, {nil, ni}) do {:ack, {:erlang.monotonic_time(:millisecond), nil}} end def handle_message(message, {first, _last}) do last = :erlang.monotonic_time(:millisecond) {:ack, {first, last}} end end {:ok, pid} = ConsumeTest.start_link(nil) # Wait for all 100k messages to be consumed %{mod_state: %{state: {from, to}}} = :sys.get_state(pid) 100_000.0 / ((to - from) / 1000.0) # this will output a number of messages per second ``` __Results__: 10:34:22.956 to 10:35:35.546 => 22.956 to 95.546 => 72.59sec 100k messages / 72.59sec => 1,377.6 msg/sec After changing log level and removing extra IO.puts messages -576460748985 to -576460695478 (milliseconds) => 53507ms 100k message / 53.507sec => 1868.9 msg/sec ## Multiple PullConsumer Processes ``` defmodule MessageTimer do use GenServer def start_link do GenServer.start_link(__MODULE__, nil, name: __MODULE__) end def init(nil) do state = {0, nil, nil} {:ok, state} end def handle_info({:message, milliseconds}, {0, nil, nil}) do {:noreply, {1, milliseconds, milliseconds}} end def handle_info({:message, milliseconds}, state) do update_state(state, milliseconds) |> check_for_terminal() end defp check_for_terminal({100_000, first, last}) do messages_per_second = 100_000.0 / ((last - first) / 1000.0) IO.puts("Received 100k messages at a rate of #{messages_per_second} messages per second") {:stop, :normal, nil} end defp check_for_terminal(state), do: {:noreply, state} defp update_state({num_received, first, last}, milliseconds) do cond do milliseconds < first -> {num_received + 1, milliseconds, last} milliseconds > last -> {num_received + 1, first, milliseconds} true -> {num_received + 1, first, last} end end end {:ok, _} = MessageTimer.start_link() {:ok, _} = Gnat.ConnectionSupervisor.start_link(%{name: :gnat, connection_settings: [%{}]}) {:ok, stream} = Jetstream.API.Stream.create(:gnat, %Jetstream.API.Stream{name: "TEST", storage: :memory, subjects: ["test"]}) Enum.each(1..100_000, fn(i) -> Gnat.pub(:gnat, "test", Jason.encode!(%{num: i, body: "foobar"})) end) {:ok, consumer} = Jetstream.API.Consumer.create(:gnat, %Jetstream.API.Consumer{stream_name: "TEST", durable_name: "TEST"}) defmodule ConsumeTest do use Jetstream.PullConsumer def start_link(arg) do Jetstream.PullConsumer.start_link(__MODULE__, arg) end @impl true def init(_arg) do {:ok, {nil, nil}, connection_name: :gnat, stream_name: "TEST", consumer_name: "TEST"} end @impl true def handle_message(message, state) do send(MessageTimer, {:message, :erlang.monotonic_time(:millisecond)}) {:ack, state} end end Enum.each(1..10, fn(_) -> {:ok, _pid} = ConsumeTest.start_link(nil) end) ``` > See full list of parallel test results at the bottom of this section ## Batched Message Pull w/ Parallel Processing ``` defmodule MessageTimer do use GenServer def start_link do GenServer.start_link(__MODULE__, nil, name: __MODULE__) end def init(nil) do state = {0, nil, nil} {:ok, state} end def handle_info({:message, milliseconds}, {0, nil, nil}) do {:noreply, {1, milliseconds, milliseconds}} end def handle_info({:message, milliseconds}, state) do update_state(state, milliseconds) |> check_for_terminal() end defp check_for_terminal({100_000, first, last}) do messages_per_second = 100_000.0 / ((last - first) / 1000.0) IO.puts("Received 100k messages at a rate of #{messages_per_second} messages per second") {:stop, :normal, nil} end defp check_for_terminal(state), do: {:noreply, state} defp update_state({num_received, first, last}, milliseconds) do cond do milliseconds < first -> {num_received + 1, milliseconds, last} milliseconds > last -> {num_received + 1, first, milliseconds} true -> {num_received + 1, first, last} end end end {:ok, _} = MessageTimer.start_link() {:ok, _} = Gnat.ConnectionSupervisor.start_link(%{name: :gnat, connection_settings: [%{}]}) {:ok, stream} = Jetstream.API.Stream.create(:gnat, %Jetstream.API.Stream{name: "TEST", storage: :memory, subjects: ["test"]}) Enum.each(1..100_000, fn(i) -> Gnat.pub(:gnat, "test", Jason.encode!(%{num: i, body: "foobar"})) end) {:ok, consumer} = Jetstream.API.Consumer.create(:gnat, %Jetstream.API.Consumer{stream_name: "TEST", durable_name: "TEST"}) defmodule ConsumeTest do use Jetstream.PullConsumer def start_link(arg) do Jetstream.PullConsumer.start_link(__MODULE__, arg) end @impl true def init(_arg) do {:ok, {nil, nil}, connection_name: :gnat, stream_name: "TEST", consumer_name: "TEST"} end @impl true def handle_message(message, state) do send(MessageTimer, {:message, :erlang.monotonic_time(:millisecond)}) {:ack, state} end end {:ok, pid} = ConsumeTest.start_link(nil) ``` __Parallel Test Results__: Parallel | Throughput PullConsumer | Throughput Tasks -- | -- | -- 1 | 29559.56252 | 28768.6997 2 | 48567.26566 | 47370.9143 4 | 73046.01899 | 70422.5352 8 | 96525.09653 | 93023.2558 10 | 88028.16901 | 101419.878 16 | 104166.6667 | 106609.808 20 | 114155.2511 | 120336.943 24 | 120918.9843 | 130548.303 32 | 127713.9208 | 141843.972 64 | 140845.0704 | 149700.599 96 | 134589.502 | 150602.41 128 | 131406.0447 | 149476.831

The results are summarized in this graph.

Screenshot 2022-04-09 at 19 10 20

The blue line shows the messages received acknowledge per second when starting multiple PullConsumer processes and the orange line shows the messages per second when running on a slightly modified version with a single PullConsumer that starts each job in a separate process.

There's not a huge gap in performance between the two methods, so starting multiple PullConsumer's is certainly a viable option.

mmmries commented 2 years ago

this is an use-case that I do not want to support in PullConsumer as it is meant to be a super simple API serving say 60-70% use-cases

@mkaput I think this would be a good topic for us to discuss on a call, but if you have any thoughts before we get schedules lined up, I would be happy to read and try to understand asynchronously.

In my working experience, I have mostly worked at companies that were setting up event architectures between multiple backend services. In every case we have had multiple copies of the app running in production (for redundancy/resiliency). So this would mean having a single PullConsumer running per instance the beam and we would still need to deal with the fact that different instances would get only part of the message history.

And in each of my professional projects, we have wanted to avoid having a queue fall behind just because of a single slow message (maybe something waiting on an IO call), so we have always allowed some number (like 20 - 200) of parallel messages to be processed per instance of the application. This helps to keep each service up-to-date with the stream.

So for me, parallelism is the common-case and I would think of running only a single PullConsumer as being niche. It sounds like you have the opposite experience? I would love to better understand that use-case.

mmmries commented 2 years ago

@mkaput I've continued thinking about the issue of tracking state in the processes receiving messages, vs tracking it elsewhere. There are a few other minor reasons to prefer separate processes:

But I think all of those issues are relatively minor, the big issue is whether we are primarily writing a library to allow individual processes to receive small amounts of messages and track state in memory very efficiently (ie GenServer model) or enabling deployments where you have multiple copies of the app running and you need to potentially keep up with a large volume of messages without blocking the stream.

@byu I would also love to get your take on which of these use-cases best fits your problem space.

marmor157 commented 2 years ago

Hi @mmmries I've run some more benchmarks based on yours but with average of 10 runs per batch size, so it's less likely that we had some weird spikes etc. added https://github.com/membraneframework/beamchmark, so it's possible to see whats is holding us back and also on two different machines and got these results with small message size:

image

Looking at Beamchmark data we've came to a conclusion that requesting Jetstream itself is a chokepoint here and that might explain why in larger batch sizes one PullConsumer is faster as at the start as we send one request instead of i.e. 128

But also to check for larger size of messages I've mocked larger message(~6kB)

image

Then with example message from our system(~1kB)

image

So for me it feels like it really depends on use case and think that we should somehow support both.

Please let me know if I've missed something that would make this measurements incorrect.

If you would like to check out the code I've forked the repo https://github.com/marmor157/jetstream_benchmarks/pull/1

brandynbennett commented 2 years ago

It looks like JetStream has Batching built into its system and it makes sense to me to lean into the built-in batching mechanism as it will likely be the most optimized. It sounds cumbersome to me to require developers to create a separate Jetstream.PullConsumer for each parallel message they want to consume. Best I can tell it looks like the Broadway PR takes care of the complex case with concurrency and batching and the existing Jetstream.PullConsumer is good for simpler cases of processing one message at a time. I'm not super familiar with Broadway, but it makes sense to me to leverage it for the complex concurrency case because it already knows how to do that and won't require reinventing it ourselves.

marmor157 commented 2 years ago

@mmmries What are your thoughts about the Broadway approach of handling parallel messages? We would like to finally make a release of this library and this discussion seems like the only thing that is blocking the release.