hamiltop / streamz

Elixir Streams and Utilities for Streaming.

Update Stream.merge to use the same message format as GenEvent.Stream #20

Open hamiltop opened 10 years ago

hamiltop commented 10 years ago

We have defined a message format for GenEvent.Stream. https://github.com/elixir-lang/elixir/blob/ab9450d92ff1582ad9ce11e6ec4baf450a07e00e/lib/elixir/lib/gen_event/stream.ex#L107

If merge were to conform to that format, then Stream.merge could easily accept messages directly from GenEvent.Stream.

josevalim commented 10 years ago

I believe this is done? :D

Anyway, I just want to let you know that we will no longer change the gen event message, at least not for 1.0. We will continue sending :gen_event_EXIT unless we think of a good reason not to.

I have only one issue in mind, but I believe it can be solved later on. Today, if you use GenEvent.add_handler/4, the :gen_event_EXIT message is only sent if you pass monitor: true. We could mimic the API in GenEvent.connect/3; the issue is that connect/3 always monitors by default, so the option name :monitor can be misleading. We have some options:

  1. Use a different option name in connect/3 (like :notify_on_exit) - it feels weird though because that option exists only for connect/3
  2. Change add_handler/4 to have two options (:monitor and :notify_on_exit) and connect/3 would support just the latter (as monitor is done by default) - it can be confusing though because most of the time for add_handler/4 you want monitor+notify_on_exit to be true at the same time.

Thoughts?

hamiltop commented 10 years ago

What is the cost of monitoring? In what circumstances would we not want to monitor?

That's still an area that's a bit foreign to me (I don't come from an Erlang background).

Maybe Connectable should require monitoring. Linking would still be handled by the caller.

josevalim commented 10 years ago

For add_handler, we monitor when we want the event manager to monitor the process that added the handler. If the adding process dies, the handler is removed. It also has the side effect of sending notifications to the adding process whenever the handler is removed.

For GenEvent.connect/3 though, we must always monitor the handler because in this case the handler is the process. So the only option we have to control is if we should notify on exits or not.
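To make the monitor side of this concrete, here is a minimal sketch that uses only Kernel and Process functions rather than GenEvent (the module name MonitorSketch is hypothetical). A monitor is one-way: the watching process is not killed when the watched process exits; it just receives a {:DOWN, ...} message carrying the exit reason. That is roughly analogous to being notified with :gen_event_EXIT when a handler goes away.

```elixir
defmodule MonitorSketch do
  # Hypothetical helper: spawn a process that exits with `reason`,
  # monitor it, and return the reason carried by the :DOWN message.
  def exit_reason(reason) do
    # spawn_monitor/1 creates the process and the monitor atomically, so an
    # immediate exit still reports `reason` rather than :noproc.
    {pid, ref} = spawn_monitor(fn -> exit(reason) end)

    receive do
      {:DOWN, ^ref, :process, ^pid, down_reason} -> down_reason
    after
      1_000 -> :timeout
    end
  end
end
```

Both MonitorSketch.exit_reason(:normal) and MonitorSketch.exit_reason(:boom) return the reason and leave the caller running. With Process.link/1 instead (as MergeHandler uses below), an abnormal exit such as :boom would also take down the linked caller unless it traps exits.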

josevalim commented 10 years ago

Ok, after this discussion, I have done some final changes on how GenEvent works. I believe those are the relevant commits:

I think it is actually nice we no longer need to match on GenEvent.Stream everywhere (it becomes an implementation detail at this point that should not leak anywhere).

hamiltop commented 10 years ago

Cool, I'll clean up my branch here and see how things look. I still want to try to get some documentation for Connectable written, because there are a lot of assumptions and expectations that aren't clear.

Curious: what is the post-1.0.0 roadmap for Elixir? Right now it's easy to add new code because it's pre-1.0, but how open will Elixir be to new stuff (for example, adding Connectable) post-1.0?

I don't think merge is ripe yet (error handling is still not well defined) and it probably still belongs in a separate package. I'm just wondering what the long-term path looks like here.

josevalim commented 10 years ago

The goal is to build all new ideas as separate projects and then assess their importance to the community. This will also help us get things right, because we won't have much opportunity for code churn once they get merged into Elixir.

So, to be more explicit, we should move Streamz from experimental stage to usable, i.e. tidy up the abstractions, docs, documented APIs, etc. Then we promote it, see how involved the community gets, and then merge it in. For example, once we have built enough abstractions around Connectable as a protocol we can move it to Elixir with things like Stream.merge while we continue evaluating other aspects.

The next stage would be to bring in the parallel stuff, and so on. Does that make sense?

hamiltop commented 10 years ago

Yep. Just the clarification I was looking for.


hamiltop commented 10 years ago

I approached the protocol from a new angle. I ignored merge and simply looked at getting a protocol put together that worked well with GenEvent.

It's all under https://github.com/hamiltop/streamz/tree/event_source_protocol

Some notable design decisions:

These were just sort of convenient solutions to problems I hadn't thought about before. There are probably other ways to go about it.

There are some simple tests to make sure it works with GenEvent.Stream as well as normal Enumerables.

I've also put together https://github.com/hamiltop/systemex which uses EventSource in a few different places. Notable uses are:

The idea with WebsocketHandler is that I can connect N EventSources to it in my application configuration. The follow-on will be to treat it like an EventSource and connect it to M consumers.

In Systemex.Mem I'm connecting a stream to a GenEvent. It feels a little clunky, because it would be easier to just Enum.each &GenEvent.ack_notify(manager, &1) but the benefit is that I could change the implementation of the manager without breaking the code (it could move away from a GenEvent to something else).

Also, I'm using rx.js on the JavaScript side, just to keep things consistent.

@josevalim Let me know if you think I'm heading down the wrong path. I've ignored :gen_event_EXIT thus far as these are all infinite streams, but otherwise I feel like my design is the result of some real-world usage.

josevalim commented 10 years ago

I think this looks great. I think returning some sort of struct is likely going to be very helpful in the long term too. Quick question though: why are we returning a list on add? Is there a chance of none, or more than one, stream being added? I think I just thought of a scenario where this could happen: if you have a Stream.merge/1 and you call EventSource.add/2 on it, we can skip the merge process altogether and send the messages directly to the process given to the event source. Is this what you have in mind?

hamiltop commented 10 years ago

Yep. I did this on my first iteration with Connectable

https://github.com/hamiltop/streamz/blob/connectable_protocol/lib/streamz/connectable.ex#L55

I haven't gotten to Merge yet with the new code, but it will be similar.

That's the "many" case. I'm not sure if there is an empty-list scenario.
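A minimal sketch of why add might return a list: if a merge is represented as data (a struct) rather than a closure, nested merges can be flattened into their leaf sources, and a consumer could subscribe to each leaf directly, skipping the intermediate merge processes. MergeSketch and its functions are hypothetical, not part of Streamz; in this sketch an empty list falls out naturally from a merge with no sources.

```elixir
defmodule MergeSketch do
  # Hypothetical: a merge represented as plain data instead of a closure,
  # so that nested merges can be inspected and flattened.
  defstruct sources: []

  def merge(sources), do: %__MODULE__{sources: sources}

  # Collect the leaf sources of arbitrarily nested merges. A consumer could
  # subscribe to each leaf directly instead of going through one process
  # per merge level; an empty merge yields an empty list.
  def leaves(%__MODULE__{sources: sources}), do: Enum.flat_map(sources, &leaves/1)
  def leaves(source), do: [source]
end
```

For example, MergeSketch.leaves(MergeSketch.merge([[1, 2], MergeSketch.merge([[3], [4, 5]])])) returns [[1, 2], [3], [4, 5]]. Flattening enumerates the list of sources eagerly, so this approach assumes a finite list of sources.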

josevalim commented 10 years ago

I am writing a document that is meant to consider all use cases we have discussed so far. For example, we want to have event pools but avoid copying the data from sender to pool master to pool slave. We want to have pipelines but be able to track events through the pipeline. At the end of the document, I have done so many extensions to the "EventSource message protocol" that I had to basically reimplement GenEvent in the implementation of EventSource.Any.

I am wondering if it is indeed easier to just say: EventSource is about a protocol that starts a GenEvent (if one is not running yet) with custom behaviour. For example, if we support sticky handlers, which are basically handlers that cause GenEvent termination once removed, we can implement merge with a GenEvent as follows:

def merge(left, right) do
  fn acc, fun ->
    {:ok, manager} = GenEvent.start_link()

    # Ask the GenEvent to send events to the
    # current process. If the current process
    # exits for any reason (including normal),
    # the GenEvent aborts due to sticky.
    GenEvent.add_process_handler(manager, self(), sticky: true)

    # Register a merger handler that will call
    # EventSource.add/2. See the module below.
    GenEvent.add_handler(manager, GenEvent.MergeHandler,
                         [left, right], sticky: true)

    # Consume events as in GenEvent.Stream with acc, fun
  end
end

defmodule GenEvent.MergeHandler do
  use GenEvent

  def init(sources) do
    state = Enum.map sources, fn source ->
      {:ok, pid, ref} = EventSource.add(source, self())
      # A crash in any source brings the whole thing down
      Process.link(pid)
      {pid, ref}
    end

    {:ok, state}
  end

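  def handle_info({:gen_event_EXIT, id, _reason}, state) do
    # Remove id from state until state is empty.
    # When the state is empty, return :remove_handler
    # which will cause the whole GenEvent to crash
    # as this handler is sticky.
    # (Assumption: id is the ref stored in state by init/1.)
    case List.keydelete(state, id, 1) do
      [] -> :remove_handler
      rest -> {:ok, rest}
    end
  end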
end

I think we should be able to implement EventSource.Any with a GenEvent, with a few extensions to GenEvent too. This feels like it will make our lives much, much, much easier.

Thoughts?

hamiltop commented 10 years ago

So that implementation won't flatten nested Merges. It probably could be tweaked to do so easily (use a struct rather than a fun), so not a big deal.

As is, it can't really be extended to work with an infinite stream of streams because the MergeHandler will enumerate forever. This could be fixed by spawning a task and storing the state via an agent (it's what I did on my latest merge implementation).

As a side note, I found that in practical usage I dislike calling add_process_handler explicitly. Calling EventSource.add seems cleaner, even if it's being called on a GenEvent.

I don't see any other major gaps. I think it will work.


josevalim commented 10 years ago

So that implementation won't flatten nested Merges. It probably could be tweaked to do so easily (use a struct rather than a fun), so not a big deal.

Right!

As is, it can't really be extended to work with an infinite stream of streams because the MergeHandler will enumerate forever.

Oh wow, I haven't even thought about this case before. Honestly, I would be OK with saying that a list is required. Do you have a use case for using infinity in merge?

As a side note, I found that in practical usage I dislike calling add_process_handler explicitly. Calling EventSource.add seems cleaner, even if it's being called on a GenEvent.

Yes, I am in the same boat. The issue though is that EventSource.add works on data structures while add_process_handler works on a pid/atom. So we would need to wrap it in a stream for this particular use case (which is fine). I thought about implementing EventSource for PID/atom/tuple, but that is probably more trouble than it is worth (we should in all likelihood keep the data structures that implement EventSource a subset of Enumerable).

Thank you for all those conversations! :heart:

hamiltop commented 10 years ago

On Mon, Sep 15, 2014 at 11:41 AM, José Valim notifications@github.com wrote:


As is, it can't really be extended to work with an infinite stream of streams because the MergeHandler will enumerate forever.

Oh wow, I haven't even thought about this case before. Honestly, I would be OK with saying that a list is required. Do you have a use case for using infinity in merge?

An unbounded pmap is pretty simple with infinite streams of streams https://github.com/hamiltop/streamz/blob/master/lib/streamz.ex#L131

A TCPServer that produces streamable TCPSockets is another case (An interesting case would be a graphite server. You could process each data point in parallel and then merge all the streams into a single stream for persistence.)

RxJava has a ton of stream of streams stuff. I think it's worth supporting. We could always optimize for a finite list but still support any Enumerable.


Looking at your code again, I'm getting more skeptical (always a good thing). Somehow I missed this in my first glance, but would this spawn N+1 GenEvents? My perf tests have shown that the +1 process is much much slower.

I guess that raises the question of usage. If merge is the only thing happening on a process, replacing it with a GenEvent is fine. Otherwise, a merge will block until either 1) all substreams finish or 2) enumeration finishes (Enum.take(10), for example). Is there a way to set up and take down a GenEvent on the current process?

If not, function handlers on gen_event could mitigate the issues. Just don't ever merge to the current process.

Though that smells a bit like monadic callback based Scala, which is a little troubling.

I'll think about this a bit more.
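On the question of setting something up for the duration of an enumeration and tearing it down when it stops: outside of GenEvent, Elixir's Stream.resource/3 covers this pattern, because its after-function runs both when the stream is exhausted and when the consumer halts early, e.g. with Enum.take/2. This is a swapped-in technique, not the GenEvent approach discussed here; ResourceSketch and its demand-driven helper process are hypothetical.

```elixir
defmodule ResourceSketch do
  # Hypothetical example: an infinite stream of integers served by a
  # helper process. The process is started when enumeration begins and
  # killed by the after-function when enumeration stops, including when
  # the consumer halts early with Enum.take/2.
  def numbers(owner) do
    Stream.resource(
      fn ->
        pid = spawn(fn -> loop(1) end)
        # Report the helper pid so the owner can check it was cleaned up.
        send(owner, {:started, pid})
        pid
      end,
      fn pid ->
        # Ask the helper for one element and wait for the reply.
        send(pid, {:next, self()})

        receive do
          {^pid, n} -> {[n], pid}
        end
      end,
      fn pid -> Process.exit(pid, :kill) end
    )
  end

  # The helper only produces when asked, so nothing piles up in the
  # consumer's mailbox.
  defp loop(n) do
    receive do
      {:next, from} ->
        send(from, {self(), n})
        loop(n + 1)
    end
  end
end
```

ResourceSketch.numbers(self()) |> Enum.take(3) returns [1, 2, 3], and the helper process is gone afterwards. The setup and teardown run in the enumerating process itself, which is the property being asked about; whether a GenEvent could be hosted the same way is a separate question.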


josevalim commented 10 years ago

Looking at your code again, I'm getting more skeptical (always a good thing). Somehow I missed this in my first glance, but would this spawn N+1 GenEvents? My perf tests have shown that the +1 process is much much slower.

You are right, it is going to be N+1, but there is no need for the extra process. So this example is pretty much moot. The trouble is in defining sources; a sink is much easier to hand-roll.

Let me try to do something that shows an Enumerable as source. Do you have other sources you would like me to try as an example?

hamiltop commented 10 years ago

I think Enumerable is really the only type the Any implementation needs to support. Everything else would have a specific implementation.

josevalim commented 10 years ago

Right, that is the question! Anything else (besides Enumerable) that would require a custom implementation that involves setting up its own process (like an Enumerable requires)?

hamiltop commented 10 years ago

I think the implied requirement is that one or more elements are emitted. Anything that meets that requirement should be enumerable (it might require being wrapped, as with GenEvent.Stream). Anything that emits one or more elements and is not enumerable should be made Enumerable. Let's solve it from that side if there are any such cases.
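As a sketch of "making it Enumerable": a hypothetical Countdown struct that emits from..1 implements the four Enumerable callbacks. Only reduce/3 does real work; count/1 and member?/2 are cheap here, and slice/1 returns {:error, __MODULE__} so Enum falls back to reduce/3. Once a type is Enumerable, it works with Enum, Stream, and anything built on top of them.

```elixir
defmodule Countdown do
  # Hypothetical example type: emits from, from - 1, ..., 1 and stops.
  defstruct from: 3
end

defimpl Enumerable, for: Countdown do
  # The consumer asked to stop.
  def reduce(_countdown, {:halt, acc}, _fun), do: {:halted, acc}

  # The consumer asked to pause: hand back a continuation.
  def reduce(countdown, {:suspend, acc}, fun),
    do: {:suspended, acc, &reduce(countdown, &1, fun)}

  # Nothing left to emit.
  def reduce(%Countdown{from: 0}, {:cont, acc}, _fun), do: {:done, acc}

  # Emit the current value, then continue with the next one.
  def reduce(%Countdown{from: n}, {:cont, acc}, fun),
    do: reduce(%Countdown{from: n - 1}, fun.(n, acc), fun)

  def count(%Countdown{from: n}), do: {:ok, n}

  def member?(%Countdown{from: n}, x), do: {:ok, is_integer(x) and x >= 1 and x <= n}

  # No efficient slicing: let Enum fall back to reduce/3.
  def slice(_countdown), do: {:error, __MODULE__}
end
```

With this in place, Enum.to_list(%Countdown{from: 3}) returns [3, 2, 1] and Enum.take(%Countdown{from: 5}, 2) returns [5, 4].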

josevalim commented 10 years ago

Ok, that said, here is the implementation of an Enumerable source:

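defmodule GenEvent.StreamHandler do
  use GenEvent

  # Build a continuation over the enumerable whose reducer suspends
  # after every element, then emit the first element.
  def init({enumerable, ref}) do
    cont = &Enumerable.reduce(enumerable, &1, fn x, _ -> {:suspend, x} end)
    send_next({cont, ref})
  end

  # Each event reaching this handler (including the ones it sent to
  # its own manager) advances the enumerable by one element.
  def handle_event(_event, state) do
    send_next(state)
  end

  defp send_next({cont, ref}) do
    case cont.({:cont, :ok}) do
      {:suspended, x, cont} ->
        # Handlers run inside the manager process, so self() is the
        # manager: the element is sent back to it as an :ack_notify.
        send self(), {self(), {self(), ref}, {:ack_notify, x}}
        {:ok, {cont, ref}}
      {:halted, _} ->
        # Halted or exhausted: remove the handler, which (being sticky)
        # also terminates the manager.
        :remove_handler
      {:done, _} ->
        :remove_handler
    end
  end
end

defimpl EventSource, for: Any do
  def add(enumerable, pid) do
    {:ok, manager} = GenEvent.start_link()
    # Forward events to pid; being sticky, the manager terminates if
    # this handler is removed.
    GenEvent.add_process_handler(manager, pid, sticky: true)

    # Drive the enumerable from a sticky StreamHandler identified by ref.
    ref = make_ref()
    GenEvent.add_handler(manager, {GenEvent.StreamHandler, ref},
                         {enumerable, ref}, sticky: true)
  end
end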

Is it straight-forward enough?
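The core trick in GenEvent.StreamHandler is a reducer that returns {:suspend, x} for every element, so Enumerable.reduce/3 hands back one element plus a continuation. Isolated from GenEvent, a minimal sketch (StepSketch and its functions are hypothetical names) looks like this:

```elixir
defmodule StepSketch do
  # Wrap an enumerable in a continuation that yields one element per call:
  # the reducer suspends after every element it sees.
  def start(enumerable) do
    &Enumerable.reduce(enumerable, &1, fn x, _acc -> {:suspend, x} end)
  end

  # Resume the continuation once. Returns {:ok, element, next_cont},
  # or :done when the enumerable is exhausted (or halted).
  def next(cont) do
    case cont.({:cont, nil}) do
      {:suspended, x, next_cont} -> {:ok, x, next_cont}
      {:halted, _acc} -> :done
      {:done, _acc} -> :done
    end
  end

  # Stop early: tell the enumerable to halt so it can release resources.
  def stop(cont) do
    {:halted, _acc} = cont.({:halt, nil})
    :ok
  end
end
```

Stepping StepSketch.start([1, 2]) yields {:ok, 1, _}, then {:ok, 2, _}, then :done. A consumer that stops before :done should resume the last continuation with {:halt, acc}, as stop/1 does, so that resource-backed streams such as those built with Stream.resource/3 get a chance to clean up.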

josevalim commented 10 years ago

It is not going to work on master for three reasons:

  1. it requires sticky handlers
  2. it requires being able to send events to itself
  3. it requires support for returning :remove_handler from init

But I have a spike of those things in a branch and I have confirmed the code works. :)
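The "send events to itself" requirement can be illustrated without the proposed GenEvent extensions. A minimal sketch, assuming a plain GenServer instead of a GenEvent handler (SelfFeedingSource and its {:event, pid, x} / {:done, pid} messages are hypothetical): each :next message advances the enumerable by one element, forwards it to a subscriber, and schedules the next step by sending :next to itself; on exhaustion the server stops normally.

```elixir
defmodule SelfFeedingSource do
  # Hypothetical sketch: a GenServer that walks an enumerable one element
  # at a time by sending :next to itself, forwarding each element to a
  # subscriber process.
  use GenServer

  def start_link(enumerable, subscriber) do
    GenServer.start_link(__MODULE__, {enumerable, subscriber})
  end

  @impl true
  def init({enumerable, subscriber}) do
    # Same suspend trick as GenEvent.StreamHandler: stop after each element.
    cont = &Enumerable.reduce(enumerable, &1, fn x, _acc -> {:suspend, x} end)
    # Kick off the loop; this message is handled after init/1 returns.
    send(self(), :next)
    {:ok, {cont, subscriber}}
  end

  @impl true
  def handle_info(:next, {cont, subscriber}) do
    case cont.({:cont, nil}) do
      {:suspended, x, next_cont} ->
        send(subscriber, {:event, self(), x})
        # Schedule the next step through the mailbox, so other messages
        # can be handled between elements.
        send(self(), :next)
        {:noreply, {next_cont, subscriber}}

      {status, _acc} when status in [:done, :halted] ->
        send(subscriber, {:done, self()})
        {:stop, :normal, {cont, subscriber}}
    end
  end
end
```

A benefit of routing each step through the mailbox, rather than looping recursively inside one callback, is that the process stays responsive: a stop request or any other message can be handled between two elements.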

hamiltop commented 10 years ago

First impression. Without fully grokking the code, I would feel comfortable building an implementation of EventSource through copying and tweaking it. That's a good sign. Most protocols result in a similar approach (I can never remember what Inspect looks like, so I always copy and tweak).

Second impression. Once you understand what's going on, it feels "clever", and "clever" makes me nervous. The clever piece here is the looping of the GenEvent.StreamHandler. I would not have thought to use a handler to consume events and send them back to itself. That might be my lack of exposure to Erlang patterns.

On the positive side: I understand better why GenEvent is a useful backbone. This code is very simple (once you get the clever piece).

The negative side: I think emulating this for other types will be a little confusing. Perhaps we should encourage just implementing Enumerable and only implementing EventSource directly if performance becomes an issue.

Overall I like it a lot. Just trying to capture my first impressions because that's important to a clear user experience.
