Open avmnu-sng opened 2 years ago
@iMacTia Even after providing a way to access response information from the adapters, we still need to fix the inaccuracy in the start time. For that, we need to patch ActiveSupport::Notifications::Fanout to use a Concurrent::Map and store the start time keyed by request ID.
Changing the existing behavior entirely is not a good idea, so we can introduce start_faraday and finish_faraday to measure the times correctly.
require 'securerandom' # needed for SecureRandom.hex below

def initialize(app, name = 'request.faraday')
  super(app)
  @name = name
  @notifier = ActiveSupport::Notifications.notifier
  @instrumenter_id = ActiveSupport::Notifications.instrumenter.id
end

def call(request_env)
  # Tag the request with a unique id so the notifier can look up its start time.
  context = request_env.request.context || {}
  context[:request_id] = SecureRandom.hex(10)
  request_env.request.context = context
  @notifier.start_faraday(@name, @instrumenter_id, request_env)
  @app.call(request_env).on_complete do |response_env|
    @notifier.finish_faraday(@name, @instrumenter_id, response_env)
  end
end
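The idea of storing start times by request ID can be sketched without any gems. This is only an illustration under stated assumptions: RequestTimer is a hypothetical helper (it is not part of ActiveSupport or Faraday), a Mutex-guarded Hash stands in for Concurrent::Map, and the injectable clock exists purely to make the demonstration deterministic.

```ruby
# Hypothetical helper; a Mutex-guarded Hash stands in for Concurrent::Map.
class RequestTimer
  def initialize(clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @clock = clock
    @starts = {}
    @lock = Mutex.new
  end

  # Remember when the request identified by request_id was started.
  def start(request_id)
    now = @clock.call
    @lock.synchronize { @starts[request_id] = now }
  end

  # Returns [started, finished, duration], or nil for an unknown request_id.
  def finish(request_id)
    finished = @clock.call
    started = @lock.synchronize { @starts.delete(request_id) }
    return nil if started.nil?

    [started, finished, finished - started]
  end
end

# Deterministic clock for the demonstration: each call returns the next tick.
ticks = [0.0, 1.0, 10.0, 10.5]
timer = RequestTimer.new(clock: -> { ticks.shift })

timer.start('req-a')              # t = 0.0
timer.start('req-b')              # t = 1.0
timing_a = timer.finish('req-a')  # t = 10.0
timing_b = timer.finish('req-b')  # t = 10.5

puts timing_a.inspect
puts timing_b.inspect
```

Because each start time is looked up by its own key, the order in which requests finish no longer affects which start time they are paired with.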
With these customizations, we can accurately measure the duration:
faraday.in_parallel(Typhoeus::Hydra.new(max_concurrency: 1)) do
  9.times do |i|
    faraday.get("/200?sleep=#{(i + 1) * 100}")
    sleep(1)
  end
end
# [httpstat.us] GET /200?sleep=100 (200) [1642159338 -> 1642159348 = 10.312]
# [httpstat.us] GET /200?sleep=200 (200) [1642159339 -> 1642159349 = 9.964]
# [httpstat.us] GET /200?sleep=300 (200) [1642159340 -> 1642159350 = 9.508]
# [httpstat.us] GET /200?sleep=400 (200) [1642159341 -> 1642159350 = 9.160]
# [httpstat.us] GET /200?sleep=500 (200) [1642159342 -> 1642159351 = 9.051]
# [httpstat.us] GET /200?sleep=600 (200) [1642159343 -> 1642159352 = 9.186]
# [httpstat.us] GET /200?sleep=700 (200) [1642159344 -> 1642159353 = 9.145]
# [httpstat.us] GET /200?sleep=800 (200) [1642159345 -> 1642159354 = 9.199]
# [httpstat.us] GET /200?sleep=900 (200) [1642159346 -> 1642159356 = 9.450]
@avmnu-sng first of all, thanks for re-raising this issue together with a detailed explanation and steps to reproduce. It helped me get a much better understanding of what is going on (which is hard, as I'm not much of a parallel user myself).
Your proposed solution looks similar to the one from @BuonOmo, although you both pointed out it sadly doesn't work exactly as expected.
I'm wondering at this point if the "fault" here might be on ActiveSupport::Notifications and if there's a chance we could fix it there. For example, looking at the start and finish implementation, it doesn't look like they use the id at all, something that would potentially allow us to distinguish between requests:
def start(name, id, payload)
  # id is provided, but not used to fetch the time, so this works well with
  # nested measurements, but not for concurrent ones.
  #
  # A better implementation could be:
  # timestack = Thread.current[:_timestack] ||= Hash.new { |hash, key| hash[key] = [] }
  # timestack[key_from(name, id)].push Time.now
  timestack = Thread.current[:_timestack] ||= []
  timestack.push Time.now
end

def finish(name, id, payload)
  # same here as for start
  timestack = Thread.current[:_timestack]
  started = timestack.pop
  @delegate.call(name, started, Time.now, id, payload)
end
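To make the limitation concrete, here is a minimal sketch of what happens when a time stack meets requests that finish in the order they were queued. StackTimer is a hypothetical stand-in written for this illustration, not the ActiveSupport code, and the timestamps are passed in by hand instead of being read from the clock.

```ruby
# Hypothetical stand-in for a time-stack-based notifier; not ActiveSupport code.
class StackTimer
  def initialize
    @timestack = []
  end

  # The id is accepted but ignored, mirroring the behaviour described above.
  def start(_id, now)
    @timestack.push(now)
  end

  # Pops the most recently pushed start time, whichever request it belongs to.
  def finish(id, now)
    started = @timestack.pop
    { id: id, started: started, finished: now }
  end
end

timer = StackTimer.new
timer.start(:first, 0)   # first request queued at t=0
timer.start(:second, 1)  # second request queued at t=1

# Requests complete in the order they were queued (first in, first out).
stack_first  = timer.finish(:first, 10)
stack_second = timer.finish(:second, 11)

puts stack_first.inspect
puts stack_second.inspect
```

The first request is paired with the start time of the second one and vice versa, which is the reversed ordering reported in this issue. Nested (last in, first out) measurements would pair correctly with the same code.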
A better solution is to provide a way to access additional response information from the adapters that may contain timing and other information.
Should we not be able to find a better way to measure, then this would eventually be necessary, although it would require explicit implementation from each adapter via some sort of new internal API. That will take much longer to document, and then we'd need to wait for each adapter to implement it as well, so I'd keep it as a last resort.
@avmnu-sng thanks for the extra comment on how to fix ActiveSupport::Notifications, that is exactly what I was aiming for, although your solution with the Concurrent::Map is obviously better.
Changing the existing behavior entirely is not a good idea
Why do you think they wouldn't want to support that?
@iMacTia ActiveSupport::Notifications is synchronous instrumentation that works at either Thread or Fiber isolation. The id that you see is the instrumenter's id. What we need is a unique identifier for what is being instrumented, not who is instrumenting it.
Basically, there is nothing wrong with the Rails implementation. We are trying to use something designed for sync behavior, which won't work by default.
If we want to use the existing start and finish APIs, we need to file a feature request to have something like:
# event_id is a unique identifier of the block getting executed
def start(event_name, event_id, payload); end
And therefore, the subscriber APIs need to change:
# Use event_id to find the correct start time
def start(event_name, event_id, instrumenter_id, payload); end
Rather than changing the existing APIs in ActiveSupport::Notifications, I suggest we inject additional APIs, start_faraday and finish_faraday, which use a Concurrent::Map and have the correct time data.
For getting additional response information, we should look into two milestones:
- Update the adapter save_response to send additional information:
def save_response(env, status, body, headers = nil, reason_phrase = nil, informations = nil)
  env.status = status
  env.body = body
  env.reason_phrase = reason_phrase && reason_phrase.to_s.strip
  env.response_headers = Utils::Headers.new.tap do |response_headers|
    response_headers.update headers unless headers.nil?
    yield(response_headers) if block_given?
  end
  env.response_informations = informations
end
- Start updating the adapters to send the additional information. For example, Typhoeus could be updated to:
def request(env)
  read_body env

  req = typhoeus_request(env)

  configure_ssl req, env
  configure_proxy req, env
  configure_timeout req, env
  configure_socket req, env

  req.on_complete do |resp|
    if resp.timed_out?
      env[:typhoeus_timed_out] = true
      unless parallel?(env)
        raise Faraday::Error::TimeoutError, "request timed out"
      end
    elsif (resp.response_code == 0) || ((resp.return_code != :ok) && !resp.mock?)
      env[:typhoeus_connection_failed] = true
      env[:typhoeus_return_message] = resp.return_message
      unless parallel?(env)
        raise Faraday::Error::ConnectionFailed, resp.return_message
      end
    end

    informations = [
      :primary_ip, :request_size, :return_code, :return_message,
      :total_time, :name_lookup_time, :connect_time, :start_transfer_time,
      :app_connect_time, :pretransfer_time, :redirect_time, :redirect_count
    ].each_with_object({}) { |key, result| result[key] = resp.public_send(key) }

    save_response(env, resp.code, resp.body, nil, nil, informations) do |response_headers|
      response_headers.parse resp.response_headers
    end

    env[:response].finish(env) if parallel?(env)
  end

  req
end
I am running both these patches in my local environment.
ActiveSupport::Notifications.subscribe('request.faraday') do |_, start, finish, _, env|
  puts format(
    '[%<host>s] %<method>s %<request_uri>s (%<status>s) [%<start>i -> %<finish>i = %<duration>.3f] %<information>s',
    host: env.url.host,
    method: env.method.to_s.upcase,
    request_uri: env.url.request_uri,
    status: env.status,
    start: start.to_i,
    finish: finish.to_i,
    duration: finish - start,
    information: env.response_informations
  )
end; nil
Sync request
faraday.get('200')
# [httpstat.us] GET /200 (200) [1642162573 -> 1642162574 = 1.011] {:primary_ip=>"20.40.202.3", :request_size=>82, :return_code=>:ok, :return_message=>"No error", :total_time=>1.009126, :name_lookup_time=>0.002421, :connect_time=>0.270699, :start_transfer_time=>1.008614, :app_connect_time=>0.768444, :pretransfer_time=>0.768517, :redirect_time=>0.0, :redirect_count=>0}
Async requests
faraday.in_parallel(Typhoeus::Hydra.new(max_concurrency: 1)) do
  9.times do |i|
    faraday.get("/200?sleep=#{(i + 1) * 100}")
    sleep(1)
  end
end
# [httpstat.us] GET /200?sleep=100 (200) [1642162721 -> 1642162732 = 10.502] {:primary_ip=>"20.40.202.3", :request_size=>92, :return_code=>:ok, :return_message=>"No error", :total_time=>1.45959, :name_lookup_time=>0.002013, :connect_time=>0.410836, :start_transfer_time=>1.459373, :app_connect_time=>0.907401, :pretransfer_time=>0.90748, :redirect_time=>0.0, :redirect_count=>0}
# [httpstat.us] GET /200?sleep=200 (200) [1642162722 -> 1642162732 = 9.962] {:primary_ip=>"20.40.202.3", :request_size=>92, :return_code=>:ok, :return_message=>"No error", :total_time=>0.463959, :name_lookup_time=>3.9e-05, :connect_time=>3.9e-05, :start_transfer_time=>0.463728, :app_connect_time=>3.9e-05, :pretransfer_time=>7.6e-05, :redirect_time=>0.0, :redirect_count=>0}
# [httpstat.us] GET /200?sleep=300 (200) [1642162723 -> 1642162733 = 9.542] {:primary_ip=>"20.40.202.3", :request_size=>92, :return_code=>:ok, :return_message=>"No error", :total_time=>0.581223, :name_lookup_time=>3.6e-05, :connect_time=>3.6e-05, :start_transfer_time=>0.580916, :app_connect_time=>3.6e-05, :pretransfer_time=>8.5e-05, :redirect_time=>0.0, :redirect_count=>0}
# [httpstat.us] GET /200?sleep=400 (200) [1642162724 -> 1642162733 = 9.197] {:primary_ip=>"20.40.202.3", :request_size=>92, :return_code=>:ok, :return_message=>"No error", :total_time=>0.656018, :name_lookup_time=>4.3e-05, :connect_time=>4.3e-05, :start_transfer_time=>0.655717, :app_connect_time=>4.4e-05, :pretransfer_time=>8.8e-05, :redirect_time=>0.0, :redirect_count=>0}
# [httpstat.us] GET /200?sleep=500 (200) [1642162725 -> 1642162734 = 8.977] {:primary_ip=>"20.40.202.3", :request_size=>92, :return_code=>:ok, :return_message=>"No error", :total_time=>0.783188, :name_lookup_time=>3.7e-05, :connect_time=>3.8e-05, :start_transfer_time=>0.783018, :app_connect_time=>3.8e-05, :pretransfer_time=>8.6e-05, :redirect_time=>0.0, :redirect_count=>0}
# [httpstat.us] GET /200?sleep=600 (200) [1642162726 -> 1642162735 = 8.816] {:primary_ip=>"20.40.202.3", :request_size=>92, :return_code=>:ok, :return_message=>"No error", :total_time=>0.841132, :name_lookup_time=>5.5e-05, :connect_time=>5.5e-05, :start_transfer_time=>0.840889, :app_connect_time=>5.5e-05, :pretransfer_time=>9.5e-05, :redirect_time=>0.0, :redirect_count=>0}
# [httpstat.us] GET /200?sleep=700 (200) [1642162727 -> 1642162736 = 8.773] {:primary_ip=>"20.40.202.3", :request_size=>92, :return_code=>:ok, :return_message=>"No error", :total_time=>0.959293, :name_lookup_time=>6.0e-05, :connect_time=>6.1e-05, :start_transfer_time=>0.958201, :app_connect_time=>6.1e-05, :pretransfer_time=>0.000131, :redirect_time=>0.0, :redirect_count=>0}
# [httpstat.us] GET /200?sleep=800 (200) [1642162728 -> 1642162737 = 8.811] {:primary_ip=>"20.40.202.3", :request_size=>92, :return_code=>:ok, :return_message=>"No error", :total_time=>1.04154, :name_lookup_time=>3.6e-05, :connect_time=>3.6e-05, :start_transfer_time=>1.041346, :app_connect_time=>3.6e-05, :pretransfer_time=>8.0e-05, :redirect_time=>0.0, :redirect_count=>0}
# [httpstat.us] GET /200?sleep=900 (200) [1642162729 -> 1642162738 = 8.954] {:primary_ip=>"20.40.202.3", :request_size=>92, :return_code=>:ok, :return_message=>"No error", :total_time=>1.144223, :name_lookup_time=>4.8e-05, :connect_time=>4.8e-05, :start_transfer_time=>1.143428, :app_connect_time=>4.8e-05, :pretransfer_time=>9.9e-05, :redirect_time=>0.0, :redirect_count=>0}
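One way to read the output above is to split the notification duration into time spent waiting in the queue and time spent on the actual transfer. The helper below is a hypothetical sketch, not part of either patch, and it rests on an assumption: that total_time covers only the transfer itself, in seconds. The figures are taken from the first parallel request above, with the start expressed as 0.0.

```ruby
# Hypothetical helper: splits the notification duration into time spent waiting
# in the adapter queue and time spent on the transfer.
# Assumption: informations[:total_time] covers only the transfer, in seconds.
def split_duration(started, finished, informations)
  duration = finished - started
  transfer = informations.fetch(:total_time)

  {
    duration: duration.round(3),
    transfer: transfer.round(3),
    queued: (duration - transfer).round(3)
  }
end

# Figures from the first parallel request: 10.502s measured, 1.45959s total_time.
breakdown = split_duration(0.0, 10.502, { total_time: 1.45959 })
puts breakdown.inspect
```

Under that assumption, roughly nine of the ten measured seconds for that request were spent queued, not on the wire.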
Basically, there is nothing wrong with the Rails implementation. We are trying to use something designed for sync behavior which won't work by default.
I think this is the key thing here. We use ActiveSupport::Notifications as the default instrumenter because it's the most common one and because it works in the majority of applications. But when you go for parallel requests, it obviously starts to show its limits.
That said, the instrumentation middleware does allow you to provide your own instrumenter, which at this point might be the right thing to do for those planning to run parallel requests.
Rather than monkey-patching an instrumenter that is not fit-for-purpose, should we instead provide an alternative one that is capable of supporting parallel requests? I did a quick search and couldn't find any obvious candidate, but I guess you might know one having worked more in a parallel context.
Another option we can look into is to ship this as a new middleware.
I originally planned to remove the instrumenter middleware from Faraday as we did with retry and multipart, but for some reason it ended up staying. That doesn't mean we can't have a new Instrumentation middleware shipped as a separate gem. This would give us more implementation freedom and could be the obvious choice for those using a parallel-capable adapter.
For getting additional response information, we should look into two milestones:
- Update the adapter save_response to send additional information
- Start updating the adapters to send the additional information. For example, Typhoeus could be updated to
Thanks for explaining this in more detail. I still believe this is not the right way forward, as it will require too many changes across too many repositories for this to work. Having a solution that just works, without having to rely on adapters, would be much more powerful.
I guess we are confusing the use-case when we use parallel.
Let me re-iterate:
- With connection.in_parallel, the call returns immediately after enqueuing the requests, but the instrumentation is correct because ActiveSupport::Notifications measures the time to execute the block and then sends the notification.
- request.on_complete should be the trigger point for the notifications.
- The start and finish APIs from ActiveSupport::Notifications are not designed to be used explicitly. But note that this has nothing to do with parallel: we can still instrument explicitly using start and finish or similar APIs.
Regarding the additional response details, I suggest using an adapter because these details are part of the response, and currently Faraday uses the adapters to get the response.
Thanks for going through the details again @avmnu-sng, I agree with everything you said. I believe I should also attempt to articulate my thoughts a bit more clearly.
The Faraday instrumentation middleware is supposed to fire instrumentation metric events so that you can instrument your requests. This can be done independently from the adapter you pick, in accordance with Faraday's spirit of abstracting the HTTP backend library.
Although the middleware allows you to pick the instrumenter you prefer, it currently defaults to ActiveSupport::Notifications as this is the most used one and available out-of-the-box in Rails applications.
There are mainly 2 issues, as you correctly highlighted already:
1. The instrumentation middleware is block-based, which doesn't work well with parallel requests. We can solve this by switching to the start/stop interface instead.
2. ActiveSupport::Notifications has not been designed to work with parallel events, where the start event is called from the same thread, since it relies on an events stack. So if we fix problem 1, we're still going to experience issues here.
Assuming that we already solved problem 1 by fixing the middleware and switching to the start/stop interface, there are a few solutions that we've discussed already:
1. Monkey-patch ActiveSupport::Notifications with ad-hoc faraday_start/stop methods: this would eventually work, but at that point we're bringing the maintenance burden of that monkey-patch into Faraday, and we'd need to test it across different ActiveSupport::Notifications versions to ensure it doesn't break. And that's simply because we chose it as the default instrumenter, so we're ignoring the fact that users can still change that, causing the middleware to behave unexpectedly again.
As neither of the solutions above makes me extremely happy, I proposed a couple of alternative solutions in my last comment which I'd be personally happy with:
1. Ship a new instrumentation middleware as a separate gem (can be called ParallelInstrumentation, or something similar). This could explicitly depend on ActiveSupport::Notifications and apply the monkey-patch from solution 1, but would be maintained separately and possibly by someone else.
2. Assume an alternative to ActiveSupport::Notifications exists, an instrumenter built for concurrency that supports the start/stop interface (we'll call this imaginary instrumenter ParallelInstrumenter). One could instruct the existing middleware to use that instead with the following:
conn = Faraday.new(...) do |f|
  f.request :instrumentation, instrumenter: ParallelInstrumenter
  ...
end
Now, since I don't personally work much with concurrent requests, I'm unaware of such an instrumenter, but I'd expect something like this to exist already and potentially being known by someone that works with concurrent ruby on a daily basis.
I hope the above is clearer and my reasoning makes sense. I do want to support this, but not at the expense of additional maintenance burden on the team, or entirely relying on the adapters.
Do any of the alternative solutions make sense and look like a viable option? Are you aware of any alternative instrumenter that supports parallel instrumentation?
@iMacTia I agree with you on the instrumentation part that it makes more sense to have an alternative to ActiveSupport::Notifications or monkey-patch it. However, I think we have a different perspective on the additional response data. I am not considering this as instrumentation information.
Assuming we have ParallelInstrumenter, or I would rather call it InstrumenterX for now: the time measured here runs from the moment the request is queued to the moment the response is received. That differs from a sync request, where we measure the time from when the request is executed to when the response is received. It is hard to achieve the same with connection.in_parallel because we can't actually know when the request starts executing, but we definitely get the time when the request finishes. This is where the additional response data would be handy, giving you fine-grained data, but it is on top of the changes to instrumentation we are talking about; it is a must.
Another option is to have request.on_start, so that we actually know when a request starts executing. Again, it requires changes at the adapter level.
I'm finally seeing what you mean, and apologies if it took me so long.
I was under the wrong assumption that parallel requests would start immediately, but in your example you made it pretty clear that if you queue N requests and the max_concurrency is lower than N, then some time will elapse between the call to the instrumenter start and the parallel request actually being started.
With that in mind I can't but agree that this is a service the adapter should be exposing, as that's the only thing that can know when a request actually starts.
A way to get around this could be that the middleware checks for the adapter support of this "feature" and falls back to the inaccurate behaviour of start/stop if that's not the case.
I'll need to think about this more though, as middleware is not supposed to have a direct communication with the adapter, so we'll need some sort of middle abstraction layer to get around that.
I'm also starting to think if this should not be managed by the underlying library entirely at this point. Why use the faraday middleware and instead not request that Typhoeus or other libraries expose some sort of notifications interface one can register to? Do they do this already by any chance?
@iMacTia sorry for the late reply, also I have to admit I use more Typhoeus on its own nowadays due to Faraday's lack in parallelism support and my lack of time to fix that...
Anyway, I've quickly read your conversation, hopefully it will lead to something great! Just here to say: Typhoeus does expose instrumentation (https://github.com/typhoeus/ethon/blob/master/lib/ethon/easy/informations.rb#L18), and some kind of notification (e.g. on_complete).
Thanks @BuonOmo, I believe that's the same informations @avmnu-sng mentioned in their comment earlier, and it sure looks like a lot of information! The only missing thing there is the integration with a notification system like ActiveSupport::Notifications, but that's something the middleware could provide.
I spent some more time thinking about this, and I believe the following step-by-step approach should work. It's mostly based on things that @avmnu-sng already raised, but my brain needed a little extra help to put them together, so I hope this list will help others see them clearly as well:
1. Update the instrumentation middleware to use the start/stop interface. This could "replace" the existing block behaviour or, to be backwards-compatible, enrich it after a) checking if the instrumenter supports it; or b) checking if we're running in_parallel. This will make things better, but will also introduce some inconsistencies as explained in the ticket description, which we can document and point out.
2. Add a new field for the adapters to populate. It doesn't have to be called informations 😄, but it will basically contain that info. Maybe instrumentation or something like that.
3. Update the instrumentation middleware once again so that, in the on_complete callback, it checks for this new field's presence, and if found it uses that to enrich the notification payload, then it fires it.
@BuonOmo @avmnu-sng what do you think about the plan above? Any feedback would be appreciated. Also, @BuonOmo, I appreciate that parallelism support has not been our main concern, mostly due to the Faraday core team's inexperience on the topic, but I believe it could become a big focus now that v2.0 is out and we plan our road to v3.0. It would be really useful to have a comprehensive list of current issues so that we can explore and tackle them as necessary. May I suggest we start a new Discussion on the topic so we can coordinate there? How does that sound?
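The middleware side of this plan (falling back when the instrumenter lacks start/finish, and enriching the payload when the adapter fills the new field) could look roughly like the sketch below. Everything here is hypothetical and written only for illustration: SketchInstrumentation is not Faraday's middleware, the :instrumentation and :adapter_timing keys are made-up names, the env is a plain Hash, and the fake adapter completes immediately where a parallel adapter would defer the callback.

```ruby
# All names here are hypothetical; this is not Faraday's middleware.
SketchResponse = Struct.new(:env) do
  # Runs the callback immediately; a parallel adapter would defer it.
  def on_complete
    yield env
    self
  end
end

class BlockInstrumenter
  attr_reader :events

  def initialize
    @events = []
  end

  def instrument(name, payload)
    result = yield
    @events << [:instrument, name, payload]
    result
  end
end

class StartFinishInstrumenter < BlockInstrumenter
  def start(name, _payload)
    @events << [:start, name]
  end

  def finish(name, payload)
    @events << [:finish, name, payload]
  end
end

class SketchInstrumentation
  def initialize(app, instrumenter, name = 'request.faraday')
    @app = app
    @instrumenter = instrumenter
    @name = name
  end

  def call(env)
    unless @instrumenter.respond_to?(:start) && @instrumenter.respond_to?(:finish)
      # Fallback: keep the existing block-based behaviour.
      return @instrumenter.instrument(@name, env) { @app.call(env) }
    end

    @instrumenter.start(@name, env)
    @app.call(env).on_complete do |response_env|
      # Enrich the payload only when the adapter filled the new field.
      extra = response_env[:instrumentation]
      payload = extra ? response_env.merge(adapter_timing: extra) : response_env
      @instrumenter.finish(@name, payload)
    end
  end
end

# Fake adapter that reports a transfer time in the hypothetical field.
app = ->(env) { SketchResponse.new(env.merge(status: 200, instrumentation: { total_time: 1.5 })) }

modern = StartFinishInstrumenter.new
SketchInstrumentation.new(app, modern).call({ url: '/200' })

legacy = BlockInstrumenter.new
SketchInstrumentation.new(app, legacy).call({ url: '/200' })

puts modern.events.map(&:first).inspect
puts legacy.events.map(&:first).inspect
```

Checking respond_to? keeps existing instrumenters working unchanged, at the cost of two code paths whose timing semantics differ and would need documenting.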
@iMacTia I am definitely in agreement 😄. Feel free to start a new discussion and let's plan and execute it.
Reference: https://github.com/lostisland/faraday-retry/issues/2
Adapter: Typhoeus
Problem Statement
The current implementation is:
Let's define the connection and subscribe to the default event:
Consider the following sync request that works perfectly fine:
Now, let's make async (in_parallel) requests:
Notice that there is no response because these requests are only added to the Hydra queue and will be performed later.
Flawed Solution
Since the notification is published before the response is completed, we should try to delay the notification until there is a response, so we can look at an updated implementation:
Let's make the requests:
We have fixed the response issue, but there now seems to be a problem with the duration. Well, not exactly: technically, we are now measuring the total time to finish the nth request since we submitted the requests.
There is a bigger problem, consider the following requests:
Note that the start times are in reverse order because ActiveSupport::Notifications::Fanout maintains a time stack.
Conclusion
We cannot easily figure out the exact start time of the requests and building a system across all the adapters to get the start times would be a tedious and error-prone process. A better solution is to provide a way to access additional response information from the adapters that may contain timing and other information.