sneako / finch

Elixir HTTP client, focused on performance
MIT License

Add support for async requests #228

Closed zachallaun closed 1 year ago

zachallaun commented 1 year ago

See #208 for context and past discussion.

Todo


To facilitate review, I'll try to list changes here that affect non-async requests as well so that they can be given careful consideration.

wojtekmach commented 1 year ago

Looking forward to this!

If you don't mind me reqing this thread, here's a proof of concept of using this branch: (thanks to the enhanced :finch_request option also done by @zachallaun :D)

Mix.install([
  :req,
  {:finch, github: "zachallaun/finch", branch: "async", override: true},
  :bandit
])

defmodule MyPlug do
  use Plug.Router

  plug(:match)
  plug(:dispatch)

  get "/redirect" do
    to = "/stream"

    conn
    |> Plug.Conn.put_resp_header("location", to)
    |> Plug.Conn.send_resp(302, "redirecting to #{to}")
  end

  get "/stream" do
    conn = Plug.Conn.send_chunked(conn, 200)

    Enum.reduce_while(~w(a b c d), conn, fn chunk, conn ->
      IO.inspect({:send, chunk})

      case Plug.Conn.chunk(conn, chunk) do
        {:ok, conn} ->
          {:cont, conn}

        {:error, :closed} ->
          {:halt, conn}
      end
    end)
  end
end

fun = fn request, finch_request, finch_name, finch_options ->
  ref = Finch.async_request(finch_request, finch_name, finch_options)

  receive do
    {^ref, {:status, status}} ->
      receive do
        {^ref, {:headers, headers}} ->
          body =
            Stream.unfold(ref, fn ref ->
              receive do
                {^ref, {:data, data}} ->
                  IO.inspect({:recv, data})
                  {data, ref}

                {^ref, :done} ->
                  nil
              end
            end)

          {request, Req.Response.new(status: status, headers: headers, body: body)}
      end
  end
end

{:ok, _} = Bandit.start_link(plug: MyPlug, scheme: :http, port: 4000)

stream = Req.get!("http://localhost:4000/redirect", finch_request: fun).body
IO.inspect({:take, Enum.take(stream, 1)})
Process.sleep(1000)
IO.inspect({:take, Enum.take(stream, 10)})

Outputs:

11:37:41.997 [info] Running MyPlug with Bandit 0.7.7 at 0.0.0.0:4000 (http)

11:37:42.057 [debug] follow_redirects: redirecting to /stream
{:send, "a"}
{:send, "b"}
{:send, "c"}
{:recv, "a"}
{:send, "d"}
{:take, ["a"]}
{:recv, "b"}
{:recv, "c"}
{:recv, "d"}
{:take, ["b", "c", "d"]}

Again, this is just a PoC; eventually it would sit behind a stream: true option or something similar. I'm curious whether it's OK for Req to consume the :status and :headers messages like that so it can return a response struct (which we can then pass through response steps), or whether that's undesirable/unsafe. I think I read somewhere that a server may return additional headers down the road. I'm not sure if that's true or how common it is, but it would break the model above.

Otherwise we'd go with some kind of Req.stream which is just a thin wrapper around this, users would need to handle :status, :headers, :data bits themselves, and we would NOT leverage response steps.

wojtekmach commented 1 year ago

Yeah servers can send Trailer headers. :|

zachallaun commented 1 year ago

If you don't mind me reqing this thread, here's a proof of concept of using this branch

I don’t mind at all! While I hope that others will find good use for this feature, integration with Req is my own primary use-case :)

So: yes, servers can send trailing headers, and I also think the Mint docs say that you can get multiple headers chunks, so this is theoretically a valid series:

:status
:headers
:data
:data
:data
:headers
:done
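To make that series concrete, here is a rough sketch of a receive loop that tolerates it. It is not Finch's or Req's code: `AsyncSketch` and the `:trailers` key are names made up for illustration, and the messages are simulated with `send/2` so that no HTTP request is needed. It assumes messages shaped like `{ref, {:status, status}}`, `{ref, {:headers, headers}}`, `{ref, {:data, data}}`, `{ref, :done}` and `{ref, {:error, reason}}`.

```elixir
defmodule AsyncSketch do
  # Hypothetical helper: collects all messages for `ref` into a map.
  # Headers that arrive after the first body chunk are stored as trailers.
  def collect(ref, timeout \\ 5_000) do
    loop(ref, %{status: nil, headers: [], body: [], trailers: []}, timeout)
  end

  defp loop(ref, acc, timeout) do
    receive do
      {^ref, {:status, status}} ->
        loop(ref, %{acc | status: status}, timeout)

      {^ref, {:headers, headers}} ->
        # No body seen yet means regular headers, otherwise trailing headers.
        key = if acc.body == [], do: :headers, else: :trailers
        loop(ref, Map.update!(acc, key, &(&1 ++ headers)), timeout)

      {^ref, {:data, data}} ->
        # Accumulate the body as iodata and flatten it once at the end.
        loop(ref, %{acc | body: [acc.body, data]}, timeout)

      {^ref, :done} ->
        {:ok, %{acc | body: IO.iodata_to_binary(acc.body)}}

      {^ref, {:error, reason}} ->
        {:error, reason}
    after
      # The timeout applies to each individual message, not the whole response.
      timeout -> {:error, :timeout}
    end
  end
end

# Simulate the series above instead of issuing a real request.
ref = make_ref()

for msg <- [
      {:status, 200},
      {:headers, [{"trailer", "x-checksum"}]},
      {:data, "ab"},
      {:data, "cd"},
      {:headers, [{"x-checksum", "abc"}]},
      :done
    ] do
  send(self(), {ref, msg})
end

{:ok, response} = AsyncSketch.collect(ref)
```

Buffering the whole body like this defeats the point of streaming, of course; the sketch only shows where a second :headers message would have to go.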

That said… I think there’s a way for Req to take this into consideration to allow processing in response steps. (Not doing so would be, imo, a big missed opportunity for Req!) I’m frankly guessing, but I don’t think trailing headers are all that common…?

What if stream steps get a stream of PartialResponse structs? Req consumes the stream until it receives the first :data, then starts emitting partial responses containing status, all headers, and that data chunk? If more headers show up, the next partial response includes them.
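A minimal sketch of that idea, with loud caveats: `PartialSketch` and `PartialResponse` are hypothetical names (neither Req nor Finch defines them), and the input is a plain list of events rather than real messages.

```elixir
defmodule PartialSketch do
  # Hypothetical struct for illustration only.
  defmodule PartialResponse do
    defstruct status: nil, headers: [], data: nil
  end

  # Turns an enumerable of :status/:headers/:data/:done events into a lazy
  # stream of PartialResponse structs, emitting one struct per :data event.
  def partial_responses(events) do
    Stream.transform(events, %PartialResponse{}, fn
      {:status, status}, acc -> {[], %{acc | status: status}}
      {:headers, headers}, acc -> {[], %{acc | headers: acc.headers ++ headers}}
      {:data, data}, acc -> {[%{acc | data: data}], acc}
      :done, acc -> {:halt, acc}
    end)
  end
end

events = [
  {:status, 200},
  {:headers, [{"content-type", "text/plain"}]},
  {:data, "a"},
  {:data, "b"},
  {:headers, [{"x-checksum", "abc"}]},
  {:data, "c"},
  :done
]

partials = events |> PartialSketch.partial_responses() |> Enum.to_list()
```

One gap this makes visible: headers that arrive immediately before :done never reach a partial response here, so a real design would probably need a final element (or a separate callback) for trailers.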

(Also, I’m happy discussing wherever you think is best, but maybe it makes sense to move this to the streaming issue in Req since I know there were other folks interested.)

edit: just read the link about Trailer headers. Sounds like you know up front what to expect, which might make it easier to account for or build a step API around.

Edit: according to Mint, only one :headers response should follow the status: https://hexdocs.pm/mint/Mint.HTTP.html#stream/2-responses

zachallaun commented 1 year ago

(Unrelated to the discussion with Wojtek above)

If anyone has a script that can be used for benchmarking Finch, I'd love to see it! As mentioned in the OP, I'd like to make sure that this branch is benchmarked against main to ensure no performance regressions for both HTTP1 and 2.

wojtekmach commented 1 year ago

We can continue the conversation elsewhere, good call! https://github.com/wojtekmach/req/issues/82

zachallaun commented 1 year ago

There are still a few things to address, I'd appreciate review at this point! Please see the OP for remaining TODOs and some additional notes.

zachallaun commented 1 year ago

Wrote up a basic benchmark, would appreciate someone else taking a look at some point.

Benchmark

❯ elixir bench_finch.exs 
==> finch
Compiling 5 files (.ex)
Operating System: Linux
CPU Information: Intel(R) Core(TM) i5-4690K CPU @ 3.50GHz
Number of Available Cores: 4
Available memory: 15.59 GB
Elixir 1.14.5
Erlang 25.3.2

Benchmark suite executing with the following configuration:
warmup: 2 s
time: 5 s
memory time: 0 ns
reduction time: 0 ns
parallel: 1
inputs: http1, http2
Estimated total run time: 14 s

Benchmarking Finch.request!/3 with input http1 ...

15:29:14.187 [info] Running PlugServer with Bandit 1.0.0-pre.5 at 127.0.0.1:4000 (http)
Benchmarking Finch.request!/3 with input http2 ...

15:29:21.278 [info] Running PlugServer with Bandit 1.0.0-pre.5 at 127.0.0.1:4000 (https)

##### With input http1 #####
Name                               ips        average  deviation         median         99th %
Finch.request!/3                5.50 K      181.82 μs    ±32.06%      170.80 μs      379.29 μs
Finch.request!/3 (saved)        5.48 K      182.50 μs    ±29.53%      172.30 μs      305.90 μs

Comparison: 
Finch.request!/3                5.50 K
Finch.request!/3 (saved)        5.48 K - 1.00x slower +0.68 μs

##### With input http2 #####
Name                               ips        average  deviation         median         99th %
Finch.request!/3 (saved)        2.62 K      382.01 μs    ±23.88%      348.60 μs      700.08 μs
Finch.request!/3                2.60 K      385.05 μs    ±20.14%      356.60 μs      661.72 μs

Comparison: 
Finch.request!/3 (saved)        2.62 K
Finch.request!/3                2.60 K - 1.01x slower +3.03 μs

To reproduce:

# in your local finch project
# fetch the raw file; the /blob/ URL would download GitHub's HTML page instead
curl -L -o bench_finch.exs https://raw.githubusercontent.com/zachallaun/finch/async-bench/bench_finch.exs

git checkout main
elixir bench_finch.exs

git checkout async
elixir bench_finch.exs

# set SAVE env var to overwrite saved results
SAVE=1 elixir bench_finch.exs

zachallaun commented 1 year ago

I started work on telemetry but hit a small roadblock. It's somewhat easy to move the :send measurement into the HTTP/2 pool, but the :recv is trickier because it currently includes the status and headers as telemetry metadata. The pool doesn't track any of that right now -- it just sends it along to the calling process, which collects it in a receive loop.

Do we want to save :status and :headers in the %RequestStream{} so that the pool has them available as metadata to emit telemetry events?
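Roughly what that could look like, as a sketch only: `TelemetrySketch` and its `RequestState` struct are hypothetical stand-ins, not the actual `%RequestStream{}` fields, and nothing here calls `:telemetry`.

```elixir
defmodule TelemetrySketch do
  # Hypothetical per-request state kept by the pool; field names are assumptions.
  defmodule RequestState do
    defstruct status: nil, headers: []
  end

  def new, do: %RequestState{}

  # Remember the response parts that a :recv event would report as metadata,
  # and ignore everything else (:data chunks, :done).
  def track(state, {:status, status}), do: %{state | status: status}
  def track(state, {:headers, headers}), do: %{state | headers: state.headers ++ headers}
  def track(state, _other), do: state

  # Build the metadata map once the response has finished.
  def recv_metadata(state), do: %{status: state.status, headers: state.headers}
end

metadata =
  [{:status, 200}, {:headers, [{"content-length", "2"}]}, {:data, "ok"}, :done]
  |> Enum.reduce(TelemetrySketch.new(), fn event, state ->
    TelemetrySketch.track(state, event)
  end)
  |> TelemetrySketch.recv_metadata()
```

The cost is one extra copy of the headers held per in-flight request, which seems small next to the body data already passing through the pool.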

zachallaun commented 1 year ago

Thank you both for the review!

@sneako How would you like to handle #227? Both it and this PR touch the H2 pool and tests and will need to resolve conflicts. If you anticipate merging #227 first, I’m happy to do the work of rebasing this PR!

zachallaun commented 1 year ago

Regarding H2 telemetry, here’s what I’m thinking:

Edit: there isn’t currently a telemetry event for a :send error, but we could emit one for H2.

josevalim commented 1 year ago

Shall we add the remaining bits about H2 telemetry in another PR/discussion? I am actually not sure how much I can help here. It may be worth pinging @whatyouhide (quoted so it doesn't ping him yet).

zachallaun commented 1 year ago

I am actually not sure how much I can help here.

You’ve been a great help so far! I’ll leave it up to @sneako as to whether we group H2 telemetry into this or another PR, but please feel free to unsub from this PR to reduce chatter for now and we can ping you if needed!

sneako commented 1 year ago

Thank you both for the review!

@sneako How would you like to handle #227? Both it and this PR touch the H2 pool and tests and will need to resolve conflicts. If you anticipate merging #227 first, I’m happy to do the work of rebasing this PR!

Don't worry about #227, let's merge this PR first and then I can rebase that much smaller PR if necessary.

I'm ok with continuing h2 telemetry updates in a subsequent PR, there is already a lot going on here and I think consumers like Req can already start playing with the api without all of the telemetry in place.

zachallaun commented 1 year ago

I'm ok with continuing h2 telemetry updates in a subsequent PR, there is already a lot going on here and I think consumers like Req can already start playing with the api without all of the telemetry in place.

Sounds good to me!

For now, should we just add a note to the top of the Finch.Telemetry moduledoc? Thinking something like:

Note that for HTTP2 requests issued with Finch.async_request/3, most telemetry events are not currently emitted. This will be addressed in a later release.

zachallaun commented 1 year ago

@sneako Do you perchance know what's going on with this ranch error? Only on OTP 26.

zachallaun commented 1 year ago

Hmm, it's curious that the error is still there, but the tests are still "passing" in CI despite skipping a number of tests -- it's showing 86 tests run, but there are 90.

sneako commented 1 year ago

I am able to reproduce the otp 26 failure locally, but haven't quite figured out the cause yet... It looks like since this error occurs in the setup_all callback, it causes all 4 tests in this file to be skipped (hence 86 tests), but it does seem like a bug that ExUnit does not consider these to be failures when that happens...

josevalim commented 1 year ago

Agreed. The test suite should be failing. A bug report/PR is welcome!

sneako commented 1 year ago

Opened https://github.com/elixir-lang/elixir/issues/12650

zachallaun commented 1 year ago

@sneako So the problem is with this line. Maybe this is something that's changed in OTP26 (there were changes to SSL), but this needs to be a list of strings. Changing it to alpn_preferred_protocols: ["h2"] gets rid of the error, but then this test fails.

I admit that this is somewhat beyond my understanding. If you have any suggestions about what to do with this test, I'd appreciate it!

sneako commented 1 year ago

Didn't have too much time last night to dig in, but here is the commit that changed this behaviour in OTP: https://github.com/erlang/otp/commit/cd00692e18cc88c369e0a4fdaa124bdea7d67252#diff-c6e4e01fbddfaa6057fc43144940a337eb3db17891c62cce9ad2123e0107f776R2024

It does state: "This commit have changed error messages for a couple of error cases, and is also more restrict with checking the options."

I think they do intend for :undefined to be a valid value since that is still the default, but I'll try to dig a bit more before opening an issue with them

zachallaun commented 1 year ago

Thanks for looking into it!

Given that this is somewhat tangential to this PR, is it worth opening a new issue/PR to address it later?

sneako commented 1 year ago

Sure! Let's just remove OTP 26 from the test matrix for now

zachallaun commented 1 year ago

I’m on my phone at the moment but that sounds good to me! I’ll be able to remove it in about an hour, but you’re also welcome to push to this branch if you are so inclined!

Other than the couple of TODOs that came out of this to address in separate PRs, is there anything else you’d like me to look at/improve here prior to merge?

sneako commented 1 year ago

Thank you for this excellent work!

zachallaun commented 1 year ago

Thank you so much for shepherding it through!

hkrutzer commented 1 year ago

Thanks @zachallaun this is a great improvement for the entire ecosystem.

wojtekmach commented 1 year ago

Thanks again to everyone involved!

I was playing with this API in a test:

Mix.install([
  {:finch, github: "sneako/finch"},
  :bypass
])

ExUnit.start()

defmodule MyApp.Test do
  use ExUnit.Case

  test "it works" do
    bypass = Bypass.open()

    Bypass.expect(bypass, "GET", "/", fn conn ->
      conn = Plug.Conn.send_chunked(conn, 200)
      stream = Stream.map(1..5, &to_string/1)

      Enum.reduce_while(stream, conn, fn chunk, conn ->
        Process.sleep(100)
        IO.inspect(chunk, label: :send)

        case Plug.Conn.chunk(conn, chunk) do
          {:ok, conn} ->
            {:cont, conn}

          {:error, :closed} ->
            {:halt, conn}
        end
      end)
    end)

    start_supervised!({Finch, name: MyFinch})

    ref =
      Finch.build(:get, "http://0.0.0.0:#{bypass.port}")
      |> Finch.async_request(MyFinch)

    receive do
      {^ref, chunk} -> dbg(chunk)
    end
  end
end

And I got this:

[stream.exs:39: MyApp.Test."test it works"/1]
chunk #=> {:status, 200}

  1) test it works (MyApp.Test)
     stream.exs:11
     ** (exit) shutdown

Finished in 0.05 seconds (0.01s on load, 0.00s async, 0.04s sync)
1 test, 1 failure

I think the test process exited while the request was still going but per docs:

If the calling process exits before the request has completed, the request will be canceled.

zachallaun commented 1 year ago

Thanks for reporting! I had this issue at one point while developing the feature but thought it was accounted for. Apparently not! I’ll look into this more tonight.

zachallaun commented 1 year ago

After a bit more testing, I found that this behavior was already present before async_request was introduced. Here's a replication of the same behavior using Finch 0.16.0 and Finch.stream:

Mix.install([
  {:finch, "0.16.0"},
  :bypass
])

ExUnit.start()

defmodule MyApp.Test do
  use ExUnit.Case

  test "it works" do
    bypass = Bypass.open()

    Bypass.expect(bypass, "GET", "/", fn conn ->
      conn = Plug.Conn.send_chunked(conn, 200)
      stream = Stream.map(1..5, &to_string/1)

      Enum.reduce_while(stream, conn, fn chunk, conn ->
        Process.sleep(100)
        IO.inspect(chunk, label: :send)

        case Plug.Conn.chunk(conn, chunk) do
          {:ok, conn} ->
            {:cont, conn}

          {:error, :closed} ->
            {:halt, conn}
        end
      end)
    end)

    start_supervised!({Finch, name: MyFinch})

    outer = self()

    spawn_link(fn ->
      Finch.build(:get, "http://0.0.0.0:#{bypass.port}")
      |> Finch.stream(MyFinch, nil, fn chunk, _ ->
        send(outer, chunk)
      end)
    end)

    receive do
      chunk -> dbg(chunk)
    end
  end
end

Running this:

❯ elixir scratch.exs
[scratch.exs:44: MyApp.Test."test it works"/1]
chunk #=> {:status, 200}

  1) test it works (MyApp.Test)
     scratch.exs:11
     ** (exit) shutdown

Finished in 0.3 seconds (0.1s on load, 0.00s async, 0.1s sync)
1 test, 1 failure

I believe the Bypass instance is receiving a :DOWN from the Cowboy server, which triggers Bypass to re-raise and exit when it's verifying expectations in a test on_exit callback.

We can verify this by copying what happens in Bypass.open() and setting our own on_exit:

Mix.install([
  {:finch, "0.16.0"},
  :bypass
])

ExUnit.start()

defmodule MyApp.Test do
  use ExUnit.Case

  test "it works" do
    {:ok, pid} = DynamicSupervisor.start_child(Bypass.Supervisor, Bypass.Instance.child_spec([]))
    port = Bypass.Instance.call(pid, :port)
    bypass = %Bypass{pid: pid, port: port}

    on_exit(fn ->
      dbg(Bypass.Instance.call(bypass.pid, :on_exit))
    end)

    Bypass.expect(bypass, "GET", "/", fn conn ->
      conn = Plug.Conn.send_chunked(conn, 200)
      stream = Stream.map(1..5, &to_string/1)

      Enum.reduce_while(stream, conn, fn chunk, conn ->
        Process.sleep(100)
        IO.inspect(chunk, label: :send)

        case Plug.Conn.chunk(conn, chunk) do
          {:ok, conn} ->
            {:cont, conn}

          {:error, :closed} ->
            {:halt, conn}
        end
      end)
    end)

    start_supervised!({Finch, name: MyFinch})

    outer = self()

    spawn_link(fn ->
      Finch.build(:get, "http://0.0.0.0:#{bypass.port}")
      |> Finch.stream(MyFinch, nil, fn chunk, _ ->
        send(outer, chunk)
      end)
    end)

    receive do
      chunk -> dbg(chunk)
    end
  end
end

❯ elixir scratch.exs
[scratch.exs:50: MyApp.Test."test it works"/1]
chunk #=> {:status, 200}

[scratch.exs:17: MyApp.Test."test it works"/1]
Bypass.Instance.call(bypass.pid, :on_exit) #=> {:exit, {:exit, :shutdown, []}}

.
Finished in 0.1 seconds (0.08s on load, 0.00s async, 0.1s sync)
1 test, 0 failures

I think the Plug.Cowboy test server is being shut down early when the test exits (the Bypass instance monitors it here and saves the exit as a result here).

Overall, I'm not sure that there's anything to be done in Finch. This sort of feels like a design flaw of Bypass with regards to streaming responses that may not complete.
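One possible workaround on the test side, untested against Bypass and offered only as a sketch: keep the test process alive until the response has finished, so the server handler is not torn down mid-stream. `DrainSketch.await_done/2` is a hypothetical helper, and the messages below are simulated with `send/2` rather than coming from a real request.

```elixir
defmodule DrainSketch do
  # Hypothetical test helper: blocks until the request identified by `ref`
  # reports :done or an error, discarding every other message for that ref.
  def await_done(ref, timeout \\ 1_000) do
    receive do
      {^ref, :done} -> :ok
      {^ref, {:error, reason}} -> {:error, reason}
      {^ref, _other} -> await_done(ref, timeout)
    after
      # The timeout restarts after each message that is received.
      timeout -> {:error, :timeout}
    end
  end
end

# Simulated messages standing in for an async response.
drain_ref = make_ref()

Enum.each([{:status, 200}, {:headers, []}, {:data, "1"}, :done], fn msg ->
  send(self(), {drain_ref, msg})
end)

drain_result = DrainSketch.await_done(drain_ref)
```

In the test above, this would go after the first `receive`, at the cost of waiting for all five chunks to arrive.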

wojtekmach commented 1 year ago

Oh wow, excellent research. Sorry about that!

zachallaun commented 1 year ago

Nothing to apologize for! If you run into anything else that seems weird while testing it out, please ping me.