benwilson512 opened this issue 9 years ago
Thanks for copying me @benwilson512.
It depends: GenServers are also not guaranteed to respond in a timely fashion. If you are doing a computation that takes 30 seconds in a GenServer, it will be unresponsive. The difference here is that it is possible to alleviate this with a GenServer but not so much when it is stream based.
That's exactly when you can be in trouble. If you are doing a "receive" inside the Stream then it is not a good mapping. It is exactly the case I mentioned above: it is easy to solve in a GenServer by relying on handle_info, but there isn't anything we can do with Streams.
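For concreteness, a minimal sketch of the contrast, using a made-up SQS-style message shape: the stream version has to block inside the enumeration, while the GenServer version receives the same message through handle_info and stays responsive in between.

# A stream-based source must block in receive inside the enumeration,
# so nothing else (system messages, subscriptions) is handled meanwhile:
blocking_stream =
  Stream.repeatedly(fn ->
    receive do
      {:"$sqs_event", event} -> event
    end
  end)

# A GenServer-based source gets the same message delivered to a callback
# and keeps processing other messages in between (illustrative only):
defmodule SQSServer do
  use GenServer

  def init(_), do: {:ok, []}

  def handle_info({:"$sqs_event", event}, events) do
    {:noreply, [event | events]}
  end

  def handle_info(_, events), do: {:noreply, events}
end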
The good news is: it is dead easy to implement a GenRouter.In for it, especially if it is pull-based. It is literally this:
defmodule GenRouter.AmazonSQSIn do
  def init(_) do
    {:ok, %{}}
  end

  def handle_demand(demand, state) when is_integer(demand) do
    request_n_items_from_sqs(demand)
    {:noreply, state}
  end

  def handle_info({:"$sqs_event", events}, state) do
    {:dispatch, events, state}
  end

  def handle_info(_, state) do
    {:noreply, state}
  end
end
Where request_n_items_from_sqs/1 needs to be implemented regardless of your solution. You can easily do buffering too. It is going to give you better control over the system's back pressure, be more compliant with OTP, and whatnot.
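As a hedged sketch of what such buffering might look like, reusing the callback shapes from the example above; the module name is hypothetical and request_n_items_from_sqs/1 remains the same placeholder as before.

defmodule GenRouter.BufferedSQSIn do
  # Hypothetical sketch: track outstanding demand and buffer any events that
  # arrive beyond it, dispatching only up to the demanded amount.
  def init(_) do
    {:ok, %{demand: 0, buffer: []}}
  end

  def handle_demand(demand, state) when is_integer(demand) do
    request_n_items_from_sqs(demand)
    dispatch(%{state | demand: state.demand + demand})
  end

  def handle_info({:"$sqs_event", events}, state) do
    dispatch(%{state | buffer: state.buffer ++ events})
  end

  def handle_info(_, state) do
    {:noreply, state}
  end

  defp dispatch(%{demand: demand, buffer: buffer} = state) do
    {ready, rest} = Enum.split(buffer, demand)

    case ready do
      [] -> {:noreply, state}
      _ -> {:dispatch, ready, %{state | demand: demand - length(ready), buffer: rest}}
    end
  end

  # Placeholder for the asynchronous SQS request mentioned above.
  defp request_n_items_from_sqs(_n), do: :ok
end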
> We put it behind a well-behaved proxying process. The protocol of communication could even follow ordinary GenRouter conventions, but would be limited to $gen_ask, and even then only when the previously requested quantity had been delivered.
I am not sure how much this will help us. There are two kinds of messages the GenRouter will receive: system messages and subscriptions.
From the system-messages perspective, we are not gaining anything: although the front-end is responsive, the process doing the hard work is still as unresponsive as before. We can inspect information, but it is not the information we want.
The proxying does help with subscriptions but at what cost?
In particular, all events will now have two extra hops because of the proxy. We may be able to make it one hop if we are able to implement a proxying feature in GenRouter but it is unlikely we can completely get rid of them.
Those are my $.02 though. Let's see what @fishcakez has to say. :)
I think @josevalim gets it spot on really. It is not possible for us to make a blocking call nonblocking. To get the most out of OTP we need to provide a callback based version. Callbacks allow us to receive outside the specific implementation, which aids responsiveness to system messages, subscriptions etc. It also means we can do code reloading well. In StreamRunner I'm unsure how to handle the reloading of the continuation fun. Perhaps the same as an Agent.
We can try to provide a GenRouter.StreamIn implementation that works in a similar way to StreamRunner but only calls the continuation when there is demand. That would allow using a Stream as a source and is a logical next step in terms of streams. That faces the same issue as StreamRunner for code changes though.
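One possible shape for such a demand-driven source, assuming the callback conventions from the SQS example earlier: suspend the enumerable up front and resume its continuation only when demand arrives. The module name is hypothetical and the sketch glosses over the code-change and cleanup concerns mentioned above.

defmodule GenRouter.StreamIn do
  # Hypothetical sketch: nothing is consumed at init; we only capture a
  # suspendable reducer and pull at most `demand` items when asked.
  def init(stream) do
    reducer = &Enumerable.reduce(stream, &1, fn item, _acc -> {:suspend, item} end)
    {:ok, {:cont, reducer}}
  end

  def handle_demand(demand, state) when is_integer(demand) and demand > 0 do
    take(state, demand, [])
  end

  defp take(:done, _n, events), do: {:dispatch, Enum.reverse(events), :done}
  defp take(state, 0, events), do: {:dispatch, Enum.reverse(events), state}

  defp take({:cont, reducer}, n, events) do
    case reducer.({:cont, nil}) do
      {:suspended, item, next} -> take({:cont, next}, n - 1, [item | events])
      {:done, _} -> take(:done, n, events)
      {:halted, _} -> take(:done, n, events)
    end
  end
end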
I feel that is as far as we can take the streams-as-processes idea. I am unsure if we could get either good enough for inclusion in Elixir itself. Using an OTP-style behaviour is always going to have better features.
If I am honest, StreamRunner is "done" from my perspective; I'm not sure how we can do anything better for running a stream as a process purely for its side effects.
I think though we need to consider there are two different stream runners.
1. A stream runner where the source is an enumerable. It would be started like StreamRunner.start_link(stream). Because enumerables can also model infinity, we will break this into infinite and finite enumerables down below.
2. A stream runner which transforms data reaching the current process and sends it to other processes, so the source is kind of the router inbox. It would be started like StreamRunner.start_link(& &1 |> Stream.map |> Stream.that).
The difference here is that it is unlikely you will do waiting computations in 2 because it is all mapping, filtering, taking, flat mapping and so on. I mean, you could still do waiting computations, as you would in a GenServer, but it is less likely you will shoot your own foot. Sources are the most likely to do waiting and they are not allowed here.
That's in fact the direction I am more inclined to go at this moment. You can use streams to model transformations that would happen in a GenRouter; however, we won't use streams as GenRouter sources because we can't really make it work. If you have an enumerable source that you want to be part of streaming, you have two options:
1. If it is a finite source, you can run source |> Stream.each(&GenRouter.notify(router, &1)) |> Stream.run from the process that has the data (see the usage sketch below).
2. If it is an infinite source, it must be a custom GenRouter per my previous comment.
This means we also won't have Stream.merge and friends; those things belong to the router and need to be tackled explicitly. If there is asynchrony, it needs to belong to a process.
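A usage sketch of option 1, with names made up for illustration: the enumeration runs in whichever process owns the data, here wrapped in a Task, and feeds the already-started router via the GenRouter.notify call shown above.

defmodule FiniteSourceExample do
  # Hypothetical: `router` is the pid or name of a running GenRouter and
  # `path` points at a finite, file-backed source.
  def feed(router, path) do
    Task.start_link(fn ->
      File.stream!(path)
      |> Stream.each(&GenRouter.notify(router, &1))
      |> Stream.run()
    end)
  end
end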
Ah yes. I was only talking about type 1.
I've been going over the GenRouter docs and the stream runner code, and I have concerns about the ability of StreamRunner, or really any process whose job it is to call {:cont, nil} on a stream, to fit in the GenRouter ecosystem on its own. The crux of the matter is that the length of time any given cont.({:cont, nil}) call takes is practically unbounded. While the process is blocked waiting on the next item, it obviously can't handle any of the callbacks enumerated in the docs concerning GenRouter. In the ordinary case, the inability to respond in a timely fashion to messages ought to result in exit calls just like GenRouter's other Gen brethren, but in this case there's an irreducible component of delay.
We can either try to simply inform users of StreamRunner of this idiosyncrasy, or we put it behind a well-behaved proxying process. The protocol of communication could even follow ordinary GenRouter conventions, but would be limited to $gen_ask, and even then only when the previously requested quantity had been delivered. The proxying process itself would act exactly like any other GenRouter member, meaning no surprises for users who include it alongside other processes. The StreamRunner can take as long as it likes to return from cont.(); the GenRouter interface would remain responsive.
The public interface of the StreamRunner module (or whatever it becomes known as) would be modified so that it always starts both the underlying process that enumerates the stream and the proxying process.
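For illustration only, a very rough sketch of such a proxy as a plain GenServer. The message shapes ({:ask, n, from} from the router side, {:events, events} back from the enumerator, {:enumerate, n, pid} toward it) are invented here and are not the real GenRouter protocol; the point is only that a single request is in flight at a time.

defmodule StreamRunner.Proxy do
  use GenServer

  # Hypothetical sketch of the proxy idea: accept demand immediately, but only
  # forward a new request to the blocking enumerator once the previously
  # requested quantity has been delivered.
  def start_link(enumerator) do
    GenServer.start_link(__MODULE__, enumerator)
  end

  def init(enumerator) do
    {:ok, %{enumerator: enumerator, pending: [], outstanding: false}}
  end

  # Demand arriving from the router side (message shape invented).
  def handle_info({:ask, n, from}, state) do
    {:noreply, maybe_request(%{state | pending: state.pending ++ [{n, from}]})}
  end

  # Events coming back from the (possibly slow) enumerator process.
  def handle_info({:events, events}, %{pending: [{_n, from} | rest]} = state) do
    send(from, {:events, events})
    {:noreply, maybe_request(%{state | pending: rest, outstanding: false})}
  end

  defp maybe_request(%{outstanding: false, pending: [{n, _from} | _]} = state) do
    # Only ever one request in flight toward the blocking enumerator.
    send(state.enumerator, {:enumerate, n, self()})
    %{state | outstanding: true}
  end

  defp maybe_request(state), do: state
end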
Thoughts?
@josevalim this seems relevant to you as well.