ndrean opened 1 year ago
An example of numerous HTTP requests: I had to find all the (1680) NPM packages published by AWS, together with their download counts.
We first query the npms-api paginated search endpoint. With this list, we query the NPM registry to get statistics for each package. The page size is 25, so we need to paginate the requests and send a total of approximately 1750 (= 1680 + 1680/25) requests. The slower calls are to the paginated API, with a response time of approximately 200-250 ms per query. We then send 1680 queries to get the stats on each package, with an average response time of approximately 10-15 ms. We get the result in around 20 s on average, instead of the sequential estimate of 1680/25 × 0.200 ≈ 13 s plus 1680 × 0.010 ≈ 17 s, i.e. about 30 s. This shows that the queries have run in parallel.
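The back-of-the-envelope sequential estimate can be checked directly (the per-request timings are the averages quoted above):

```elixir
# Sequential estimate, using the average latencies quoted above
search_time = 1680 / 25 * 0.200   # paginated search calls: ~13.4 s
stats_time = 1680 * 0.010         # per-package stat calls: ~16.8 s
total = search_time + stats_time  # ~30 s sequentially, vs ~20 s observed
```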
The code below runs in streams (since the endpoint is paginated) and produces a JSON file with the results.
We use Stream.resource to build the list of packages, using the total count returned by the endpoint to handle the pagination and incrementing a counter on each iteration.
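A minimal sketch of this pagination pattern with Stream.resource; here fetch_page, page_size and total are hypothetical stand-ins for the real endpoint:

```elixir
# Hypothetical paginated source: emits pages until page * page_size >= total
paginate = fn fetch_page, page_size, total ->
  Stream.resource(
    # start: page counter at 0
    fn -> 0 end,
    # next: halt once we have paged past the total, else emit a page of items
    fn page ->
      if page * page_size >= total do
        {:halt, page}
      else
        {fetch_page.(page), page + 1}
      end
    end,
    # cleanup: nothing to release here
    fn _ -> :ok end
  )
end

# Fake endpoint returning 3 items per page
fetch = fn page -> Enum.to_list((page * 3)..(page * 3 + 2)) end
result = paginate.(fetch, 3, 7) |> Enum.to_list()
# => [0, 1, 2, 3, 4, 5, 6, 7, 8]
```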
We use Task.async_stream to query the second endpoint, which returns statistics on a given package; we retrieve the download count over a given period.
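Task.async_stream runs the mapper concurrently and, by default, yields results in the order of the input, wrapped in {:ok, value} tuples (or {:exit, reason} on timeout/crash), which is why the results need to be unwrapped afterwards:

```elixir
# Each result arrives as {:ok, value}; input order is preserved by default
squares =
  1..5
  |> Task.async_stream(fn n -> n * n end, max_concurrency: 4, timeout: 5_000)
  |> Enum.map(fn {:ok, v} -> v end)
# => [1, 4, 9, 16, 25]
```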
We then use Enum.sort_by to order the list of maps on a given key.
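For instance, sorting a list of maps descending on a key (the sample maps are illustrative):

```elixir
stats = [
  %{"package" => "a", "downloads" => 2},
  %{"package" => "b", "downloads" => 9}
]

# Sort descending on the "downloads" key
sorted = Enum.sort_by(stats, &Map.get(&1, "downloads"), :desc)
# => [%{"downloads" => 9, "package" => "b"}, %{"downloads" => 2, "package" => "a"}]
```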
With the result, we run a side effect as a Task: it saves the data to a file. Since we need the data unchanged for the final step, we use the function tap and the data passes through untouched. This allows us to keep the flow and run this side effect.
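tap/2 calls the given function for its side effect and returns its argument unchanged, so the pipeline continues with the same data:

```elixir
result =
  [3, 1, 2]
  |> Enum.sort()
  # side effect: a real pipeline would write to a file here;
  # tap returns [1, 2, 3] unchanged to the next step
  |> tap(fn data -> IO.inspect(data, label: "saving") end)
  |> Enum.map(&(&1 * 10))
# => [10, 20, 30]
```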
We eventually prettify the data into a list of maps %{"package_name" => downloaded_count}.
Elixir can nicely chain all these streams. Open a Livebook, Ctrl-C/V, and test!
Mix.install([
  {:finch, "~> 0.16.0"},
  {:jsonrs, "~> 0.3.1"}
])
Supervisor.start_link(
  [
    {Finch, name: MyFinch},
    {Task.Supervisor, name: MyTaskSup}
  ],
  strategy: :one_for_one,
  name: MySup
)
defmodule Npm do
  require Logger

  @registry "https://api.npmjs.org"
  @search_point "https://api.npms.io/v2/search"
  @starting "2022-01-01"
  @ending "2023-01-01"
  @search "@aws-sdk/client"

  # Fetch the download count for one package over the given period
  def downloaded(package_name, start, ending) do
    path = @registry <> "/downloads/point/#{start}:#{ending}/#{package_name}"

    with {:ok, %{body: result}} <-
           Finch.build(:get, path) |> Finch.request(MyFinch),
         {:ok, response} <- Jsonrs.decode(to_string(result)) do
      response
    else
      {:error, reason} ->
        {:error, reason}
    end
  end

  def find(save? \\ false, string \\ @search, starting \\ @starting, ending \\ @ending) do
    # Unwrap the {:ok, _} / {:error, _} tuples emitted by Task.async_stream
    check_response = fn
      {:ok, response} -> response
      {:error, reason} -> reason
    end

    # Side effect run as a supervised task: encode and write the results to disk
    save_to_file = fn list ->
      Task.Supervisor.async_nolink(MyTaskSup, fn ->
        case Jsonrs.encode(list, lean: true, pretty: true) do
          {:ok, result} -> File.write!("../aws-npm-packages.json", result)
          {:error, reason} -> {:error, reason}
        end
      end)
    end

    # Pagination step for Stream.resource: halt once we have paged past the total
    next = fn {data, page} ->
      {response, total} = search(string, 25 * page)

      case page * 25 >= total do
        true -> {:halt, data}
        false -> {response, {data, page + 1}}
      end
    end

    cross_data = fn
      nil -> {:error, :empty}
      %{"downloads" => d, "package" => name} -> Map.put(%{}, name, d)
    end

    try do
      {:ok,
       Stream.resource(
         fn -> {[], 0} end,
         &next.(&1),
         fn _ -> nil end
       )
       |> Task.async_stream(&downloaded(&1, starting, ending), timeout: 10_000)
       |> Stream.map(&check_response.(&1))
       |> Enum.sort_by(&Map.get(&1, "downloads"), :desc)
       |> tap(fn data -> if save?, do: save_to_file.(data) end)
       |> Enum.map(&cross_data.(&1))}
    rescue
      e ->
        Logger.warning(inspect(e))
        {:error, :empty}
        # Process.sleep(1_000)
        # find(string, starting, ending)
    end
  end

  # Query the paginated search endpoint; returns {stream_of_package_names, total}
  def search(string, from \\ 0) do
    url =
      @search_point
      |> URI.new!()
      |> URI.append_query(URI.encode_query(%{q: string, size: 25, from: from}))
      |> URI.to_string()

    with {:ok, %{body: body}} <-
           Finch.build(:get, url) |> Finch.request(MyFinch),
         {:ok, %{"results" => results, "total" => total}} <- Jsonrs.decode(body) do
      {
        results
        |> Stream.filter(fn package ->
          Map.has_key?(package, "flags") === false &&
            get_in(package, ["package", "name"]) |> String.contains?(string)
        end)
        |> Stream.map(&get_in(&1, ["package", "name"])),
        total
      }
    else
      {:error, reason} ->
        {:error, reason}
    end
  end
end
Npm.find(true)
Thanks for the awesome insight @ndrean ! Mighty interesting 🤔 I'm curious to give your piece of code a go to see these concurrent requests in action. As soon as I have the time, I'll give it a whirl!
Does anyone have experience with "connection pools"? I imagine they limit the overhead of establishing HTTPS connections (TCP and TLS handshakes), but I am rather unsure of this.
Finch: https://hexdocs.pm/finch/Finch.html#start_link/1-pool-configuration-options Nimble_pool: https://github.com/dashbitco/nimble_pool
I tried to add the following and it seems to have improved the search (below 10s).
{Finch,
 name: MyFinch,
 pools: %{
   "https://api.npmjs.org" => [size: 50],
   "https://api.npms.io/v2/search" => [size: 50]
 }},
Am I underestimating this question by saying "just use Task.async_nolink"?
Use this inside a GenServer and you can easily make it recurrent.
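A minimal sketch of that idea, assuming a self-scheduling GenServer; the module name, the :run message, and the period are all hypothetical, and in the real case the work function would call Npm.find/1:

```elixir
defmodule Recurrent do
  use GenServer

  # work: zero-arity function to run; period: interval in milliseconds
  def start_link({work, period}) do
    GenServer.start_link(__MODULE__, {work, period})
  end

  @impl true
  def init({work, period}) do
    # kick off the first run immediately
    send(self(), :run)
    {:ok, {work, period}}
  end

  @impl true
  def handle_info(:run, {work, period} = state) do
    work.()
    # schedule the next run
    Process.send_after(self(), :run, period)
    {:noreply, state}
  end
end
```

It could then be started under a supervisor with something like {Recurrent, {fn -> Npm.find(true) end, :timer.hours(24)}}.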