absinthe-graphql / absinthe_relay

Absinthe support for the Relay framework
MIT License
182 stars 88 forks source link

Connection.from_dataloader/5 helper? #128

Open milmazz opened 5 years ago

milmazz commented 5 years ago

I recently started using this module, and I was a bit surprised when I didn't find a helper using Dataloader, something analogous to Absinthe.Relay.Connection.from_query/4.

I did some basic adaptations and seems to be working, but probably I'm missing something here.

  def from_dataloader(loader, source_name, resource, parent, options) do
    # NOTE: The user should provide the `max` field, but this is just for demonstration purposes.
    opts = [max: options[:max] || 100]
    # NOTE: The user must provide the `sort` field, but this is just for demonstration purposes.
    sort = options[:sort] || [%{asc: :inserted_at}]
    pagination_args = Keyword.fetch!(options, :pagination_args)

    with {:ok, offset, limit} <- Connection.offset_and_limit_for_query(pagination_args, opts) do
      args = %{sort: sort, limit: limit, offset: offset}

      loader
      |> Dataloader.load(source_name, {resource, args}, parent)
      |> on_load(fn loader ->
        records = Dataloader.get(loader, source_name, {resource, args}, parent)

        opts =
          opts
          |> Keyword.put(:has_previous_page, offset > 0)
          |> Keyword.put(:has_next_page, length(records) > limit)

        Connection.from_slice(Enum.take(records, limit), offset, opts)
      end)
    end
  end

Please let me know if you're interested on something like this, I can start working on a proper PR if you find this helper could be useful.

bgentry commented 5 years ago

I'm also really interested in using dataloader for my Relay connections. This was discussed awhile ago in #100 and at the time @benwilson512 said:

There is not any way to use dataloader with relay from connection at this time, for the same reason that it wasn't possible with absinthe_ecto. The only way to generate sensible SQL is via a window fu function or lateral joins, neither of which are well supported by Ecto yet.

However in the interim, Ecto gained significant new functionality including Window functions: https://github.com/elixir-ecto/ecto/pull/2618

It also appears that Ecto now has at least some support for lateral joins.

It seems pretty clear there's a lot of interest in using dataloader with this library. My particular interest is in being able to easily add connections to my types, not just at the top level query in my schema.

I can open a separate issue if you'd like :smile: I think there should be somewhere to at least track the general issue of compatibility between this library and dataloader.

benwilson512 commented 5 years ago

Ecto’s lateral join support is the same as it was then, which requires manually writing query strings. This makes it very hard to generate correctly.

The window function addition though will probably make a difference. A PR to add this functionality would be welcome, but please don’t create an issue.

On Wed, Dec 19, 2018 at 10:24 PM Blake Gentry notifications@github.com wrote:

I'm also really interested in using dataloader for my Relay connections. This was discussed awhile ago in #100 https://github.com/absinthe-graphql/absinthe_relay/issues/100 and at the time @benwilson512 https://github.com/benwilson512 said https://github.com/absinthe-graphql/absinthe_relay/issues/100#issuecomment-357557000 :

There is not any way to use dataloader with relay from connection at this time, for the same reason that it wasn't possible with absinthe_ecto. The only way to generate sensible SQL is via a window fu function or lateral joins, neither of which are well supported by Ecto yet.

However in the interim, Ecto gained significant new functionality including Window functions: elixir-ecto/ecto#2618 https://github.com/elixir-ecto/ecto/pull/2618

It also appears that Ecto now has at least some support for lateral joins.

It seems pretty clear there's a lot of interest in using dataloader with this library. My particular interest is in being able to easily add connections to my types, not just at the top level query in my schema.

I can open a separate issue if you'd like 😄 I think there should be somewhere to at least track the general issue of compatibility between this library and dataloader.

— You are receiving this because you were mentioned.

Reply to this email directly, view it on GitHub https://github.com/absinthe-graphql/absinthe_relay/issues/128#issuecomment-448846669, or mute the thread https://github.com/notifications/unsubscribe-auth/AAPICQT0XILaz52SUz62xSay1f58pszFks5u6wL4gaJpZM4ZXjmh .

bgentry commented 5 years ago

@milmazz as I understand it, your solution breaks when loading for many child associations at the same time because the limit and offset don’t get applied on a per-parent basis; they’re instead applied to all combined results.

Imagine you’re trying to load Articles along with limit: 10 Comments on each article. You don’t want 10 total Comments, you want up to 10 for each Article. This is the kind of thing that would require something like a window function to pull off.

I’m super interested in seeing this work. Hoping I can take a crack at it soon, but don’t let that stop you from getting to it first @milmazz 🙂

bgentry commented 5 years ago

I was able to get a working implementation of Relay Connections with Dataloader that uses window functions to paginate individually for each parent. While it does work and already allows for some fairly simple reuse of a connection on multiple GraphQL objects, it's pretty ugly and tangled. I'm not sure if here or the Dataloader repo is the best place to post about this, so lmk if I should relocate this.

I encountered a few challenges along the way:

Here's some example code extracted from the branch where I got this working. The schema examples show a Product which has many Packages. The Packages connection is available both at the top level, and on the Product object.

defmodule MyApp.GraphQL.Schema do
  connection node_type: :package do
    # As far as I can tell, there's no way to make this work for both root-level and child scenarios:
    field(:total_count, non_null(:integer), resolve: &Helpers.resolve_total_count/2)

    edge do
    end
  end

  object :product do
    field(:id, non_null(:id))

    connection field(:packages, node_type: :package) do
      # a macro I had to define in order to make the same args availble
      # everywhere I have a `PackageConnection`:
      package_connection_args()

      resolve(
        dataloader_relay_paginate(
          MyApp.WebSource,
          MyApp.Package,
          :product_id,
          &Helpers.sort_from_args(&1, desc: :updated_at),
          max: 100
        )
      )
    end
  end

  connection field(:packages, node_type: :package) do
    package_connection_args()

    resolve(
      dataloader_relay_paginate(
        MyApp.WebSource,
        MyApp.Package,
        nil,
        &Helpers.sort_from_args(&1, desc: :updated_at),
        max: 100
      )
    )
  end
end

defmodule MyApp.WebSource do
  def query(Package, %{current_user: user} = params) do
    Package
    |> where(company_id: user.company_id)
    |> MyApp.GraphQL.Helpers.relay_connection_paginate(params)
  end
end

defmodule MyApp.GraphQL.Helpers do
  import Ecto.Query, only: :macros
  import Absinthe.Resolution.Helpers

  alias MyApp.Repo

  @doc """
  Takes a function which returns a query. That query is executed with `Connection.from_query`.
  The result is also augmented with a field `total_count_query`, which can lazily be used to
  resolve the total count of records in the unpaginated result set.
  """
  def all_with_total_count(args, resolution, query_fn) when is_function(query_fn) do
    query = query_fn.(args, resolution)

    total_count_query = from(item in query, select: count()) |> Ecto.Query.exclude(:order_by)

    # hardcoded limit of 100 records at a time, for now:
    case Absinthe.Relay.Connection.from_query(query, &Repo.all/1, args, max: 100) do
      {:ok, data} -> {:ok, Map.put(data, :total_count_query, total_count_query)}
      {:error, _} = err -> err
    end
  end

  def args_with_pagination(args, partition_field, order_by, opts \\ []) do
    {:ok, offset, limit} = Absinthe.Relay.Connection.offset_and_limit_for_query(args, opts)

    conn_pagination = %{
      offset: offset,
      limit: limit,
      partition_field: partition_field,
      order_by: order_by
    }

    Map.merge(args, %{_connection_paginate: conn_pagination})
  end

  def dataloader_relay_paginate(
        source_module,
        queryable,
        partition_field,
        order_by_fn,
        opts \\ []
      ) do
    fn parent, args, %{context: %{loader: loader}} = res ->
      order_by = order_by_fn.(args)
      args = args_with_pagination(args, partition_field, order_by, opts)

      resource = opts[:resource] || res.definition.schema_node.identifier

      if partition_field do
        loader
        |> Dataloader.load(source_module, {resource, args}, parent)
        |> on_load(fn loader ->
          records = Dataloader.get(loader, source_module, {resource, args}, parent)
          %{_connection_paginate: %{offset: offset, limit: limit}} = args

          opts =
            opts
            |> Keyword.put(:has_previous_page, offset > 0)
            |> Keyword.put(:has_next_page, length(records) > limit)

          Absinthe.Relay.Connection.from_slice(Enum.take(records, limit), offset, opts)
        end)
      else
        query_fn = fn params, _ ->
          # TODO: this is kind of cheating -- can we let Dataloader.Ecto do this work instead of duplicating parts of it?
          source = loader.sources[source_module]

          queryable
          |> source_module.query(Enum.into(params, source.default_params))
        end

        all_with_total_count(args, res, query_fn)
      end
    end
  end

  def relay_connection_paginate(query, %{_connection_paginate: pagination_args}) do
    %{limit: limit, offset: offset, partition_field: partition_field, order_by: order_by} =
      pagination_args

    query = Ecto.Queryable.to_query(query)
    |> Ecto.Query.order_by(^order_by)

    if partition_field do
      windowed_query =
        from(q in query,
          # TODO: dynamic primary key
          select: %{id: q.id, dense_rank: over(dense_rank(), :paginate)},
          windows: [paginate: [partition_by: ^partition_field, order_by: ^order_by]]
        )

      start_offset = offset
      end_offset = offset + limit

      query =
        from(q in query,
          join: windowed in subquery(windowed_query),
          on: q.id == windowed.id,
          where: windowed.dense_rank > ^start_offset,
          # fetch an additional record so we can see if there's a next page:
          where: windowed.dense_rank <= ^(end_offset + 1)
        )

      query
    else
      query
      |> Ecto.Query.limit(^limit)
      |> Ecto.Query.offset(^offset)
    end
  end

  def relay_connection_paginate(query, _args), do: query

  def resolve_total_count(_, %{source: conn}), do: {:ok, Repo.one!(conn.total_count_query)}

  @doc """
  Extracts the sorting param (for order_by) from the pagination arguments. If
  no sort is provided, falls back to the provided default_sort.

  For the 1st argument, you can provide either just the :order_by portion of
  the connection arguments, or the entire set of connection arguments (from
  which :order_by will be pulled).

  This function intentionally does not handle partial sort instructions (i.e.
  only a field w/ no direction). Use default_value on the GraphQL order input
  objects' fields to make this work.
  """
  def sort_from_args(%{direction: dir, field: field} = _sort_arg, _default_sort)
      when dir in [:asc, :desc] do
    Keyword.put([], dir, field)
  end

  def sort_from_args(%{sort_by: sort_arg} = _all_args, default_sort) do
    sort_from_args(sort_arg, default_sort)
  end

  def sort_from_args(_args, default_sort), do: default_sort
end
gullitmiranda commented 5 years ago

@bgentry i make some changes in your example to work for me, maybe can be of some help to you:

  # replace to all_with_total_count
  def put_total_count_query(
        {:ok, data},
        queryable,
        source_module,
        query_params,
        assoc_field,
        %{source: %schema{} = parent} = _res
      ) do
    query =
      queryable
      |> source_module.query(query_params |> Map.put(:_total_count, true))
      |> Ecto.Query.exclude(:order_by)
      |> Ecto.Query.exclude(:limit)

    assoc = Ecto.Association.association_from_schema!(schema, assoc_field)
    %{owner_key: owner_key} = assoc

    id = Map.fetch!(parent, owner_key)

    total_count_query =
      Ecto.Association.assoc_query(assoc, [], query, [id])
      |> select(count())

    {:ok, Map.put(data, :total_count_query, total_count_query)}
  end

  def put_total_count_query(result, _, _, _, _), do: result

  def dataloader_relay_paginate(
        source_module,
        queryable,
        partition_field,
        order_by_fn,
        opts \\ []
      ) do
    fn parent, args, %{context: %{loader: loader}} = res ->
      order_by = order_by_fn.(args)
      args = args_with_pagination(args, partition_field, order_by, opts)

      resource = opts[:resource] || res.definition.schema_node.identifier

      source = loader.sources[source_module]

      query_params = Enum.into(args, source.default_params)

      if partition_field do
        loader
        |> Dataloader.load(source_module, {resource, args}, parent)
        |> on_load(fn loader ->
          records = Dataloader.get(loader, source_module, {resource, args}, parent)
          %{_connection_paginate: %{offset: offset, limit: limit}} = args

          opts =
            opts
            |> Keyword.put(:has_previous_page, offset > 0)
            |> Keyword.put(:has_next_page, length(records) > limit)

          Absinthe.Relay.Connection.from_slice(Enum.take(records, limit), offset, opts)
          |> put_total_count_query(queryable, source_module, query_params, resource, res)
        end)
      else
        query = queryable |> source_module.query(query_params)

        Absinthe.Relay.Connection.from_query(query, &Repo.all/1, args, opts)
        |> put_total_count_query(queryable, source_module, query_params, resource, res)
      end
    end
  end

  # this method prevent of using window queries when build `total_count_query`
  def relay_connection_paginate(query, %{_total_count: true}) do
    query
  end
cospin commented 2 months ago

Hey there! Anyone know if there has been progress on Ecto? @bgentry is your solution still working? Have you improved/changed anything in these years?

I'm just about to start a new project and I've come across this :(