Closed: almightycouch closed this issue 3 years ago
Nevermind, the function is available in :hackney:
:hackney.stop_async id
I would love to support this as part of HTTPoison!
Hi @edgurgel.
I struggled a bit using the Twitter Streaming API with HTTPoison.
I think the :stream_to mechanism is great when fetching a limited amount of data (the way you transform incoming chunks into specific status, header, chunk and end packages feels really nice).
But it's kind of tricky when the source keeps sending data over the HTTP connection forever. I think for this sort of data flow, you may want to end up using Elixir streams.
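The jump from :stream_to messages to a lazy Elixir stream can be made with Stream.resource/3. Here is a minimal sketch; the {:chunk, data} and :done message shapes are hypothetical stand-ins for the %HTTPoison.AsyncChunk{} and %HTTPoison.AsyncEnd{} structs, and ChunkStream is an invented module name:

```elixir
defmodule ChunkStream do
  # Wrap messages arriving in the current process's mailbox in a lazy
  # Stream. Each {:chunk, data} message becomes one stream element;
  # :done (or silence for `timeout` ms) ends the stream.
  def new(timeout \\ 5_000) do
    Stream.resource(
      fn -> :ok end,
      fn acc ->
        receive do
          {:chunk, data} -> {[data], acc}   # emit one chunk, keep going
          :done -> {:halt, acc}             # upstream finished
        after
          timeout -> {:halt, acc}           # give up if nothing arrives
        end
      end,
      fn _acc -> :ok end
    )
  end
end
```

Because the stream is lazy, something like Enum.take(stream, 5) pulls only as many chunks as requested, which is exactly the consumption model an endless endpoint needs.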
In my project, the API looks like that:
iex(1)> {:ok, stream} = Twittex.Client.stream "cop21"
{:ok, %GenEvent.Stream{manager: #PID<0.169.0>, timeout: :infinity}}
iex(2)> Enum.take(stream, 5)
[%{}, %{}, %{}, %{}, %{}]
iex(3)> GenEvent.stop stream.manager
:ok
The last line stops the GenEvent, which stops the :hackney async connection in its terminate/2 callback.
I'm sure some of this is pretty generic and could be added to HTTPoison to solve this kind of problem. I ran into a few issues getting it to work, but in the end it seems to behave as intended.
1) No support for changing the chunk size
Twitter supports a delimited parameter:
Setting this to the string length indicates that statuses should be delimited in the stream, so that clients know how many bytes to read before the end of the status message. Statuses are represented by a length, in bytes, a newline, and the status text that is exactly length bytes. Note that “keep-alive” newlines may be inserted before each length.
https://dev.twitter.com/streaming/overview/request-parameters#delimited
With HTTPoison, you have to handle this by collecting multiple chunks to reassemble the entire status.
That's OK, though it has to be done in a loop around a receive block, which doesn't feel great.
In my example, the loop is implemented as follows:
defp stream_loop(listener, buffer \\ "", size \\ 0) do
  receive do
    %HTTPoison.AsyncChunk{chunk: chunk} ->
      # Lengths are given in bytes, so use byte_size/1 rather than
      # String.length/1 (which counts graphemes, not bytes).
      chunk_size = byte_size(chunk)
      cond do
        size == 0 ->
          [size, chunk] = String.split(chunk, "\r\n", parts: 2)
          stream_loop(listener, chunk, String.to_integer(size) - byte_size(chunk) - 1)
        size > chunk_size ->
          stream_loop(listener, buffer <> chunk, size - chunk_size)
        size < chunk_size ->
          raise "Oops, reading ahead of chunk"
        size == chunk_size ->
          GenEvent.ack_notify(listener, Poison.decode!(buffer <> chunk))
          # status complete: start over with an empty buffer
          stream_loop(listener)
      end
    # ignore other messages, but keep the current buffer and size
    _ -> stream_loop(listener, buffer, size)
  end
end
As you can see, I raise an exception if the chunk runs ahead of the status length. Luckily this never happens (I think the Twitter Streaming endpoint leaves a little delay between two statuses).
I tried to change the chunk size by forwarding a bigger req_chunk_size param to :hackney, but it didn't change anything: incoming chunks were always smaller than the length of the statuses.
Maybe #97 would be a better fit for this kind of problem.
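For what it's worth, the length-delimited framing can also be handled as a pure function over an accumulated buffer, independent of any process. A sketch (Delimited is a hypothetical module name; lengths are counted in bytes, as the Twitter docs specify):

```elixir
defmodule Delimited do
  # Consume as many complete length-delimited statuses as the buffer
  # holds, returning {statuses, rest}. The wire format is
  # "<length>\r\n<payload of exactly that many bytes>", with optional
  # keep-alive "\r\n"s allowed before each length.
  def parse(buffer, acc \\ []) do
    buffer = String.trim_leading(buffer, "\r\n")

    with [len, rest] <- String.split(buffer, "\r\n", parts: 2),
         {n, ""} <- Integer.parse(len),
         <<status::binary-size(n), rest2::binary>> <- rest do
      parse(rest2, [status | acc])
    else
      # incomplete length line or partial payload: wait for more data
      _ -> {Enum.reverse(acc), buffer}
    end
  end
end
```

Because parse/2 consumes as many complete statuses as the buffer holds and hands back the remainder, the "reading ahead of chunk" case becomes a normal partial-buffer state instead of an exception.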
2) Forwarding incoming messages to another process doesn't feel great
:hackney sends incoming data to your transformer function. The data is transformed and sent to the given :stream_to process. Using a receive block may not be the best solution for this kind of work.
Elixir provides a lot of functionality for this kind of data flow (GenEvent, Stream, etc.).
In my example, I ended up with two different processes for processing the data:
- a stream_loop to forward full statuses to a GenEvent.
- a GenEvent process which is used to build a Stream of collected statuses (GenEvent.stream/2).
This means that at least four processes (:hackney.recv_async/1, transformer/1 and the two listed above) are used to create a stream from the incoming data.
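The GenEvent-to-Stream bridge at the end of that pipeline can be sketched in isolation. This is a simplified stand-in, not the project's actual code, and GenEvent has since been deprecated in newer Elixir releases; the fake statuses and the 100 ms delay (giving the stream time to subscribe its handler) are illustration-only:

```elixir
# A producer pushes decoded statuses into a manager with
# GenEvent.ack_notify/2, and GenEvent.stream/1 exposes them to the
# caller as a lazy Enumerable.
{:ok, manager} = GenEvent.start_link()
stream = GenEvent.stream(manager)

# Stand-in for stream_loop: push three fake statuses after a short
# delay so the stream has had time to subscribe.
spawn(fn ->
  Process.sleep(100)
  for status <- [%{"id" => 1}, %{"id" => 2}, %{"id" => 3}] do
    GenEvent.ack_notify(manager, status)
  end
end)

taken = Enum.take(stream, 3)

# Stopping the manager tears the whole pipeline down.
GenEvent.stop(manager)
```

Note that ack_notify/2 blocks the producer until the stream consumer acknowledges each event, which gives natural backpressure between the two processes.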
I'm sorry for the super late response. Number 1 is a hackney issue, as the chunk size is set just once.
For the other one, I would like to help users by providing some modules/functions to help stream data. Right now we break the response into multiple parts, and sometimes the user just wants the body to be sent asynchronously, or even just a non-blocking request/response. Maybe adding some options to the way we send the async response would help. Using Stream would then be easier, as you wouldn't have multiple parts to deal with, for example.
What do you think?
+1
I would just like to say that, with GenStage out, you can implement the streaming feature as a GenStage producer. In particular it would be interesting to have a chunk streamer, so you get every chunk as a separate event. The downside is that GenStage producer messages are always wrapped in the {:"$gen_consumer", _, [event]} format, so it is not exactly backwards compatible with the current stream_to mechanism. However, if you implement it as a GenStage producer, you will be able to consume it either from another process or as a stream (by calling GenStage.stream).
The first step has been taken: https://github.com/edgurgel/httpoison/pull/189
Now we need to think about how we are going to handle pause/resume and close requests. One issue that will arise is the fact that we currently use a standalone process to convert hackney responses into HTTPoison structs. We need to handle this better somehow, as it is currently started with no link/monitor and it won't die until it receives {:hackney_response, id, :done}.
I've tackled some of these issues in my Twitter implementation (see here).
Basically, when starting an async (long polling, etc.) request, I start my GenStage first (linked to the caller), giving its PID to :hackney so it receives :hackney_response messages as soon as possible:
{:ok, stage} = Stream.start_link()
options = Keyword.merge(options, hackney: [stream_to: stage, async: :once], recv_timeout: :infinity)
# make hackney request with above options
Currently, you are using the :transform option to convert :hackney_response messages to %HTTPoison{} structs; maybe this is something that should be done by the GenStage as well.
I think the API should be as simple as:
{:ok, stage_pid} = HTTPoison.get "http://example.org/stream", %{}, async: :stage
stream = GenStage.stream([stage_pid])
Stopping the stream can be done automatically when the GenStage
stops:
def terminate(_reason, state) do
  :hackney.stop_async(state.ref)
end
Pause/resume could be handled in the same manner:
def handle_call(:pause, _from, state) do
  :hackney.pause_stream(state.ref)
  {:reply, :ok, [], state}
end

def handle_call(:resume, _from, state) do
  :hackney.resume_stream(state.ref)
  {:reply, :ok, [], state}
end
Closing due to inactivity.
I'm using the :stream_to parameter to stream data from an HTTP endpoint. The endpoint streams forever, which means that I cannot wait for an %HTTPoison.AsyncEnd{} message.
Is there a way to close the connection? Maybe using the %HTTPoison.AsyncResponse id?