Closed venkatd closed 2 years ago
Unfortunately there is no API for doing so in Elixir, because the necessary API has only recently been added to Erlang. From Erlang/OTP 24 you can use `gen_server:send_request/2` to send a call to a GenServer, and then use one of `receive_response/2`, `wait_response/2`, or `check_response/2` to fetch the actual result of the request.
So the idea is that you would use `send_request/2` on all servers upfront and then wait on the responses. :) Do you think this helps? Please cc us in the PR so we can review it. :)
The goal is to have this actually work as a Task in upcoming Elixir versions.
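To make the flow concrete, here is a minimal runnable sketch (OTP 24+). `Echo` is a made-up server used only for illustration, and `receive_response/2` blocks until the reply arrives or the timeout expires:

```elixir
defmodule Echo do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, nil)

  @impl true
  def init(nil), do: {:ok, nil}

  @impl true
  def handle_call(msg, _from, state), do: {:reply, msg, state}
end

{:ok, pid1} = Echo.start_link()
{:ok, pid2} = Echo.start_link()

# Fire off both calls without blocking...
requests = for pid <- [pid1, pid2], do: :gen_server.send_request(pid, :ping)

# ...then collect each reply; both servers run concurrently in the meantime
replies = for req <- requests, do: :gen_server.receive_response(req, 5000)
# replies == [{:reply, :ping}, {:reply, :ping}]
```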
Thanks, I'll look into using this API with Erlang directly :)
> wait on the responses
I'm unsure how to handle a list of PIDs. Would there be any edge cases if I call `send_request` for each, then call `wait_response` on each sequentially?
That is fine, but remember that you can end up waiting `timeout * number_of_pids` if the tasks are slow. You can address this by starting a timer and checking whether it has expired after receiving each response. Something like this:
```elixir
timeout = 5000
timer_ref = Process.send_after(self(), :game_over, timeout)

for request <- requests do
  :gen_server.wait_response(..., timeout)

  receive do
    :game_over -> raise "timeout"
  after
    0 -> :ok
  end
end

Process.cancel_timer(timer_ref)
```
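As a side note, the reason this pattern works is that `receive ... after 0` is a non-blocking mailbox check: it matches a `:game_over` message only if the timer has already fired. A tiny self-contained sketch of just that mechanism (the 10/20 ms values are arbitrary):

```elixir
timer_ref = Process.send_after(self(), :game_over, 10)

# Non-blocking check: returns :expired only if :game_over is already in the mailbox
check = fn ->
  receive do
    :game_over -> :expired
  after
    0 -> :ok
  end
end

# The timer hasn't fired yet, so the zero-timeout receive falls through
first = check.()

# Give the timer time to fire; now the same check sees :game_over immediately
Process.sleep(20)
second = check.()

Process.cancel_timer(timer_ref)
# {first, second} == {:ok, :expired}
```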
I am working on other priorities so will revisit this later on :)
This is untested, but I wanted to check whether this variation of your code is headed in the right direction:
```elixir
def call_many(servers, message, timeout) do
  servers_with_requests =
    for s <- servers do
      {s, :gen_server.send_request(s, message)}
    end

  timer_ref = Process.send_after(self(), :overall_timeout, timeout)

  # the accumulator is of the form {:ok | :overall_timeout, [{server1, reply1}, ...]}
  {_status, servers_with_replies} =
    Enum.reduce(servers_with_requests, {:ok, []}, fn
      {sub, _req_id}, {:overall_timeout, items} ->
        # Overall timeout was already triggered, so we immediately go to timeout
        {:overall_timeout, [{sub, {:error, :timeout}} | items]}

      {sub, req_id}, {status, items} ->
        case :gen_server.wait_response(req_id, timeout) do
          {:reply, reply} -> {status, [{sub, {:ok, reply}} | items]}
          {:error, reason} -> {status, [{sub, {:error, reason}} | items]}
          :timeout -> {:overall_timeout, [{sub, {:error, :timeout}} | items]}
        end
    end)

  Process.cancel_timer(timer_ref)
  Enum.reverse(servers_with_replies)
end
```
I went for an `Enum.reduce`. Rather than raising an exception, I return a list of `[{server, reply}]` tuples.
Almost there! You need to check for the receive timeout after each wait_response:
```elixir
receive do
  :overall_timeout -> {:overall_timeout, items}
after
  0 -> :ok
end
```
Otherwise, if your overall timeout is 5 seconds, the current code can wait up to 4 seconds for each response and still not fail (so 20 seconds altogether for 5 tasks)!
Ah that's right, good catch. Thanks!
I've got back to focusing on a more robust `await` implementation in our app, so I'm dusting this off again.
Here is the implementation and it seems to be working so far. Any suggestions on how to write a few simple tests for this code? Any good examples in the wild to look at?
```elixir
defmodule Derive.Ext.GenServer do
  @type reply :: {:ok, term} | {:error, term}
  @type server_with_message :: {GenServer.server(), term}
  @type server_with_response :: {GenServer.server(), reply}

  @doc """
  Given a list of {server, message} tuples, sends a `GenServer.call/2`
  to each server with the given message and returns the responses.

  The returned tuples always match the structure of the requests, except
  the second item will be either {:ok, value} or an error such as
  {:error, :timeout}.
  """
  @spec call_many([server_with_message()], timeout) :: [server_with_response()]
  def call_many(servers_with_messages, timeout \\ 5000) do
    servers_with_requests =
      for {server, message} <- servers_with_messages do
        {server, :gen_server.send_request(server, message)}
      end

    # To avoid waiting (num_calls * timeout) rather than timeout,
    # we check the inbox for :overall_timeout after each request
    # to see if we've exceeded the overall timeout
    timer_ref = Process.send_after(self(), :overall_timeout, timeout)

    # the accumulator is of the form:
    # {:ok | :overall_timeout, [{server1, response1}, {server2, response2}, ...]}
    # Once the status becomes :overall_timeout, we consider any pending requests as timed out
    {_status, servers_with_replies} =
      Enum.reduce(servers_with_requests, {:ok, []}, fn
        {sub, req_id}, {:overall_timeout, items} ->
          # Overall timeout was already triggered. Pick up a reply only if it
          # has already arrived in our mailbox (a zero-timeout wait); otherwise
          # consider the request timed out without waiting any longer
          case :gen_server.wait_response(req_id, 0) do
            {:reply, reply} -> {:overall_timeout, [{sub, {:ok, reply}} | items]}
            _ -> {:overall_timeout, [{sub, {:error, :timeout}} | items]}
          end

        {sub, req_id}, {status, items} ->
          resp =
            case :gen_server.wait_response(req_id, timeout) do
              {:reply, reply} ->
                {status, [{sub, {:ok, reply}} | items]}

              # The GenServer died before a reply was sent
              {:error, reason} ->
                {status, [{sub, {:error, reason}} | items]}

              # We exceeded the timeout for this individual call,
              # so we can globally consider this a timeout
              :timeout ->
                {:overall_timeout, [{sub, {:error, :timeout}} | items]}
            end

          # Without this check, we could wait (num_calls * timeout) rather than timeout.
          # If we received this message, it means we're out of time
          receive do
            :overall_timeout -> {:overall_timeout, [{sub, {:error, :timeout}} | items]}
          after
            0 -> resp
          end
      end)

    Process.cancel_timer(timer_ref)
    Enum.reverse(servers_with_replies)
  end
end
```
P.S. - I heard about the research work going on related to set-theoretic types and I'm very excited about the possibilities for improving the developer experience :)
I'm currently testing like this with some hardcoded values:
```elixir
defmodule Derive.Ext.GenServerTest do
  use ExUnit.Case

  import Derive.Ext.GenServer

  defmodule Server do
    use GenServer

    def start_link(opts \\ []),
      do: GenServer.start_link(__MODULE__, nil, opts)

    def init(nil),
      do: {:ok, nil}

    def handle_call({reply, sleep}, _from, state) do
      Process.sleep(sleep)
      {:reply, reply, state}
    end
  end

  @timeout 100

  describe "Derive.Ext.GenServer.call_many" do
    test "awaiting multiple processes" do
      {:ok, p1} = Server.start_link()
      {:ok, p2} = Server.start_link()

      assert [{^p1, {:ok, :p1hi}}, {^p2, {:ok, :p2hi}}] =
               call_many([{p1, {:p1hi, 10}}, {p2, {:p2hi, 50}}], @timeout)
    end

    test "one process that takes too long" do
      {:ok, p1} = Server.start_link()
      {:ok, p2} = Server.start_link()

      assert [{^p1, {:error, :timeout}}, {^p2, {:ok, :p2hi}}] =
               call_many([{p1, {:p1hi, 100}}, {p2, {:p2hi, 10}}], 50)
    end
  end
end
```
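One more case that might be worth a test is a server that dies before replying, which exercises the `{:error, reason}` branch of `wait_response`. A hypothetical sketch to add to the suite above (`CrashingServer` is invented for this example; note `GenServer.start/2` rather than `start_link` so the server's exit doesn't take down the test process):

```elixir
defmodule CrashingServer do
  use GenServer

  def init(nil), do: {:ok, nil}

  # Stop without replying, so the caller sees the server go down
  def handle_call(:die, _from, state),
    do: {:stop, :shutdown, state}
end

test "a server that stops before replying" do
  {:ok, p1} = GenServer.start(CrashingServer, nil)

  assert [{^p1, {:error, _reason}}] = call_many([{p1, :die}], @timeout)
end
```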
The code looks great to me! 💯
> P.S. - I heard about the research work going on related to set-theoretic types and I'm very excited about the possibilities for improving the developer experience :)
Glad to hear!
@TurtleAI/eds

I've currently got an implementation for broadcasting a `GenServer.cast` to the subscribers, as you can see here: https://github.com/TurtleAI/derive/blob/f2ddeeeb5a5b5bdeb71461d96b80a0a33fa9fcda/lib/derive/broadcaster.ex#L30-L34

I'd also like to implement a version that does a `GenServer.call` in parallel to each subscriber, awaits all of the calls, then returns a keyword list of the results of the form `{subscriber, result}`.

Like:

```elixir
results = Broadcaster.broadcall(server, {:fetch_data, 23})
```

Is there a best practice for this? I've looked at the `Task` module but I'm not sure of the best way to handle this.
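For reference, the `Task`-module route mentioned here can be sketched with `Task.async_stream/3`, which runs one call per subscriber in its own task and enforces a per-call timeout. `EchoServer` is a throwaway server invented for this example (not Derive code), and the `GenServer.call` timeout is set to `:infinity` so that only the stream's own timeout applies:

```elixir
defmodule EchoServer do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, nil)

  @impl true
  def init(nil), do: {:ok, nil}

  @impl true
  def handle_call(msg, _from, state), do: {:reply, {:echoed, msg}, state}
end

{:ok, s1} = EchoServer.start_link()
{:ok, s2} = EchoServer.start_link()
subscribers = [s1, s2]

# One task per subscriber; the stream preserves input order, so we can
# zip the replies back onto the subscriber list afterwards
replies =
  subscribers
  |> Task.async_stream(&GenServer.call(&1, :fetch_data, :infinity),
    timeout: 5000,
    on_timeout: :kill_task
  )
  |> Enum.map(fn
    {:ok, reply} -> {:ok, reply}
    {:exit, :timeout} -> {:error, :timeout}
  end)

results = Enum.zip(subscribers, replies)
```

One trade-off versus `send_request`: each call pays the cost of spawning a task, and on timeout the task doing the call is killed rather than the request merely being abandoned.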