absinthe-graphql / dataloader

DataLoader for Elixir
MIT License
489 stars 99 forks source link

Error when used in transactions #129

Closed arnodirlam closed 2 years ago

arnodirlam commented 3 years ago

Hi there,

we're using Dataloader without Absinthe and came across this error:

** (DBConnection.ConnectionError) could not checkout the connection owned by #PID<0.3039.0>. When using the sandbox, connections are shared, so this may imply another process is using a connection. Reason: connection not available and request was dropped from queue after 1562ms.

when running the following code in a test:

Ecto.Multi.new()
|> Ecto.Multi.run(:whatever, fn _, _ ->
  Dataloader.new()
  |> Dataloader.add_source(:repo, Dataloader.Ecto.new(MyApp.Repo))
  |> Dataloader.load(:repo, :roles, user)
  |> Dataloader.run()
end)
|> Ev2.Repo.transaction()

Outside of an Ecto.Multi, it works:

Dataloader.new()
|> Dataloader.add_source(:repo, Dataloader.Ecto.new(MyApp.Repo))
|> Dataloader.load(:repo, :roles, user)
|> Dataloader.run()

Is this something that can and should be fixed in Dataloader?

Thanks a lot for this great project! 🙌

benwilson512 commented 3 years ago

What version of Dataloader / Ecto are you using?

arnodirlam commented 3 years ago

Tried with

benwilson512 commented 3 years ago

Interesting. Dataloader.Ecto explicitly sets the caller value to the pid that calls load in order to avoid this issue. A test case which reproduces this would be very helpful if you have time open a PR with one.

arnodirlam commented 3 years ago

Turns out it just expects an :ok tuple, which we did not return. So this works:

Ecto.Multi.new()
|> Ecto.Multi.run(:whatever, fn _, _ ->
  loader =
    Dataloader.new()
    |> Dataloader.add_source(:repo, Dataloader.Ecto.new(MyApp.Repo))
    |> Dataloader.load(:repo, :roles, user)
    |> Dataloader.run()

  {:ok, loader}
end)
|> Ev2.Repo.transaction()

Thanks a lot for your time and help! 🙏 Closing this issue..

arnodirlam commented 3 years ago

We ran into issues again, now with a correct return value for Multi.run/3. Please see my updated PR branch.

[error] Task #PID<0.938.0> started from #PID<0.936.0> terminating
** (DBConnection.ConnectionError) could not checkout the connection owned by #PID<0.931.0>. When using the sandbox, connections are shared, so this may imply another process is using a connection. Reason: connection not available and request was dropped from queue after 999ms. You can configure how long requests wait in the queue using :queue_target and :queue_interval. See DBConnection.start_link/2 for more information
    (ecto_sql 3.4.4) lib/ecto/adapters/sql.ex:593: Ecto.Adapters.SQL.raise_sql_call_error/1
    (ecto_sql 3.4.4) lib/ecto/adapters/sql.ex:526: Ecto.Adapters.SQL.execute/5
    (ecto 3.4.5) lib/ecto/repo/queryable.ex:192: Ecto.Repo.Queryable.execute/4
    (ecto 3.4.5) lib/ecto/repo/queryable.ex:17: Ecto.Repo.Queryable.all/3
    (dataloader 1.0.8) lib/dataloader/ecto.ex:328: Dataloader.Ecto.run_batch/6
    (dataloader 1.0.8) lib/dataloader/ecto.ex:656: Dataloader.Source.Dataloader.Ecto.run_batch/2
    (dataloader 1.0.8) lib/dataloader/ecto.ex:614: anonymous fn/2 in Dataloader.Source.Dataloader.Ecto.run_batches/1
    (elixir 1.12.1) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
    (elixir 1.12.1) lib/task/supervised.ex:35: Task.Supervised.reply/5
    (stdlib 3.15.1) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Function: &:erlang.apply/2
    Args: [#Function<14.77029692/1 in Dataloader.Source.Dataloader.Ecto.run_batches/1>, [{{:queryable, #PID<0.931.0>, Dataloader.User, :one, :id, %{}}, #MapSet<[{490, 490}]>}]]

Grateful for any hints or pointers! 🙏

arnodirlam commented 3 years ago

@benwilson512 not sure if you've seen I re-opened this. I tried to fix it, but didn't get very far in my attempts.

Can you confirm that it's an issue in dataloader, or am I overlooking something again? 🙂

benwilson512 commented 3 years ago

@arnodirlam are you able to create an example I can run?

arnodirlam commented 3 years ago

@benwilson512 Yes, the tests in the linked PR should fail: #130

giddie commented 3 years ago

I'm seeing something similar when using a transaction. As far as I can tell, the issue is:

giddie commented 3 years ago

@arnodirlam I created #134 to solve my issue with Repo.transaction. I suspect this may solve your problem too.

arnodirlam commented 2 years ago

Thanks a lot for confirming the issue and for coming up with a PR, @giddie 🙏

I've done more research, and found out the following:

Why does dataloader use async processes?

Partly for error handling, partly for performance optimization.

The hierarchy of processes looks as follows:

  1. A process calls Dataloader
  2. A process is spawned for loading all sources (using Task.async + Task.await) [Dataloader.run/1]
  3. A process is spawned for each source (using Task.async_stream) [Dataloader.run_tasks/3]
  4. For each source, a process is spawned for loading all batches (using Task.async + Task.await) [Dataloader.Ecto.run/1]
  5. For each source, a process is spawned for each batch (using Task.async_stream) [Dataloader.Ecto.run_batches/1]

Spawning 2. and 4. is for catching errors raised while loading sources and batches, respectively. They trap exits (docs). Spawning 3. and 5. is for loading sources and batches in parallel. They are also used to enforce timeouts on loading each batch.

Why doesn't that work with transactions?

A transaction is bound to a DB connection that's held by the process calling Dataloader. DB connections, however, cannot be shared between processes.

I believe it is not about the sandbox. Transactions are per process, you cannot share a transaction across multiple process. If you have work you want to process in parallel, you can do all of the normalization on the side and leave the final step of inserting the data to the main process.

-- José Valim on Ecto mailing list (Mar 13, 2017)

How does Ecto itself handle this?

Ecto's preloader checks whether the process it's called from already has a checked out connection. If yes, it does not preload in parallel (using Task.async_stream) but synchronously:

    if match?([_,_|_], preloaders) and not checked_out?(repo_name) and
         Keyword.get(opts, :in_parallel, true) do
      preloaders
      |> Task.async_stream(&(&1.(opts)), timeout: :infinity)
      |> Enum.map(fn {:ok, assoc} -> assoc end)
    else
      Enum.map(preloaders, &(&1.(opts)))
    end

-- shortened version of the code in Ecto.Repo.Preloader introduced by José Valim in 78ba871

How to fix the issue?

By restructuring Dataloader to not spawn processes if it's run from within a transaction. Instead of using Task.async for catching errors, we could wrap calls to the data-loading in try/rescue blocks. Would that cover all error cases?

In particular, I suggest the following changes (will update according to feedback):

I know it's quite a bit of refactoring, but I think it's worth it!

Looking forward to feedback and discussion! @giddie @benwilson512 😄

arnodirlam commented 2 years ago

Also @seddy who worked on the async parts a lot 👌

seddy commented 2 years ago

Don't think you meant me @arnodirlam, though I'm more than happy to take credit for someone else's work obviously :joy_cat:

EDIT: Unless you're referring to this thing I did ages ago and had completely forgotten about :sweat_smile: https://github.com/absinthe-graphql/dataloader/pull/41

arnodirlam commented 2 years ago

So this wasn't you? @seddy https://github.com/absinthe-graphql/dataloader/commit/ce9ac3f3e4649dd15a3094ad50ba841511455d31 🤔

EDIT: Yea, we're talking ages ago 😄

giddie commented 2 years ago

Great work @arnodirlam. Looks like you went a few steps further than me by delving into Ecto to find a better solution and writing it all up :) Your recommendations sound good to me.

benwilson512 commented 2 years ago

Handled now in 1.0.11. To use, make sure that when you call Dataloader.new/1 you do:

loader = Dataloader.new(async?: !MyApp.Repo.in_transaction?())

This will be handled automatically in 2.0