NFIBrokerage / spear

A sharp EventStoreDB v20+ client backed by Mint :yum:
https://hex.pm/packages/spear
Apache License 2.0

implement backpressure for stream requests #1

Closed the-mikedavis closed 3 years ago

the-mikedavis commented 3 years ago

currently Spear.append/4 with an infinite stream fails (expected) but not for the expected reason (GenServer timeout)

instead it gives

Stream.iterate(0, &(&1 + 1))
|> Stream.map(fn n -> Spear.Event.new("incremented", n) end)
|> Spear.append(conn, "InfiniteCounter", expect: :empty)
# =>
{:error,
 %Mint.HTTPError{
   module: Mint.HTTP2,
   reason: {:exceeds_window_size, :connection, 26}
 }}

Currently the setup for all out-bound requests is like so:

defp request_and_stream_body(conn, request) do
  with {:ok, conn, request_ref} <-
         Mint.HTTP.request(conn, @post, request.path, request.headers, :stream),
       {:ok, conn} <- stream_body(conn, request_ref, request.messages),
       {:ok, conn} <- Mint.HTTP.stream_request_body(conn, request_ref, :eof) do
    {:ok, conn, request_ref}
  else
    {:error, conn, reason} -> {:error, conn, reason}
  end
end

and stream_body/3 is implemented like so:

defp stream_body(conn, request_ref, messages) do
  Enum.reduce_while(messages, {:ok, conn}, fn message, {:ok, conn} ->
    {message, _byte_size} = Request.to_wire_data(message)

    stream_result =
      Mint.HTTP.stream_request_body(
        conn,
        request_ref,
        message
      )

    case stream_result do
      {:ok, conn} -> {:cont, {:ok, conn}}
      error -> {:halt, error}
    end
  end)
end

As it turns out, this is actually very similar to the streaming implementation in finch! (PR that added streaming support: https://github.com/keathley/finch/pull/107/files#diff-48431cc1d91063480b5006d7585c96ea39433e319aca2b5e3a6c597fdbd7e10fR153-R158)

If we add some IO.inspect/2s of the window sizes in that Enum.reduce_while/3

conn |> Mint.HTTP2.get_window_size(:connection) |> IO.inspect(label: "connection window")
conn |> Mint.HTTP2.get_window_size({:request, request_ref}) |> IO.inspect(label: "request window")

we can see the window size for the connection and request gradually decreasing down to (in this case) 26, which is not enough to send the next message.

We can cheaply detect the window sizes as we reduce, but it's not immediately clear how to halt the stream temporarily while we Mint.HTTP.stream/2 and await a WINDOW_UPDATE frame (once we realize that our window is not large enough).
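
The bookkeeping described above can be sketched without a connection. This is a minimal simulation, not Spear or Mint code: `WindowSketch`, `fits?/3`, and `drain/2` are hypothetical names, the 65,535-byte starting window is the HTTP/2 default initial window (a server's settings may change it), and the 100-byte message size is an arbitrary illustration.

```elixir
defmodule WindowSketch do
  # HTTP/2 default initial flow-control window size.
  # Assumption: the server has not advertised a different value.
  @initial_window 65_535

  # A message may only be sent if it fits in BOTH the connection
  # window and the request (stream) window.
  def fits?(byte_size, connection_window, request_window) do
    byte_size <= min(connection_window, request_window)
  end

  # Pretend to send fixed-size messages until the next one no longer fits.
  # Returns {number_of_messages_sent, leftover_window}.
  def drain(message_size, window \\ @initial_window) when message_size > 0 do
    do_drain(message_size, window, 0)
  end

  defp do_drain(message_size, window, sent) do
    # For simplicity both windows are assumed to shrink in lockstep.
    if fits?(message_size, window, window) do
      do_drain(message_size, window - message_size, sent + 1)
    else
      {sent, window}
    end
  end
end
```

With these assumed numbers, `WindowSketch.drain(100)` returns `{655, 35}`: 655 messages go out and 35 bytes of window remain, too few for the next message. That mirrors the leftover 26 bytes reported in the error above.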

the-mikedavis commented 3 years ago

We cannot exactly "split" arbitrary streams in Elixir (see this issue) because Elixir has no built-in thunks and because streams tend to wrap stateful resources like files.

This could be done by piping the messages through Stream.with_index, using the index to halt the stream, and then picking up where it left off with Stream.drop/2, but that re-processes the messages we have already sent. That is probably not a big deal for most streams, but I could see it being a real problem for stateful resources like a very large file, which this function is likely to handle.
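
To make the re-processing cost concrete, here is a small self-contained sketch. It is not Spear code: `ResumeSketch` is a hypothetical module, the "window fills after three messages" cutoff is made up, and the `send/2` side effect stands in for reading from a stateful resource.

```elixir
defmodule ResumeSketch do
  # Returns {first_batch, resumed_batch, produced}, where `produced` lists
  # every element the source stream generated, in order.
  def run do
    parent = self()

    # Stand-in for a stream over a stateful resource:
    # producing an element has a visible side effect.
    messages =
      Stream.iterate(0, &(&1 + 1))
      |> Stream.map(fn n ->
        send(parent, {:produced, n})
        n
      end)

    # First pass: pretend the window fills up after three messages.
    first_batch = messages |> Stream.with_index() |> Enum.take(3)
    {_message, last_index} = List.last(first_batch)

    # Resume: skip what was already sent, then take two more.
    resumed_batch = messages |> Stream.drop(last_index + 1) |> Enum.take(2)

    {first_batch, resumed_batch, drain_mailbox([])}
  end

  # Collect the {:produced, n} notifications sent during both passes.
  defp drain_mailbox(acc) do
    receive do
      {:produced, n} -> drain_mailbox([n | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end
end
```

The resumed batch is `[3, 4]` as intended, but the produced list is `[0, 1, 2, 0, 1, 2, 3, 4]`: the first three elements are generated a second time just to be dropped.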

the-mikedavis commented 3 years ago

the other approach I think has legs is to detect that the message is going to exceed a window and synchronously wait in the Enum.reduce_while/3 for messages from the server, either with Kernel.SpecialForms.receive/1 or by switching the connection to passive mode and reading out responses with Mint.HTTP.recv/3.

Enum.reduce_while(messages, {:ok, conn}, fn message, {:ok, conn} ->
  {wire_data, byte_size} = Request.to_wire_data(message)

  # block until both windows can fit the message, then send it
  with {:ok, conn} <- get_until_window_increase(conn, request_ref, byte_size),
       {:ok, conn} <- Mint.HTTP.stream_request_body(conn, request_ref, wire_data) do
    {:cont, {:ok, conn}}
  else
    error -> {:halt, error}
  end
end)

with get_until_window_increase/3 being implemented something like

defp get_until_window_increase(conn, request_ref, byte_size) do
  connection_window = Mint.HTTP2.get_window_size(conn, :connection)
  request_window = Mint.HTTP2.get_window_size(conn, {:request, request_ref})

  if byte_size <= min(connection_window, request_window) do
    {:ok, conn}
  else
    # TODO handle `responses`
    # read from the (passive-mode) socket, then re-check the windows
    case Mint.HTTP.recv(conn, 0, 5_000) do
      {:ok, conn, _responses} -> get_until_window_increase(conn, request_ref, byte_size)
      {:error, conn, reason, _responses} -> {:error, conn, reason}
    end
  end
end

In either case, the connection may receive messages for other request_refs while synchronously waiting for the WINDOW_UPDATE frame, so those responses would need to be passed through the process_response/2 function so that other read requests and subscriptions are processed and not dropped. This isn't the end of the world, but it would require a small refactor to pass around state instead of just conn.
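
As a rough illustration of that routing concern, the sketch below separates responses for the request being streamed from responses that belong to other requests. It is not Spear's process_response/2: `RoutingSketch` and `split/2` are hypothetical names, and the response tuples are hand-written in the shape Mint uses, with the request reference as the second element.

```elixir
defmodule RoutingSketch do
  # Split Mint-style responses into {ours, others}, where `ours` are the
  # responses tagged with `request_ref`. Mint response tuples such as
  # {:status, ref, status}, {:data, ref, binary} and {:done, ref} all carry
  # the request reference as their second element.
  def split(responses, request_ref) do
    Enum.split_with(responses, fn response -> elem(response, 1) == request_ref end)
  end
end

# Hypothetical situation: an append is blocked on its window while a
# subscription on the same connection keeps receiving data.
append_ref = make_ref()
subscription_ref = make_ref()

responses = [
  {:data, subscription_ref, "event"},
  {:status, append_ref, 200},
  {:done, subscription_ref}
]

{_ours, _others} = RoutingSketch.split(responses, append_ref)
```

The `others` list is what would still need to reach the existing response handling, which is why the waiting loop needs access to more than just conn.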

the-mikedavis commented 3 years ago

looks like this is also an issue in the finch implementation https://github.com/keathley/finch/issues/88

the-mikedavis commented 3 years ago

worth checking if this is just the EventStore telling the client to stop trying to send so much data, but it appears as though the EventStore is properly telling the client to expand window size in the connection (stream 0)

[image: spear-infinite-connection-window-update]

and also later in the request (stream 5)

[image: spear-infinite-request-window-update]

but as the code is not directing mint to listen for these, mint is not magically adjusting the window

so it appears as though this is standard HTTP2 request back-pressure

the-mikedavis commented 3 years ago

the Mint.HTTP.recv/3 approach is probably wiser, as a receive/1 will also catch incoming GenServer calls/casts

the-mikedavis commented 3 years ago

See #3 for the implementation that :hocho:d this

instead of synchronously blocking in the request_and_stream_body/2 (now request_and_stream_body/3) with either Kernel.SpecialForms.receive/1 or Mint.HTTP.recv/3, we use t:Enumerable.continuation/0s from Enumerable.reduce/3 to suspend the stream when the window runs out and resume it once the server replenishes the window

this has the potential for hanging a suspended stream if the server never replenishes our window, but luckily Spear is only written to interact with one kind of server: an EventStoreDB, which we can show to be conformant with proper window refill behavior. I.e. I think it's an acceptable risk
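
For readers unfamiliar with continuations, here is a minimal sketch of suspending and resuming a stream with Enumerable.reduce/3. It is not the code from #3: `SuspendSketch`, `start/1`, and `send_while_fits/3` are hypothetical names, messages are plain binaries, and the "window" is just an integer byte budget.

```elixir
defmodule SuspendSketch do
  # Wrap any enumerable in a continuation that yields one element per call.
  # The reducer always suspends, so the stream pauses after every element.
  def start(enumerable) do
    fn command ->
      Enumerable.reduce(enumerable, command, fn item, _acc -> {:suspend, item} end)
    end
  end

  # Pull elements until the next one does not fit in `window` bytes.
  def send_while_fits(continuation, window, sent \\ []) do
    case continuation.({:cont, nil}) do
      {:suspended, item, rest} when byte_size(item) <= window ->
        send_while_fits(rest, window - byte_size(item), [item | sent])

      {:suspended, item, rest} ->
        # Out of window: hand back the unsent item and the paused stream.
        {:blocked, Enum.reverse(sent), item, rest}

      {finished, _acc} when finished in [:done, :halted] ->
        {:done, Enum.reverse(sent)}
    end
  end
end

# Usage with an infinite stream and a made-up 5-byte window.
counter = Stream.iterate(0, &(&1 + 1)) |> Stream.map(&Integer.to_string/1)

{:blocked, _sent, _pending, rest} =
  SuspendSketch.send_while_fits(SuspendSketch.start(counter), 5)

# Later, once the window has grown again, continue from where the stream paused.
{:blocked, _sent_later, _pending_later, _rest} = SuspendSketch.send_while_fits(rest, 3)
```

The first call sends "0" through "4" and blocks on "5"; the second picks up at "6" without regenerating anything. In this sketch the caller is responsible for sending the pending item once the window allows it, and nothing resumes `rest` on its own, which is the hanging risk described above.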