absinthe-graphql / absinthe_phoenix

Can't track socket disconnect in __absinthe__:control channel #39

Open makefunstuff opened 6 years ago

makefunstuff commented 6 years ago

Hello there! I have the following problem: I have a customer-facing site and a back office, and I would like to show online status in the back office using GraphQL subscriptions. Let’s assume that I don’t have direct access to Phoenix channels and can’t use channels at all; I only have Absinthe subscriptions. How can I detect, at the Absinthe subscription level, that the socket connection is closed for a specific user? Of course, that is easy to do in plain Phoenix channels, but I couldn’t find a proper way to do it with Absinthe. Maybe I’m doing something wrong, or was it overlooked in the `__absinthe__:control` channel implementation?

benwilson512 commented 6 years ago

Hey @makefunstuff, there is no way to do presence with Absinthe.Phoenix at this time. Your best bet will probably be to leverage an additional Phoenix channel and use normal tracking. This is definitely a feature we want to support, but we aren't sure yet what the right design would be, given that Phoenix Presence requires a very specific format to make its CRDT work. I'm not yet sure how we'd let someone wrap that info in GraphQL subscriptions or similar.

makefunstuff commented 6 years ago

Thanks for the response. I think in my case the way to go is to build a decorator module on top of the existing Absinthe socket and add the missing connection-tracking functionality. Or I can try to prepare a PR and add this to the existing `__absinthe__:control` channel. What do you think?

titzi commented 5 years ago

@makefunstuff How did you end up implementing it? I have a similar thing to solve and am kind of lost right now.

makefunstuff commented 5 years ago

You can define both your own socket and channel decorator modules, where you can track presence manually. At least that's how I did it.

elixir4ever commented 5 years ago

@makefunstuff Can you share how you implemented both socket and channel decorator modules? Thanks

makefunstuff commented 5 years ago

@elixir4ever, sure:

First, I defined the following socket decorator:

```elixir
defmodule MyAppWeb.AbsintheSocketDecorator do
  use Phoenix.Socket

  channel("__absinthe__:*", MyAppWeb.AbsintheChannelDecorator,
    assigns: %{
      __absinthe_schema__: MyAppWeb.Graphql.MySchemaModule,
      __absinthe_pipeline__: nil
    }
  )

  def connect(_params, socket) do
    # your connect logic; must return {:ok, socket} or :error
    {:ok, socket}
  end

  def id(_socket) do
    # your id logic; nil means an anonymous socket
    nil
  end

  defdelegate put_options(socket, opts), to: Absinthe.Phoenix.Socket

  defdelegate put_schema(socket, schema), to: Absinthe.Phoenix.Socket
end
```

Then define the channel decorator:

```elixir
defmodule MyAppWeb.AbsintheChannelDecorator do
  use Phoenix.Channel

  def join(topic, msg, socket) do
    # your join logic, where you can track the join event
    # then delegate to Absinthe's join so the schema and pipeline are set up
    Absinthe.Phoenix.Channel.join(topic, msg, socket)
  end

  def terminate(_reason, socket) do
    # your disconnect tracking logic
    {:ok, socket}
  end

  defdelegate handle_in(event, msg, arg2), to: Absinthe.Phoenix.Channel

  defdelegate default_pipeline(schema, options), to: Absinthe.Phoenix.Channel
end
```

Finally, add the socket decorator to your endpoint, e.g.:

```elixir
socket("/app", MyAppWeb.AbsintheSocketDecorator)
```

elixir4ever commented 5 years ago

@makefunstuff Thank you. This is really helpful.

jonsgreen commented 5 years ago

@benwilson512 I am working on a problem that might employ a similar solution. We are developing an API and want to use subscriptions as an alternative to webhooks. The client will execute a subscription to get updates when an asynchronous process completes, and we will then publish the results to the topic channel when they are ready.

However, I am trying to figure out the best way to deal with the situation where a subscription gets disconnected and then rejoins after the results have been published. One solution would be to have a hook where we can publish the result when someone joins. I am thinking that using Phoenix Tracking when the client joins would allow us to trigger a publish if the results are already in.

Is the above decorator strategy still the best approach for connecting Absinthe with a Phoenix Channel? Do you have any other suggestions for solving our problem?
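
The "publish on join" idea described above can be sketched in plain Elixir. This is only an illustration under stated assumptions: `ResultCache`, `ReplayOnJoin`, and the `publish` callback are hypothetical names that do not exist in Absinthe or Phoenix, and how `on_join/3` would be wired into a join hook (for example, the decorator's `join/3`) is left open.

```elixir
defmodule ResultCache do
  @moduledoc "Hypothetical store of already-published results, keyed by topic."
  use Agent

  def start_link(opts \\ []) do
    Agent.start_link(fn -> %{} end, opts)
  end

  # Remember the result so that late joiners can still receive it.
  def put(cache, topic, result) do
    Agent.update(cache, &Map.put(&1, topic, result))
  end

  # Returns {:ok, result} or :error, like Map.fetch/2.
  def fetch(cache, topic) do
    Agent.get(cache, &Map.fetch(&1, topic))
  end
end

defmodule ReplayOnJoin do
  @moduledoc "Hypothetical join hook: replays a cached result, if there is one."

  # `publish` is a placeholder for whatever delivers a payload to the client.
  def on_join(cache, topic, publish) when is_function(publish, 1) do
    case ResultCache.fetch(cache, topic) do
      {:ok, result} ->
        publish.(result)
        :replayed

      :error ->
        :pending
    end
  end
end
```

With this shape, the code that finishes the asynchronous process would call `ResultCache.put/3` in addition to publishing, and a client that rejoins later gets the stored result instead of waiting for an event that has already happened.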

IkechukwuAKalu commented 4 years ago

Thanks, @makefunstuff for the workaround.

In case anyone has an issue with the `AbsintheChannelDecorator`, don't forget to include `use Phoenix.Channel`.

bernardo-martinez commented 4 years ago

Yep thanks, @makefunstuff for the workaround.

However, when I try to implement it, the reason param contains {{:badkey, :schema, ..., so it looks like put_schema is not being called...

I'm using absinthe 1.5 and absinthe_phoenix 1.5

bernardo-martinez commented 4 years ago

OK, I've fixed it. As you said, since this is a decorator, I was missing a call to `Absinthe.Phoenix.Channel.join(topic, msg, socket)` at the end of `MyAppWeb.AbsintheChannelDecorator.join`, like so:

```elixir
def join(topic, msg, socket) do
  # my logic here...
  Absinthe.Phoenix.Channel.join(topic, msg, socket)
end
```

Thanks!

autodidaddict commented 3 years ago

What is the current preferred way of dealing with this? I'm in this exact situation where I have background processes that start when a subscription connects that I need to terminate when the subscription is no longer active. Should I still use the manually defined socket and channel decorators, or has there been more work done that isn't visible in this issue?
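
One option that does not rely only on `terminate/2` (which, as with any GenServer, is not guaranteed to run on every kind of exit) is to monitor the channel process from a separate process and do the cleanup there. The sketch below is plain Elixir under stated assumptions: `DisconnectTracker` and `track/3` are hypothetical names, not part of Absinthe or Phoenix, and in a channel decorator you would presumably call `track/3` with `self()` from `join/3`.

```elixir
defmodule DisconnectTracker do
  @moduledoc """
  Hypothetical helper: monitors processes (e.g. a channel pid) and runs a
  cleanup function when a monitored process exits.
  """
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  # Register `pid`; `on_down` is called with the exit reason when it dies.
  def track(tracker, pid, on_down) when is_pid(pid) and is_function(on_down, 1) do
    GenServer.call(tracker, {:track, pid, on_down})
  end

  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call({:track, pid, on_down}, _from, state) do
    # The monitor reference is the key under which the callback is stored.
    ref = Process.monitor(pid)
    {:reply, :ok, Map.put(state, ref, on_down)}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
    # Run the callback registered for this monitor, then forget it.
    {on_down, state} = Map.pop(state, ref)
    if on_down, do: on_down.(reason)
    {:noreply, state}
  end

  def handle_info(_msg, state), do: {:noreply, state}
end

# Usage: a short-lived process stands in for the channel process.
{:ok, tracker} = DisconnectTracker.start_link()
parent = self()

fake_channel =
  spawn(fn ->
    receive do
      :stop -> :ok
    end
  end)

:ok = DisconnectTracker.track(tracker, fake_channel, &send(parent, {:disconnected, &1}))
send(fake_channel, :stop)
```

The cleanup function is where a background job tied to the subscription could be stopped. Because the tracker is a separate process, it is notified regardless of how the monitored process exits.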

tunchamroeun commented 2 years ago

I have made some customizations to this file: https://github.com/absinthe-graphql/absinthe_phoenix/blob/master/lib/absinthe/phoenix/channel.ex 😄

Channel Status (enter, leave and terminate)

  1. Subscribe (enter) to the channel
  2. Unsubscribe (leave) the channel
  3. Terminate (terminate) the channel

Each of the above is reported to the subscription config.

    
    defmodule Deps.Absinthe.Phoenix.Channel do
    use Phoenix.Channel
    alias Deps.Absinthe.Phoenix.Socket
    require Logger
    
    @moduledoc false
    
    @doc false
    def __using__(_) do
    raise """
    ----------------------------------------------
    You should `use Deps.Absinthe.Phoenix.Socket`
    ----------------------------------------------
    """
    end
    
    @doc false
    def join("__absinthe__:control", _, socket) do
    schema = socket.assigns[:__absinthe_schema__]
    pipeline = socket.assigns[:__absinthe_pipeline__]
    
    absinthe_config = Map.get(socket.assigns, :absinthe, %{})
    
    opts =
      absinthe_config
      |> Map.get(:opts, [])
      |> Keyword.update(
           :context,
           %{pubsub: socket.endpoint},
           fn context ->
             Map.put_new(context, :pubsub, socket.endpoint)
           end
         )
    
    absinthe_config =
      put_in(absinthe_config[:opts], opts)
      |> Map.update(:schema, schema, & &1)
    
    absinthe_config =
      Map.put(absinthe_config, :pipeline, pipeline || {__MODULE__, :default_pipeline})
    
    socket = socket
             |> assign(:absinthe, absinthe_config)
    {:ok, socket}
    end
    
    @doc false
    def handle_in("doc", payload, socket) do
    config = socket.assigns[:absinthe]
    with variables when is_map(variables) <- extract_variables(payload) do
      config_opts = [context: Map.merge(config.opts[:context], %{"status" => "enter", "subscriptionId" => ""})]
      opts = Keyword.put(config_opts, :variables, variables)
      query = Map.get(payload, "query", "")
      Absinthe.Logger.log_run(
        :debug,
        {
          query,
          config.schema,
          [],
          opts
        }
      )
      {reply, socket} = run_doc(socket, query, config, opts)
    
      Logger.debug(
        fn ->
          """
          -- Absinthe Phoenix Reply --
          #{inspect(reply)}
          ----------------------------
          """
        end
      )
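      case reply do
        # only subscription replies carry a subscriptionId
        {:ok, %{subscriptionId: subscription_id}} ->
          socket = subscription_status(socket, "enter", subscription_id)
          {:reply, reply, socket}
        :noreply ->
          {:noreply, socket}
        # query, mutation and error replies are passed through unchanged
        _ ->
          {:reply, reply, socket}
      end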
    else
      _ -> {:reply, {:error, %{error: "Could not parse variables as map"}}, socket}
    end
    end
    
    def handle_in("unsubscribe", %{"subscriptionId" => doc_id}, socket) do
    socket = subscription_status(socket, "leave", doc_id)
    pubsub =
      socket.assigns
      |> Map.get(:absinthe, %{})
      |> Map.get(:opts, [])
      |> Keyword.get(:context, %{})
      |> Map.get(:pubsub, socket.endpoint)
    
    Phoenix.PubSub.unsubscribe(socket.pubsub_server, doc_id)
    Absinthe.Subscription.unsubscribe(pubsub, doc_id)
    {:reply, {:ok, %{subscriptionId: doc_id}}, socket}
    end
    defp run_doc(socket, query, config, opts) do
    case run(query, config[:schema], config[:pipeline], opts) do
      {:ok, %{"subscribed" => topic}, context} ->
        %{transport_pid: transport_pid, serializer: serializer, pubsub_server: pubsub_server} =
          socket
        :ok =
          Phoenix.PubSub.subscribe(
            pubsub_server,
            topic,
            metadata: {:fastlane, transport_pid, serializer, []},
            link: true
          )
        # add payload when needed in unsubscribe
        socket = Socket.put_options(
          socket,
          context: Map.merge(
            context,
            %{
              "payload" => %{
                "query" => query
              }
            }
          )
        )
        {{:ok, %{subscriptionId: topic}}, socket}
    
      {:ok, %{data: _} = reply, context} ->
        socket = Socket.put_options(socket, context: context)
        {{:ok, reply}, socket}
    
      {:ok, %{errors: _} = reply, context} ->
        socket = Socket.put_options(socket, context: context)
        {{:error, reply}, socket}
    
      {:error, reply} ->
        {reply, socket}
    end
    end
    
    defp run(document, schema, pipeline, options) do
    {module, fun} = pipeline
    
    case Absinthe.Pipeline.run(document, apply(module, fun, [schema, options])) do
      {:ok, %{result: result, execution: res}, _phases} ->
        {:ok, result, res.context}
    
      {:error, msg, _phases} ->
        {:error, msg}
    end
    end
    
    defp extract_variables(payload) do
    case Map.get(payload, "variables", %{}) do
      nil -> %{}
      map -> map
    end
    end
    
    @doc false
    def default_pipeline(schema, options) do
    schema
    |> Absinthe.Pipeline.for_document(options)
    end
    def terminate(reason, socket) do
    case reason do
      {:shutdown, :closed} ->
        subscription_id =
          socket.assigns.absinthe.opts[:context]
          |> Map.get("subscriptionId")
        # notify the subscription config; the return value of terminate/2 is ignored
        subscription_status(socket, "terminate", subscription_id)
        :ok
      _ ->
        :ok
    end
    end
    def handle_info(_, state) do
    {:noreply, state}
    end
    defp subscription_status(socket, status, subscription_id) do
    # status is one of "enter", "leave" or "terminate"
    config = socket.assigns[:absinthe]
    context = config.opts[:context]
    case Map.get(context, "payload", "") do
      "" ->
        socket
      payload ->
        with variables when is_map(variables) <- extract_variables(payload) do
          query = Map.get(payload, "query", "")
          opts = [
            context: Map.merge(context, %{"status" => status, "subscriptionId" => subscription_id}),
            variables: variables
          ]
          # re-run the stored document so the subscription config sees the new status
          run(query, config[:schema], config[:pipeline], opts)
          Socket.put_options(
            socket,
            context: Map.merge(context, %{"subscriptionId" => subscription_id})
          )
        end
    end
    end
    end

## My Subscription Config

All three statuses are handled here:

```elixir
field :update_expiration_msg, :string do
  arg(:quiz_template_grouping_id, non_null(:id))

  config(fn args, %{context: %{"status" => status, "subscriptionId" => subscription_id}} ->
    IO.inspect(status, label: "Channel Status")
    IO.inspect(subscription_id, label: "Subscription Id")

    # start the scheduled job once the subscription has an id
    if status == "enter" and subscription_id != "" do
      add_schedule_job(subscription_id, args)
    end

    # stop it on unsubscribe or when the socket closes
    if status in ["leave", "terminate"] and subscription_id != "" do
      remove_schedule_job(subscription_id)
    end

    {:ok, topic: "#{args.quiz_template_grouping_id}"}
  end)
end
```