elixir-ecto / db_connection

Database connection behaviour
http://hexdocs.pm/db_connection/DBConnection.html
306 stars 113 forks source link

Introduce DBConnection.Stage #82

Closed fishcakez closed 7 years ago

fishcakez commented 7 years ago

Needs more tests but this is a first attempt at a streaming GenStage producer targeting Flow usage. Notice that there is flat_map option so we can preload in Ecto and currently only supports a single stream.

fishcakez commented 7 years ago

I have updated this to use a documented and generic GenStage implementation that should work in any part of a Flow pipeline. flat_map has been removed as Ecto could implement it's own implementation using Repo.Stream directly. Tests still need to be expanded.

josevalim commented 7 years ago

@fishcakez beautiful! :heart:

I don't see though how it can be used as a producer_consumer or consumer though. Is it somehow executing the query with the producer_consumer/consumer events?

fishcakez commented 7 years ago

A :consumer might be:

start = fn(conn) ->
  # prepare query/stream to use when handling events (init stage)
  copy_query = Postgrex.prepare!(conn, "copy", "COPY table FROM STDIN", [])
  Postgrex.stream(conn, copy_query, [])
end
## this is a consumer, so takes in events and returns `{[], state}`
handle  = fn(conn, events, copy_stream) ->
  {[], for event <- events, do: encode(event), into: copy_stream}
end
## close the query (undo start function)
stop = fn(conn, _reason, %DBConnection.Stream{query: copy_query}) ->
  Postgrex.close!(conn, copy_query)
end
DBConnection.Stage.start_link(pool, :consumer, start, handle, stop)

A producer_consumer, is less obvious, but might be something like:

start = fn(conn) ->
  # prepare query to use handling events (init stage)
  Postgrex.prepare!(conn, "insert", "INSERT INTO table (id, value) VALUES (DEFAULT, $1) RETURNING id", [])
end
## this is a producer_consumer, so takes in events and returns `{events, state}`
handle  = fn(conn, values, insert_query) ->
  ids = for value <- values do
    %{rows: [id]} = Postgrex.execute!(conn, insert_query, [value])
    id
  end
  {ids, insert_query}
end
## close the query (undo start function)
stop = fn(conn, _reason, insert_query) ->
  Postgrex.close!(conn, insert_query)
end
DBConnection.Stage.start_link(pool, :producer_consumer, start, handle, stop)

start, handle and stop are equivalent to nested DBConnection.transaction calls, so can do anything that can be done in a transaction and return the result.

josevalim commented 7 years ago

@fishcakez perfect, thank you!

fishcakez commented 7 years ago

When having a "continuation" style transaction we want to be able to not raise when transaction(conn, fun) is called when the connection closed/rolled back so that terminate/2 or after fun in Stream.resource/3 won't crash if an exception was previously raised. This PR gets around this by returning {:error, :rollback} if the connection is no longer available. However to be consistent it would mean nested transactions would return {:error, :rollback} instead of raising if the connection is closed or transaction is rolling back.

josevalim commented 7 years ago

Does it mean we need to change Repo.transaction in Ecto or can we still raise there?

If we can't raise still, would it be a breaking change since we only return {:error, :rollback} if the user calls Repo.rollback? Although I guess anything could call Repo.rollback and return {:error, :rollback} so I would say that's ok? --

José Valimwww.plataformatec.com.br http://www.plataformatec.com.br/Founder and Director of R&D

fishcakez commented 7 years ago

Does it mean we need to change Repo.transaction in Ecto or can we still raise there?

We wouldn't be able to change it there because we can't tell why {:error, :rollback} occurred. Currently it can occur if:

If we can't raise still, would it be a breaking change since we only return {:error, :rollback} if the user calls Repo.rollback? Although I guess anything could call Repo.rollback and return {:error, :rollback} so I would say that's ok?

While it is not the only way I think it is a breaking change because users may be rescuing the DBConnection.ConnectionError (current behaviour), and not handling {:error, :rollback}.

We can resolve this by requiring a continue/2,3 call at the top level that returns {:ok, _} | {:error, _} | :closed when using the "continuation" transaction, wdyt?

fishcakez commented 7 years ago

We can resolve this by requiring a continue/2,3 call at the top level that returns {:ok, } | {:error, } | :closed when using the "continuation" transaction, wdyt?

I think we actually need to go with requiring a continue function so we can force check in on exceptions, otherwise we may end up in situations where we close connections because the caller crashes or does not checkin.

fishcakez commented 7 years ago

Closed in favour of #87 so this can be implemented in Ecto itself.