phoenixframework / phoenix

Peace of mind from prototype to production
https://www.phoenixframework.org
MIT License
21.47k stars 2.88k forks source link

RFC: Incoming/outgoing channel events #248

Closed chrismccord closed 9 years ago

chrismccord commented 10 years ago

Someone building out a simple Todo™ application with Channels recently had a reasonable requirement that Phoenix doesn't currently address. They had a standard REST endpoint for creating todos, and they wanted to broadcast the "todo:created" event to all subscribers. The issue they have is they did not want the "creator" of the todo to receive the broadcast. Since they were in the context of a Controller, they have no ability to restrict the broadcast. If they were using only channels, this will be trivial by using broadcast_from/3 to broadcast to everyone but the current socket, i.e.:

defmodule TodoChannel do

  def join(socket, project_id, message) do
    {:ok, socket}
  end

  def event(socket, "todo:created", todo_attributes) do
    todo = # create todo in some DB store

    # broadcast "todo:created" to everyone but myself
    broadcast_from socket, "todo:created", filtered(todo)
  end
end

They aren't doing the creating in the Channel though, instead doing something like:

defmodule TodoControler do

  def create(conn, todo_params) do
    todo = # create todo in some DB store

    Channel.broadcast "todos", todo.project_id, "todo:created", %{
      id: todo.id,
      owner_id: todo.owner_id,
      project_id: todo.project_id
    }

    conn
  end
end

The issue is, the "creator" of the todo who sent the REST post, is receiving the broadcast, where they want to skip that user. The only way to do this currently would be to have the client perform some logic when receiving the message and discard it for the creator. This might be OK, but we could accommodate this use-case and offer more powerful event handling by renaming event/3 to incoming/3 and introducing an outgoing/3 function. Currently, Channels only process incoming events with the event/3 definitions. Outgoing events are sent directly to the clients, with no processing in between. We could instead move to an API like this, where a default outgoing/3 just sends the message for existing behavior:


defmodule TodoChannel do

  def join(socket, project_id, message) do
    socket = assign(socket, :user_id, authenticated_user_id(message))
    {:ok, socket}
  end

  def incoming(socket, "todo:created", todo) do
    # broadcast "todo:created" to everyone but myself
    broadcast_from socket, "todo:created", filtered_keys(todo)
  end

  def outgoing(socket, "todo:created", todo) do
    # process broadcast on this chan/topic, and skip myself if I'm the owner
    unless todo.owner_id == get_assign(socket, :user_id) do
      reply socket, "todo:created", filtered_keys(todo)
    end
  end

end

This proposal imposes more overhead sense we're doing a function dispatch per listener, and listeners need to be careful of not blocking in their outgoing/3 definitions or they risk clogging the broadcast tubes. Thoughts? Objections? Alternatives?

josevalim commented 10 years ago

My first reaction to this is that the event can be easily filtered in the client.

scrogson commented 10 years ago

I agree with handling on the client. For instance, in XMPP all messages are sent from the server to all users in a multi-user chat room (including the user who sent the message). This helps ensure that the message was received by the server.

Let's keep this conversation open for a bit. I have some additional thoughts I'd  like to share but don't have time to write it all out right now ;)

On Sat, Aug 9, 2014 at 9:24 AM, José Valim notifications@github.com wrote:

My first reaction to this is that the event can be easily filtered in the client.

Reply to this email directly or view it on GitHub: https://github.com/phoenixframework/phoenix/issues/248#issuecomment-51687914

mdesantis commented 10 years ago

Forgive me if I wrote a silly thing, I was just looking around while I noticed this issue :) what about messages that some clients are not supposed to receive, for example for sensitive data which should be visible only to some users?

chrismccord commented 10 years ago

Forgive me if I wrote a silly thing, I was just looking around while I noticed this issue :) what about messages that some clients are not supposed to receive, for example for sensitive data which should be visible only to some users?

For these cases, you would be best served by a separate topic for authorized users or an entirely different channel, i.e.: Admin.TodoController

thegrubbsian commented 10 years ago

I'm sort of torn on this one. I'm the "someone" in question above...so thanks Chris for opening this thread. On the one hand I understand that filtering at the client is a solution and that's what I may opt for in this case. However, it seems like the framework already supports the scenario of not looping back to the sender, that's what broadcast_from/3 is for in the first place. The piece that is missing though is the ability to "send" something to the channel from the server outside the scope of the channel itself (e.g. from a controller).

My attempt to solve the issue is to use a topic and pass a message over to the channel and then let it call broadcast_from\3. It's not the best solution and seems to have some issues (may be underlying bugs) but it feels clunky.

defmodule Todo.ItemsChannel do
  use Phoenix.Channel
  alias Phoenix.Topic

  def join(socket, "public", _message) do
    handler = spawn_link(fn -> publisher(socket) end)
    Topic.subscribe(handler, "todo-topic")
    { :ok, socket }
  end

  def publisher(socket) do
    receive do
      { event, data } ->
        broadcast_from(socket, event, data)
      publisher(socket)
    end
  end

end

And then in the controller:

defmodule Todo.ItemsController do
  use Phoenix.Controller
  alias Phoenix.Topic
  import Ecto.Query
  use Jazz

  def create(conn, params) do
    item = Repo.insert(%TodoItem{ title: params["title"], completed: params["completed"] })
    Topic.broadcast("todo-topic", { "todo:created", item })
    json conn, JSON.encode!(item)
  end

end

My only other argument for implementing a solution for this in the framework is that it does close the door on a potential class of bugs on the client. I like solutions that leave me less rope to hang myself ;)

chrismccord commented 10 years ago

Thanks for the feedback @thegrubbsian. So we can loop others in, the approach in @thegrubbsian's example can't work since all sockets end up calling broadcast_from for the single Topic event. Let's step back and discuss what the proposed incoming/outgoing functions give us, outside of the "skip creator" use-case. With a generalized incoming/outgoing system, I can imagine other areas where it would be really helpful. For example, consider the use-case where we want to publish "todo:created", but we want to add some client specific data to the message, such as an is_editable: field which will be client specific. Today this is impossible. With outgoing/3, we could support it via:

defmodule TodoChannel do

  def join(socket, project_id, message) do
    socket = assign(socket, :user_id, authenticated_user_id(message))
    {:ok, socket}
  end

  def incoming(socket, "todo:created", todo) do
    broadcast_from socket, "todo:created", filtered_keys(todo)
  end

  def outgoing(socket, "todo:created", todo) do
    reply socket, "todo:created", Dict.merge(todo, %{
      is_editable: Todo.editable_by?(socket, get_assign(socket, :user_id))
    })
  end

end

This seems like a big win to me. Consider the alternative, we'd need to embed additional "global" fields on the todo message, like editable_by_ids containing all user ids with certain privileges and require the client to add the logic for editable_by?. Now we're leaking more data to each client than necessary and pushing a lot of business logic down the pipe that should really stay on the server.

I also misspoke earlier, each outgoing/3 happens in its own socket process, so we wouldn't block other broadcasts. The issue though is we would need to document that outgoing events should do as little work as possible. If we need to hit the DB for example, we could produce a huge amount of overhead. 1000 listeners would all hit the DB in outgoing at once. With careful application, and smart use of socket assigns, I think incoming/outgoing is a compelling addition that still works as usual for cases where outgoing/3 isn't defined. Does that sway any opinions?

josevalim commented 10 years ago

@chrismccord I still don't see what incoming/3 is giving us. Couldn't I just define event/3 that takes all messages, due the pre-processing I want, and then call an internal handle_event/3? Btw, I don't think we should do this:

https://github.com/phoenixframework/phoenix/blob/master/lib/phoenix/channel.ex#L23-L27

All the callback related modules in Elixir (GenEvent, GenServer, etc) requires you to explicitly write the last clause (the I don't care one). Plus that line breaks the use case I just wrote above (because you will likely get a warning).

So I still don't seen the benefit of incoming/3 because it seems I can do exactly the same thing using event/3.

Regarding outgoing/3, you bring good points. However, the DB question you raised is an important one. What if Todo.editable_by? makes a query? People will definitely write the code you wrote above... so how can we avoid people running on the pitfall of doing N queries (one per user) but just one and then filter the outgoing message?

Also how will you avoid infinite loops if you are sending replies from the outgoing callback?

chrismccord commented 10 years ago

I still don't see what incoming/3 is giving us. Couldn't I just define event/3 that takes all messages, due the pre-processing I want, and then call an internal handle_event/3? Btw, I don't think we should do this: So I still don't seen the benefit of incoming/3 because it seems I can do exactly the same thing using event/3.

@josevalim incoming/3 would replace event/3. (exact functionality, we would just rename the function to make it clear).

Also how will you avoid infinite loops if you are sending replies from the outgoing callback?

Replies would send the message directly to the client (it wouldn't invoke outgoing/3). Only broadcasts would be processed by outgoing.

However, the DB question you raised is an important one. What if Todo.editable_by? makes a query? People will definitely write the code you wrote above... so how can we avoid people running on the pitfall of doing N queries (one per user) but just one and then filter the outgoing message?

This is the biggest problem, and folks will definitely write code that hits the DB. For many cases, this is perfectly fine. Consider all kinds of applications where the subscribers to a topic number in the dozens at most (team chat, base camp, trello...). For a lot of applications, hitting the DB in outgoing would be OK, but for others where topics have thousands of subscribers, programmers will have to be aware of the tradeoffs and overhead. The question to answer: is this a compelling enough addition for the tradeoffs of needing clear docs and proper use? Implementation wise, this is very easy for use to add.

josevalim commented 10 years ago

@chrismccord is it easy to traverse all sockets? Instead of outgoing, I would rather do this:

Enum.each all_but_me(), fn(socket) ->
  reply socket, "todo:created", Dict.merge(todo, %{
    is_editable: Todo.editable_by?(socket, get_assign(socket, :user_id))
  })
end

This way if I need to wrap everything into a single query, batch stuff and so on, it is clear how to do it. We just need good function names.

chrismccord commented 10 years ago

is it easy to traverse all sockets? Instead of outgoing, I would rather do this:

We can get all subscriber pids easily. We have Topic.subscribers, we would just add Channel.subsribers. We only get pids though, not the %Socket{}'s. We could probably send the pids a message to return themselves as a %Socket{} though to accomplish your example.

This way if I need to wrap everything into a single query, batch stuff and so on, it is clear how to do it.

I like this idea. Keep in mind, our outgoing/3 message could also contain batched query data, but your approach makes handling the batch local to the broadcast, which could be better.

chrismccord commented 10 years ago

@josevalim Thinking out loud: From a Controller broadcast, with your example we'd lose the outgoing processing, but we could define functions on our channel to accomplish the same thing like your example:

# controller
def show(conn, _) do
  TopicChannel.broadcast_create(todo)
end

# channel
def broadcast_create(todo) do
  Enum.each subsribers, fn(socket) ->
    reply socket, "todo:created", Dict.merge(todo, %{
      is_editable: Todo.editable_by?(socket, get_assign(socket, :user_id))
    })
  end
end

This pushes the event handling down to the channel, instead of using the plain Phoenix.Channel.broadcast throughout the app. This could be a good thing since arbitrary event names , like "todo:created" stay isolated to the channel.

josevalim commented 10 years ago

We can get all subscriber pids easily. We have Topic.subscribers, we would just add Channel.subsribers. We only get pids though, not the %Socket{}'s. We could probably send the pids a message to return themselves as a %Socket{} though to accomplish your example.

So if you want to send messages to PIDs, it starts to look like GenEvent API which has handle_event/2, to handle events, and a handle_call/2 to handle particular calls. We should consider moving to this direction because we can support GenServer-like multi calls.

A multi call will basically send a message to all subscsribers and collect their result. This way you can send a message to all pids without doing explicit "get all pids, send the message, collect the results". Multicall will also setup the proper reference, monitor, etc.

The difference though is that GenEvent API (as GenServer API and friends) expect tuples as results. You could think of moving channels to a similar API. There is definitely food for thought here. The point is that you want to keep the APIs consistent when we start to support custom messages to channels.

chrismccord commented 10 years ago

So if you want to send messages to PIDs, it starts to look like GenEvent API which has handle_event/2, to handle events, and a handle_call/2 to handle particular calls. We should consider moving to this direction because we can support GenServer-like multi calls.

I need to study up on GenEvent. How do you see GenEvent being slotted into the current API? Right now, we have the Socket.Handler that wraps the transport layer (cowboy_websocket_handler only atm). It's worth noting the outgoing/3 solution wouldn't require any multicall or "get al pids" step. It would just be a function dispatch in the Handler when broadcasts are received.

chrismccord commented 10 years ago

The point is that you want to keep the APIs consistent when we start to support custom messages to channels.

@josevalim This might be what you meant, but we also need to consider normal pid subscribers. If a normal process wants to subscribe to Channels, they should be able to to Channel.subscribe(self, "todos", "123"). The issue with the Enum.each subsribers, fn(socket) -> approach is now we could have regular processes, or sockets. The outgoing function sidesteps this issue since all processes just receive the %Message{} message, and the Socket.Handler just happens to invoke outgoing/3. Regular pid subscribers would just receive a message containing the %Message{} struct that they can match on. I agree that what we decide here needs to take consideration for where we want to go in the future with multiple transports and potentially different message encodings.

josevalim commented 10 years ago

@chrismccord we don't need to make the gen events, it would be just an idea of the API.

We have regular processes or sockets but sockets are always processes behind the scenes, so we always have processes. The same way processes need to pattern match on %Message{}, we would also need them to match on something like {ref, _} for those custom messages.

pap commented 10 years ago

On a project for which i contributed the web socket broadcaster (in elixir of course!) the json that was "pulled out" of a mongo db included the ids (uuid) of the subscribers to that message. I had an ets table with initiated sessions(web sockets) and respective pids so i could easily update/send to a given subset of connected users. It's probably off topic or at least not exactly what you need but i thought i could share the idea ...

chrismccord commented 10 years ago

@pap Thanks for the feedback. Our Topic layer supports something along those lines and it's what the Channels are built on. The topic would be the uuid, and we could get a list of all Pids with Topic.subscribers(the_uuid). The issue is we get pids, not %Socket{}'s.

One idea to sidestep the previous lookup issues would be to send an anon function with the message for the Socket's to invoke in their own context. This accomplishes what outgoing/3 does, provides the ability to pass eager lookups since we provide a closure, and removes the need to fetch all subscriber sockets.

Channel.broadcast "todos", todo.project_id, "todo:created", fn socket ->
  reply socket, "todo:created", Dict.merge(%{
    id: todo.id,
    owner_id: todo.owner_id,
    project_id: todo.project_id
  }, %{
    is_editable: Todo.editable_by?(socket, get_assign(socket, :user_id))
  })
end
josevalim commented 10 years ago

The issue with anonymous functions is that if you need to send it between nodes (which is one of your plans), the nodes need to have the exact same code version. And that breaks more often than you think (like when doing upgrading servers). So we would like to push people to rely less on anonymous functions if they are meant to be sent between machines.

chrismccord commented 10 years ago

@josevalim you're right. That can't work

pap commented 10 years ago

@chrismccord Hum... got the idea. The difference in my code is that the channels are not the uuid ... the browser/client sends an uuid (the user/browser/session or whatever) and the cowboy web socket controlling process gets it's pid registered as value for that uuid key. So perhaps there could be a second ets or ets-like "thing" to keep track of who is subscribed to what. i.e. User zzzz-1234-1234 has pid <123,123,123> and in a second table we have user K:zzzz-1234-1234 V: channel foo , K:zzzz-1234-1234 V: channel bar. I think the ets approach can be tricky when distributing but please correct me if i'm wrong.

pap commented 10 years ago

@chrismccord I also created a wrapper module to make the ETS operations "easier" here's the gist https://gist.github.com/311b178691c808b9abb3.git

jeregrine commented 10 years ago

@josevalim I was spitballing with @chrismccord and two ideas came up.

1) What if for the outgoing/incoming we made it be an 'extension' similar to faye? http://faye.jcoglan.com/ruby/extensions.html then a user could define outgoing/incoming ONLY if they need it. And the documentation specific to the extension could warn the users about issues with queries and N+1 style performance issues. This also leads to a "plug for sockets" style where the ordering is explicit.

2) What if topic just became a GenEvent? Then subscribers could stream the events and potentially we could put the outgoing/incoming just as a Stream.map before/after the user gets the information. (I don't know how this looks or what the performance characteristics are just spitballing)

josevalim commented 10 years ago

I am not against outgoing as long as it has the proper warnings.

The issue though is that outgoing does not ultimately solve issues that can come with broadcast where you want to have access to all processes at the same time. We are not sure this is an issue yet but I do believe it will happen and users won't have a way to avoid N queries.

I cannot talk about topic right now because I am not really familiar with it.

chrismccord commented 9 years ago

incoming/outgoing callbacks is landing in 0.8. Thanks for the input everyone!