dwyl / alog

🌲 alog (Append-only Log) is an easy way to start using the Lambda/Kappa architecture in your Elixir/Phoenix Apps while still using PostgreSQL (with Ecto).
GNU General Public License v2.0

All - Ecto.Adapters.SQL.Connection #40

Open Danwhy opened 5 years ago

Danwhy commented 5 years ago

Part of #38

https://hexdocs.pm/ecto_sql/Ecto.Adapters.SQL.Connection.html#c:all/1

We need to implement this callback so that it returns the latest version of all rows that match the query parameter.

@RobStallion has already researched how we can modify the passed-in query, which we can then pass on to the Ecto.Adapters.Postgres.Connection.all function:

https://github.com/RobStallion/alog_adapter/issues/1 https://stackoverflow.com/questions/54690701/is-there-a-way-to-ensure-where-clause-happens-after-distinct

RobStallion commented 5 years ago

@Danwhy I just got the following error when trying to run a rollback command on the module where I have been testing this dummy adapter...

** (Postgrex.Error) ERROR 42703 (undefined_column) column s0.comment_id_no does not exist

    query: SELECT DISTINCT ON (s0."comment_id_no") s0."version"::bigint FROM "schema_migrations" AS s0 FOR UPDATE
    (ecto_sql) lib/ecto/adapters/sql.ex:624: Ecto.Adapters.SQL.raise_sql_call_error/1
    (ecto_sql) lib/ecto/adapters/sql.ex:557: Ecto.Adapters.SQL.execute/5
    (ecto) lib/ecto/repo/queryable.ex:147: Ecto.Repo.Queryable.execute/4
    (ecto) lib/ecto/repo/queryable.ex:18: Ecto.Repo.Queryable.all/3
    (ecto_sql) lib/ecto/migrator.ex:316: anonymous fn/3 in Ecto.Migrator.lock_for_migrations/3
    (ecto_sql) lib/ecto/adapters/sql.ex:820: anonymous fn/3 in Ecto.Adapters.SQL.checkout_or_transaction/4
    (db_connection) lib/db_connection.ex:1355: DBConnection.run_transaction/4
    (ecto_sql) lib/ecto/adapters/sql.ex:727: Ecto.Adapters.SQL.lock_for_migrations/5
    (ecto_sql) lib/ecto/migrator.ex:318: Ecto.Migrator.lock_for_migrations/3
    (ecto_sql) lib/mix/tasks/ecto.rollback.ex:106: anonymous fn/4 in Mix.Tasks.Ecto.Rollback.run/2
    (elixir) lib/enum.ex:765: Enum."-each/2-lists^foreach/1-0-"/2
    (elixir) lib/enum.ex:765: Enum.each/2
    (mix) lib/mix/task.ex:316: Mix.Task.run_task/3
    (mix) lib/mix/cli.ex:79: Mix.CLI.run_task/2
    (elixir) lib/code.ex:767: Code.require_file/2

This is the line that is causing the issue - https://github.com/RobStallion/alog_adapter/blob/master/lib/connection.ex#L36

This appears to be because not all the tables created in the module contain the column "comment_id_no" (equivalent to entry_id).

This could be a potential issue in trying to create an ecto adapter.

Danwhy commented 5 years ago

@RobStallion Yeah I'd assume that the problem is that the schema_migrations table is automatically created. But we can have full control over all tables being created, so we can either make sure the "entry_id" is on all created tables, or have a clause that catches the exceptions.
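As a rough illustration of the "clause" option, here is a minimal sketch of a check that decides whether a query targets a table that should get the alog treatment. The module name, the function name and the list of skipped tables are hypothetical, and it assumes the query exposes sources as a tuple of {table, schema, prefix} tuples, which is the shape that shows up in the logged query_meta later in this thread.

```elixir
defmodule AlogSketch.SourceGuard do
  # Hypothetical helper, not part of alog or Ecto.
  # Tables created by Ecto itself have no entry_id column.
  # Only "schema_migrations" comes from this thread; extend as needed.
  @unversioned_tables ["schema_migrations"]

  # Assumption: `sources` is a tuple of {table, schema, prefix} tuples.
  def versioned?(%{sources: sources})
      when is_tuple(sources) and tuple_size(sources) > 0 do
    case elem(sources, 0) do
      {table, _schema, _prefix} when is_binary(table) ->
        table not in @unversioned_tables

      # Subqueries, fragments or anything unexpected: leave the query alone.
      _other ->
        false
    end
  end

  # No sources available (for example an unplanned query).
  def versioned?(_query), do: false
end
```

The adapter's all function could call a check like this first and fall back to the plain Postgres behaviour when it returns false.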

Danwhy commented 5 years ago

I've been having a look into the flow of how the Repo.all query is called:

Repo.all -> Ecto.Repo.Queryable.all -> Ecto.Repo.Queryable.execute -> Ecto.Query.Planner.query (Ecto.Query.Planner.plan) -> adapter.execute (Ecto.Adapters.SQL.execute) -> adapter.execute! -> sql_call

It looks like the sources are added in the Ecto.Query.Planner.plan call, so we may be able to call this function ourselves to prepare the query in our all function

SimonLab commented 5 years ago

I was doing something similar yesterday to see how the all function is called. I was a bit confused by the execute function, as the adapter also has one, but with only 4 parameters; I figured out later on that the one being used is the one from Ecto.Adapters.SQL, which takes 5 parameters. See also https://github.com/dwyl/learn-elixir/issues/116, which discusses the best way to trace function calls.

The sources look to be used (but not defined there) in the Ecto.Adapters.SQL.execute function, via put_source:

  def execute(adapter_meta, query_meta, prepared, params, opts) do
    %{num_rows: num, rows: rows} =
      execute!(adapter_meta, prepared, params, put_source(opts, query_meta))

    {num, rows}
  end

https://github.com/elixir-ecto/ecto_sql/blob/3a72eb111f8613c8eada296bcd3dd89883a52b5a/lib/ecto/adapters/sql.ex#L554-L560

Danwhy commented 5 years ago

I think by the time it gets to that function, our adapter's all function has already been called; that's just where the SQL it generates is actually executed.

SimonLab commented 5 years ago

I'm tracing the calls made by Repo.all to find out exactly where our adapter's all function is called and with what parameter structure.

Repo.all is defined in the Ecto.Repo module on line 237: https://github.com/elixir-ecto/ecto/blob/2d564df57d29ef021f564af36e4b3ab86f902554/lib/ecto/repo.ex#L237-L239

  def all(queryable, opts \\ []) do
    Ecto.Repo.Queryable.all(__MODULE__, queryable, opts)
  end

Ecto.Repo.Queryable.all then calls the private execute function in the same module:

  defp execute(operation, name, query, opts) when is_list(opts) do
    {adapter, %{cache: cache} = adapter_meta} = Ecto.Repo.Registry.lookup(name)
    {query_meta, prepared, params} = Planner.query(query, operation, cache, adapter, 0)

    case query_meta do
      %{select: nil} ->
        adapter.execute(adapter_meta, query_meta, prepared, params, opts)
      %{select: select, sources: sources, preloads: preloads} ->
        %{
          preprocess: preprocess,
          postprocess: postprocess,
          take: take,
          assocs: assocs,
          from: from
        } = select

        preprocessor = preprocessor(from, preprocess, adapter)
        {count, rows} = adapter.execute(adapter_meta, query_meta, prepared, params, opts)
        postprocessor = postprocessor(from, postprocess, take, adapter)

        {count,
          rows
          |> Ecto.Repo.Assoc.query(assocs, sources, preprocessor)
          |> Ecto.Repo.Preloader.query(name, preloads, take, postprocessor, opts)}
    end
  end

Planner.query here is Ecto.Query.Planner.query:

  def query(query, operation, cache, adapter, counter) do
    {query, params, key} = plan(query, operation, adapter, counter)
    query_with_cache(key, query, operation, cache, adapter, counter, params)
  end

The Ecto.Query.Planner.query function calls the Ecto.Query.Planner.plan function: https://github.com/elixir-ecto/ecto/blob/d2bca3d36476cc92d2e761ab2d99c130c9ad83d5/lib/ecto/query/planner.ex#L206

  @doc """
  Prepares the query for cache.
  This means all the parameters from query expressions are
  merged into a single value and their entries are pruned
  from the query.
  This function is called by the backend before invoking
  any cache mechanism.
  """
  def plan(query, operation, adapter, counter) do
    query
    |> plan_sources(adapter)
    |> plan_assocs
    |> plan_combinations(operation, adapter, counter)
    |> plan_cache(operation, adapter, counter)
  rescue
    e ->
      # Reraise errors so we ignore the planner inner stacktrace
      filter_and_reraise e, System.stacktrace
  end

So I think the plan function is just preparing the query.

Then query_with_cache(key, query, operation, cache, adapter, counter, params) is called:

  defp query_with_cache(key, query, operation, cache, adapter, counter, params) do
    case query_lookup(key, query, operation, cache, adapter, counter) do
      {_, select, prepared} ->
        {build_meta(query, select), {:nocache, prepared}, params}
      {_key, :cached, select, cached} ->
        update = &cache_update(cache, key, &1)
        reset = &cache_reset(cache, key, &1)
        {build_meta(query, select), {:cached, update, reset, cached}, params}
      {_key, :cache, select, prepared} ->
        update = &cache_update(cache, key, &1)
        {build_meta(query, select), {:cache, update, prepared}, params}
    end
  end

This calls query_lookup(key, query, operation, cache, adapter, counter), which checks whether the query already exists in the ets cache; if it does not, it prepares the query:

  defp query_lookup(key, query, operation, cache, adapter, counter) do
    case :ets.lookup(cache, key) do
      [term] -> term
      [] -> query_prepare(query, operation, adapter, counter, cache, key)
    end
  end

Then query_prepare(query, operation, adapter, counter, cache, key):

  defp query_prepare(query, operation, adapter, counter, cache, key) do
    case query_without_cache(query, operation, adapter, counter) do
      {:cache, select, prepared} ->
        cache_insert(cache, key, {key, :cache, select, prepared})
      {:nocache, _, _} = nocache ->
        nocache
    end
  end

Which runs query_without_cache(query, operation, adapter, counter):


  defp query_without_cache(query, operation, adapter, counter) do
    {query, select} = normalize(query, operation, adapter, counter)
    {cache, prepared} = adapter.prepare(operation, query)
    {cache, select, prepared}
  end

And we can see that after the query has been normalised the adapter.prepare(operation, query) function is called. This function is defined in Ecto.Adapters.SQL https://github.com/elixir-ecto/ecto_sql/blob/3a72eb111f8613c8eada296bcd3dd89883a52b5a/lib/ecto/adapters/sql.ex#L134

  def prepare(:all, query) do
    {:cache, {System.unique_integer([:positive]), IO.iodata_to_binary(@conn.all(query))}}
  end

We can see that @conn.all(query) is called, and @conn is defined as __MODULE__.Connection, which is our adapter's Connection module!

I think I managed to trace the lifecycle of the Repo.all function. There are other helper functions that are called, but by doing this trace I understood that the query is first "prepared" (planned), then looked up in the ets cache, and if it is not there the adapter is called and the matching function (here all) is triggered. Postgrex is then responsible for sending the query to Postgres and retrieving the result. Now that we have a better idea of where the query is transformed, we can try to copy or adapt some of the helper functions above to add the SQL clauses for the alog logic (unique uuid, latest timestamp, ...).
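The lookup-then-prepare step can be pictured with a small stand-alone sketch. This is not Ecto's code: the module name, the function names and the {key, sql} tuple stored in the table are assumptions made for illustration. It only shows the general pattern of checking ets first and calling the expensive "prepare" function (the part that would end up in the adapter's all) on a miss.

```elixir
defmodule AlogSketch.QueryCache do
  # Hypothetical module illustrating the ets lookup-or-prepare pattern.

  # Creates an unnamed ets table and returns its reference.
  def new, do: :ets.new(:alog_sketch_cache, [:set, :public])

  # Returns {:cached, sql} on a hit. On a miss, calls prepare_fun
  # (which must return iodata), stores the binary and returns {:prepared, sql}.
  def fetch(cache, key, prepare_fun) when is_function(prepare_fun, 0) do
    case :ets.lookup(cache, key) do
      [{^key, sql}] ->
        {:cached, sql}

      [] ->
        sql = IO.iodata_to_binary(prepare_fun.())
        true = :ets.insert(cache, {key, sql})
        {:prepared, sql}
    end
  end
end
```

One consequence worth keeping in mind: if the real cache behaves like this, the adapter's all function is only called the first time a given query shape is seen, so any logging placed there will not fire on every Repo.all call.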

SimonLab commented 5 years ago

Trying to recreate a similar subquery to the one currently on alog:

    sub = from (m in subquery(a1)), distinct: m.last_name, order_by: m.first_name, select: m
    query = from a in subquery(sub), where: not a.deleted, select: a
    Ecto.Adapters.Postgres.Connection.all(query)

where a1 is the query the adapter all function is getting

image

The create_names function here https://github.com/elixir-ecto/ecto_sql/blob/3a72eb111f8613c8eada296bcd3dd89883a52b5a/lib/ecto/adapters/postgres/connection.ex#L654

    defp create_names(%{sources: sources}) do
      create_names(sources, 0, tuple_size(sources)) |> List.to_tuple()
    end

The error comes from tuple_size(sources), where sources appears to be nil. I think the subquery function returns a query struct that differs from the one produced by the initial planning steps, which makes the two incompatible.

@Danwhy wrote above: "It looks like the sources are added in the Ecto.Query.Planner.plan call, so we may be able to call this function ourselves to prepare the query in our all function"

I'm looking at whether we can reuse this function.

SimonLab commented 5 years ago

Trying to apply plan or plan_sources from Ecto.Query.Planner returns other errors. I will instead

RobStallion commented 5 years ago

Looking at @SimonLab's comment, the following line appears to be where the adapter is called...

{count, rows} = adapter.execute(adapter_meta, query_meta, prepared, params, opts)

This calls the following function in the adapter...

def execute(adapter_meta, query_meta, query, params, opts) do
  Ecto.Adapters.SQL.execute(adapter_meta, query_meta, query, params, opts)
end

This function is defined here.

RobStallion commented 5 years ago

The arguments passed to this function are...

[
  adapter_meta: %{
    cache: #Reference<0.3974704928.2992242689.138205>,
    opts: [timeout: 15000, pool_size: 10, pool: DBConnection.ConnectionPool],
    pid: #PID<0.2848.0>,
    sql: AlogAdapter.Connection,
    telemetry: {UsingAlogAdapter.Repo, :debug, [],
     [:using_alog_adapter, :repo, :query]}
  },
  opts: [],
  params: [],
  prepared: {:cache,
   #Function<29.104601620/1 in Ecto.Query.Planner.query_with_cache/7>,
   {134695,
    "SELECT c0.\"id\", c0.\"comment\", c0.\"comment_id_no\", c0.\"show\", c0.\"cid\", c0.\"entry_id\", c0.\"inserted_at\", c0.\"updated_at\" FROM \"comments\" AS c0"}},
  query_meta: %{
    preloads: [],
    select: %{
      assocs: [],
      from: {:any,
       {:source, {"comments", UsingAlogAdapter.Comments}, nil,
        [
          id: :id,
          comment: :string,
          comment_id_no: :string,
          show: :boolean,
          cid: :string,
          entry_id: :string,
          inserted_at: :naive_datetime,
          updated_at: :naive_datetime
        ]}},
      postprocess: {:source, :from},
      preprocess: [source: :from],
      take: []
    },
    sources: {{"comments", UsingAlogAdapter.Comments, nil}}
  }
]

To clarify, the original call was

Repo.all(Comments)

RobStallion commented 5 years ago

Next step

I am going to pass a subquery to Repo.all and compare the two sets of arguments that are passed to the adapter's execute/5 function.
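Comparing two long argument dumps by eye is error-prone, so a tiny helper can list only the keys whose values differ. This is a hypothetical sketch (the module and function names are made up) and it assumes both dumps are keyword lists like the ones logged in this thread.

```elixir
defmodule AlogSketch.ArgDiff do
  # Hypothetical helper: returns the keys whose values differ
  # between two keyword lists, in order of first appearance.
  def changed_keys(a, b) when is_list(a) and is_list(b) do
    (Keyword.keys(a) ++ Keyword.keys(b))
    |> Enum.uniq()
    |> Enum.filter(fn key -> Keyword.get(a, key) != Keyword.get(b, key) end)
  end
end
```

For example, AlogSketch.ArgDiff.changed_keys(first_args, second_args) would return only the keys worth looking at. Note that values such as references and pids differ between runs, so keys holding them will always be reported.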

RobStallion commented 5 years ago

[
  adapter_meta: %{
    cache: #Reference<0.872129942.2461401090.223629>,
    opts: [timeout: 15000, pool_size: 10, pool: DBConnection.ConnectionPool],
    pid: #PID<0.329.0>,
    sql: AlogAdapter.Connection,
    telemetry: {UsingAlogAdapter.Repo, :debug, [],
     [:using_alog_adapter, :repo, :query]}
  },
  opts: [],
  params: [],
  prepared: {:cache,
   #Function<29.104601620/1 in Ecto.Query.Planner.query_with_cache/7>,
   {103,
    "SELECT DISTINCT ON (c0.\"cid\") c0.\"id\", c0.\"comment\", c0.\"comment_id_no\", c0.\"show\", c0.\"cid\", c0.\"entry_id\", c0.\"inserted_at\", c0.\"updated_at\" FROM \"comments\" AS c0 WHERE (c0.\"comment_id_no\" = '1') ORDER BY c0.\"cid\", c0.\"inserted_at\" DESC"}},
  query_meta: %{
    preloads: [],
    select: %{
      assocs: [],
      from: {:any,
       {:source, {"comments", UsingAlogAdapter.Comments}, nil,
        [
          id: :id,
          comment: :string,
          comment_id_no: :string,
          show: :boolean,
          cid: :string,
          entry_id: :string,
          inserted_at: :naive_datetime,
          updated_at: :naive_datetime
        ]}},
      postprocess: {:source, :from},
      preprocess: [source: :from],
      take: []
    },
    sources: {{"comments", UsingAlogAdapter.Comments, nil}}
  }
]

These are the arguments passed to the adapter's execute/5 function when all is called with the following subquery...

sub =
  from(c in Comments,
    distinct: c.cid,
    order_by: [desc: :inserted_at]
  )

query = from(c in sub, where: c.comment_id_no == "1")
Repo.all(query)

RobStallion commented 5 years ago

At first glance the arguments passed to our adapter look almost identical, with the OBVIOUS exception of the :prepared value.

:prepared is the same as :query. I logged the arguments inside the function that the Ecto.Adapters.SQL.execute(adapter_meta, query_meta, query, params, opts) call invokes, that is, def execute(adapter_meta, query_meta, prepared, params, opts). That is the only reason the adapter function says query but the log says prepared.

RobStallion commented 5 years ago

Hopefully this means that we can somehow update the query that is passed to the adapter to add a subquery to it.

Will look into possible approaches for this.

SimonLab commented 5 years ago

My latest attempt was to try to reproduce the logic of the all function from the Postgres adapter:

    def all(query) do
      sources = create_names(query)
      {select_distinct, order_by_distinct} = distinct(query.distinct, sources, query)

      from = from(query, sources)
      select = select(query, select_distinct, sources)
      join = join(query, sources)
      where = where(query, sources)
      group_by = group_by(query, sources)
      having = having(query, sources)
      window = window(query, sources)
      combinations = combinations(query)
      order_by = order_by(query, order_by_distinct, sources)
      limit = limit(query, sources)
      offset = offset(query, sources)
      lock = lock(query.lock)

      [select, from, join, where, group_by, having, window, combinations, order_by, limit, offset | lock]
    end

This function returns an improper list of improper lists of strings. I'm not certain exactly why improper lists are used here, but I think it might be for optimisation and to make pattern matching easier.

see https://hexdocs.pm/elixir/master/List.html "Some lists, called improper lists, do not have an empty list as the second element in the last cons cell"

Then I looked at the private function from to understand the structure of the value returned by all:

    defp from(%{from: %{source: source}} = query, sources) do
      {from, name} = get_source(query, sources, 0, source)
      [" FROM ", from, " AS " | name]
    end

So it looks like it creates an improper list that corresponds to the FROM part of an SQL query. My idea is to create or transform one of these improper lists so that we can add a subquery. I haven't managed to get to this point yet. One idea would be to IO.inspect, in the all function of the normal Postgres adapter, a query containing a subquery to understand how it is formed, and then try to reproduce the logic with our specific alog subquery.
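To make the improper-list shape concrete, here is a minimal sketch that builds a FROM clause the same way and then a variant that wraps a subquery. The module and function names are hypothetical. The only library call used is IO.iodata_to_binary/1, which is also what Ecto.Adapters.SQL.prepare uses above to flatten the result of all into a single string.

```elixir
defmodule AlogSketch.Iodata do
  # Hypothetical helpers mirroring the shape of the private from/2 above.

  # An improper list whose tail is a binary is still valid iodata.
  def from_clause(table, as_name) when is_binary(as_name) do
    [" FROM ", table, " AS " | as_name]
  end

  # Same shape, but the source is a parenthesised subquery (any iodata).
  def wrap_subquery(inner_sql, as_name) when is_binary(as_name) do
    [" FROM (", inner_sql, ") AS " | as_name]
  end
end

# Flattening the nested structure into one binary:
IO.iodata_to_binary(AlogSketch.Iodata.from_clause(~s("comments"), "c0"))
```

Because the pieces are only flattened once at the end, a subquery can be spliced in as nested iodata without concatenating strings at every step.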

Then try to repeat this process for the distinct part of the alog query.

RobStallion commented 5 years ago

The code that will enable access to the all query can be found here.

At the moment I am unsure if this function is only called when Repo.all is called or if it is called for other Repo functions as well.

SimonLab commented 5 years ago

Running the tests runs the migrations, which in turn run Repo.all. At the moment we are adding DISTINCT ON to all the queries, but I don't think it is needed for the migrations: image

To get the different parts of the query (i.e. table name, fields, table_as, ...) I was using String.split, but the code became tedious and not really readable, so I'm going to look at replacing it with a regex. This will let me get the table name and create a switch to know whether Repo.all is being run for a migration or for a user query.

SimonLab commented 5 years ago

Given a query similar to: we can retrieve the different parts of the query with the following regex:

Regex.named_captures(~r/(\bSELECT\b)\s(?<fields>.*)\sFROM\s(?<table_name>.*)\sas\s(?<table_as>.*)(?<rest_query>.*)/i, query)
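One thing to watch with the regex above: for a single-line query the greedy table_as group takes everything after AS, so rest_query comes back empty. Below is a hedged variant, not the code used in alog, that restricts table_name and table_as to a single token. The module and function names are hypothetical, and it assumes a simple single-table SELECT with no subqueries or joins before the alias.

```elixir
defmodule AlogSketch.SqlParts do
  # Hypothetical helper, not part of alog or Ecto.
  # \S+ keeps table_name and table_as to one token each, so rest_query
  # receives everything after the alias. The `s` flag lets `.` match
  # newlines and `i` makes the keywords case-insensitive.
  defp pattern do
    ~r/\ASELECT\s(?<fields>.*?)\sFROM\s(?<table_name>\S+)\sAS\s(?<table_as>\S+)(?<rest_query>.*)\z/is
  end

  # Returns a map with string keys, or nil when the SQL does not match.
  def split(sql) when is_binary(sql) do
    Regex.named_captures(pattern(), sql)
  end
end
```

With the captured table_name it becomes possible to branch on "schema_migrations" versus a user table before rewriting the query.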

SimonLab commented 5 years ago

I think the alog all query is now ready: image

I've recreated the following structure for the query:

# SELECT
#   s0."id",
#   s0."name",
#   s0."entry_id",
#   s0."deleted",
#   s0."inserted_at",
#   s0."updated_at"
# FROM
#   (SELECT DISTINCT ON (d0."entry_id")
#       d0."id" AS "id"
#     , d0."name" AS "name"
#     , d0."entry_id" AS "entry_id"
#     , d0."deleted" AS "deleted"
#     , d0."inserted_at" AS "inserted_at"
#     , d0."updated_at" AS "updated_at"
#   FROM "drink_types" AS d0
#   ORDER BY d0."entry_id", d0."updated_at" DESC)
# AS s0 WHERE (NOT (s0."deleted"))

I will need to write more tests to see if any error occurs.
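For reference, the structure above can be generated from a table name and a field list with a few lines of string building. This is only a sketch under stated assumptions, not the adapter's actual implementation: the module name, function name and default aliases are hypothetical, identifiers are assumed to be trusted (no escaping is done), and the field list must include "deleted" because the outer WHERE refers to it.

```elixir
defmodule AlogSketch.LatestQuery do
  # Hypothetical builder for the "latest version of each entry" query.
  # Assumes the table has entry_id, updated_at and deleted columns.
  def build(table, fields, inner_as \\ "d0", outer_as \\ "s0") do
    inner_fields =
      Enum.map_join(fields, ", ", fn f -> ~s(#{inner_as}."#{f}" AS "#{f}") end)

    outer_fields =
      Enum.map_join(fields, ", ", fn f -> ~s(#{outer_as}."#{f}") end)

    # Inner query: one row per entry_id, newest updated_at first.
    inner =
      ~s|SELECT DISTINCT ON (#{inner_as}."entry_id") #{inner_fields} | <>
        ~s|FROM "#{table}" AS #{inner_as} | <>
        ~s|ORDER BY #{inner_as}."entry_id", #{inner_as}."updated_at" DESC|

    # Outer query: drop entries whose latest version is marked deleted.
    ~s|SELECT #{outer_fields} FROM (#{inner}) AS #{outer_as} | <>
      ~s|WHERE (NOT (#{outer_as}."deleted"))|
  end
end
```

Filtering on deleted in the outer query rather than the inner one matters: filtering inside would fall back to an older, non-deleted version of an entry instead of hiding the entry.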