dwyl / ecto-postgres-pubsub-spike

πŸ‘‚A Spike to attempt to create a history for a table using PostgreSQL Pub/Sub

SPIKE: Automatically insert records in address_history table #1

Open nelsonic opened 5 years ago

nelsonic commented 5 years ago

With reference to the Address Book example outlined in https://github.com/dwyl/phoenix-ecto-append-only-log-example/issues/27, we want to pursue Option 3: an address_history table to which every address record is backed up when it is inserted or updated.

Spike Tasks βœ…

Basic πŸ‘

| id | name | address_line_1 | address_line_2 | city | postcode | tel | inserted_at | updated_at |
|----|------|----------------|----------------|------|----------|-----|-------------|------------|
| 1 | Thor | 1 Sunset Blvd | 3rd Floor | LA | 90210 | 98765 | 2019-03-28 22:01:42 | 2019-03-28 22:01:42 |

address_history would be:

| id | ref_id | name | address_line_1 | address_line_2 | city | postcode | tel | inserted_at |
|----|--------|------|----------------|----------------|------|----------|-----|-------------|
| 1 | 1 | Thor | The Hall | Valhalla | Asgard | AS1 3DG | 123123 | 2019-02-25 10:01:42 |
| 2 | 1 | Thor | 177A Bleecker Street | c/o Dr. Strange | New York | NY 10012 | 98765 | 2019-03-14 10:01:42 |
| 3 | 1 | Thor | 1 Sunset Blvd | 3rd Floor | LA | 90210 | 98765 | 2019-03-28 22:01:42 |

Advanced πŸš€

Bonus Level! πŸ₯‡

Boss Level πŸ‘Ύ

The purpose of this level/section is to automatically update the address_history table whenever anything is updated in the address (primary) table. Nobody has time/patience to manually update both tables and keep them in sync, so we need to figure out if we can automate it without any "magic".

defmodule Append.Address do
  use Ecto.Schema

  @timestamps_opts [type: :naive_datetime_usec]
  schema "addresses" do
    field(:address_line_1, :string)
    field(:address_line_2, :string)
    field(:city, :string)
    field(:name, :string)
    field(:postcode, :string)
    field(:tel, :string)

    timestamps()
  end

  # Note: calling hello() inside the schema block above would fail to
  # compile, because local functions are not yet available while the
  # module body is still being compiled.
  def hello do
    IO.inspect("hello from address schema definition")
  end
end

@RobStallion this SPIKE should get us towards our mission of making alog more user-friendly. Please log your progress on this and if you get stuck please share as much detail as possible so that we can help "unblock" ... Thanks! 🌻

RobStallion commented 5 years ago

Following the steps outlined above

mix phx.gen.schema Address addresses name:string address_line_1:string address_line_2:string city:string postcode:string tel:string

should be

mix phx.gen.schema AddressHistory addresses_history name:string address_line_1:string address_line_2:string city:string postcode:string tel:string

(or address_histories, depending on preference). I also had to add the following to the migration file...

defmodule App.Repo.Migrations.CreateAddressesHistory do
  use Ecto.Migration

  def change do
    create table(:addresses_history) do
      add :ref_id, references(:addresses, on_delete: :delete_all, column: :id, type: :id) # <----------- this line
...
end

and this to the schema file...

defmodule App.AddressHistory do
  use Ecto.Schema
  import Ecto.Changeset

  schema "addresses_history" do
    field :ref_id, :integer

Will look to see if we can update the gen schema command to do this for us. I'm sure this is possible, but it wasn't a blocker for now.
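
For what it's worth, phx.gen.schema also understands a references field type, so the reference could probably be generated directly rather than added by hand (a sketch, not verified against this app):

mix phx.gen.schema AddressHistory addresses_history ref_id:references:addresses name:string address_line_1:string address_line_2:string city:string postcode:string tel:string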

These points are handled by autogenerated tests in app/test/app_web/controllers/address_controller_test.exs

  • [ ] Manually insert a record into the address table using the browser-based form.
    • [ ] Confirm the record was inserted. βœ…

This can be confirmed with mix test

RobStallion commented 5 years ago

Completing the 'basic' steps went fairly well. The phx.gen.html command in the first comment creates an accounts context with a list of default functions.

The two I was most interested in at this point are...

  def create_address(attrs \\ %{}) do
    %Address{}
    |> Address.changeset(attrs)
    |> Repo.insert()
  end

  def update_address(%Address{} = address, attrs) do
    address
    |> Address.changeset(attrs)
    |> Repo.update()
  end

As the names of these functions suggest, they handle creating an address and updating an address. The relevant controller actions (AddressController.create and AddressController.update) call these functions to handle these steps.

  • [ ] Write a custom function that inserts the data into address_history each time a record is inserted into address. insert_address_history() needs to know the id of the address record so it needs to happen after the Repo.insert()

To achieve this point I created a function in the accounts context, App.Accounts, called create_address_history/1

  defp create_address_history(address) do
     params = address |> Map.from_struct() |> Map.put(:ref_id, address.id)

     %AddressHistory{}
     |> AddressHistory.changeset(params)
     |> Repo.insert!()
  end

This function takes an address struct, converts it to a map and then adds the ref_id (needed for the history table) to the map. It then uses these as params to insert into the addresses_history table.

I then updated the create_address to use the newly created function like so...

  def create_address(attrs \\ %{}) do
    with changeset <- Address.changeset(%Address{}, attrs),
         {:ok, address} <- Repo.insert(changeset),
         _address_history <- create_address_history(address)
    do
      {:ok, address}
    end
  end

The with statement in the function ensures that the Address was inserted correctly with this line...

{:ok, address} <- Repo.insert(changeset),

If the address is inserted without issue, then the function inserts a row into the address history table with

_address_history <- create_address_history(address)

This seemed to work well so I implemented pretty much the same change to the update_address function.

  def update_address(%Address{} = address, attrs) do
    with changeset <- Address.changeset(address, attrs),
         {:ok, address} <- Repo.update(changeset),
         _address_history <- create_address_history(address)
    do
      {:ok, address}
    end
  end

Again, this checks that the address update succeeded; if so, it inserts a new row into the history table with the function we made earlier, create_address_history.

These changes also meant that no other files had to be changed. As the controllers call the functions in the accounts context, and the accounts context functions now handle the history table inserts, no other code needs to be updated (except some new tests, which were added to make sure that these functions work as expected; a sketch of one is below).
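
For reference, a minimal sketch of the kind of test that was added (module, case template, and attribute names here are assumptions, not the app's exact code):

defmodule App.AccountsTest do
  use App.DataCase

  alias App.{Accounts, AddressHistory, Repo}

  test "create_address/1 also creates an address_history record" do
    attrs = %{name: "Thor", address_line_1: "1 Sunset Blvd", city: "LA", postcode: "90210", tel: "98765"}

    # creating an address should also write a row to the history table
    assert {:ok, address} = Accounts.create_address(attrs)
    assert [history] = Repo.all(AddressHistory)
    assert history.ref_id == address.id
  end
end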

RobStallion commented 5 years ago

Following the steps outlined in advanced

Create a PostgreSQL Pub/Sub LISTEN (Elixir) function for the insert and update events on the address table and log this event to std.out

Going to read this article to see if we can use this in our application.

RobStallion commented 5 years ago

I have been trying to follow the steps found here https://blog.lelonek.me/listen-and-notify-postgresql-commands-in-elixir-187c49597851.

I am not currently getting any errors but it also does not appear to be doing anything.

Looking into what could be going wrong / what is different in the code I have.

RobStallion commented 5 years ago

It looks like the handle_info/2 function that I defined (copied from the link above) is not currently being called. I think that this may be because the pattern is not currently matching...

  def handle_info({:notification, _pid, _ref, "accounts_changed", payload}, _state) do
    with {:ok, data} <- Jason.decode(payload, keys: :atoms) do
      data
      |> inspect()
      |> Logger.info()

      {:noreply, :event_handled}
    else
      error -> {:stop, error, []}
    end
  end

There is a chance that this is not being called because the arguments do not match this function clause when the message arrives.

Going to define a more generic version of this function to see if it is being called at all.

RobStallion commented 5 years ago
  def handle_info(arg1, state) do
    IO.inspect(arg1)
    IO.inspect(state)
    # handle_info/2 must return a tuple such as {:noreply, state}
    {:noreply, state}
  end

The generic version of the function does not appear to be called either. Need to look into the GenServer module to get a better understanding of how handle_info is meant to be called.

RobStallion commented 5 years ago

I cannot see anything obvious wrong with how handle_info is being defined.

I am instead going to look into Stored Procedures and Functions in PostgreSQL. This was the part of the tutorial I felt least confident about, so I am going to do some extra reading around the subject, starting with this link

RobStallion commented 5 years ago

Reading this link I came across the following...

# Search function migration
def change do
  execute """
    CREATE OR REPLACE FUNCTION article_search(term varchar)
    RETURNS table(id int, title varchar, slug varchar, display_date date, tags varchar[], rank real, introduction text)
    AS $$
      SELECT ar.id, ar.title, ar.slug, ar.display_date, ar.tags, 
        ts_rank_cd(search_vector, search_query, 32) AS rank, 
        ts_headline('english',
          CONCAT(introduction,' ',main_body), 
          search_query, 
          'StartSel=<mark>,StopSel=</mark>,MinWords=50,MaxWords=100'
        ) AS introduction
      FROM articles ar
        INNER JOIN (
          SELECT 
            ts_rewrite(
              plainto_tsquery(term), 
              CONCAT('SELECT * FROM aliases WHERE plainto_tsquery(''',term,''') @> t')
            ) search_query
        ) sq ON ar.search_vector @@ search_query
      WHERE ar.publish_at IS NOT NULL AND ar.static = FALSE
      ORDER BY rank DESC;
  $$ language SQL;
  """
end

This appears to be a CREATE OR REPLACE FUNCTION call inside an Elixir migration. If that is the case, then we should be able to implement the following (taken from the first link on pub/sub in Postgres)...

CREATE OR REPLACE FUNCTION notify_account_changes()
RETURNS trigger AS $$
BEGIN
  PERFORM pg_notify(
    'accounts_changed',
    json_build_object(
      'operation', TG_OP,
      'record', row_to_json(NEW)
    )::text
  );

  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

and

CREATE TRIGGER accounts_changed
AFTER INSERT OR UPDATE
ON accounts
FOR EACH ROW
EXECUTE PROCEDURE notify_account_changes();

in our migration files. The first code block should define the function and the second should define a trigger that executes it after every insert or update.
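
For example, both blocks could be wrapped in execute/1 inside a single migration (a sketch, with an assumed module name; up/down are used because execute/1 is not automatically reversible in change/0):

defmodule App.Repo.Migrations.AddAccountsNotifyTrigger do
  use Ecto.Migration

  def up do
    # define the function that publishes the changed row as JSON
    execute("""
    CREATE OR REPLACE FUNCTION notify_account_changes()
    RETURNS trigger AS $$
    BEGIN
      PERFORM pg_notify(
        'accounts_changed',
        json_build_object('operation', TG_OP, 'record', row_to_json(NEW))::text
      );
      RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;
    """)

    # attach the trigger to the accounts table
    execute("""
    CREATE TRIGGER accounts_changed
    AFTER INSERT OR UPDATE ON accounts
    FOR EACH ROW EXECUTE PROCEDURE notify_account_changes();
    """)
  end

  def down do
    execute("DROP TRIGGER IF EXISTS accounts_changed ON accounts;")
    execute("DROP FUNCTION IF EXISTS notify_account_changes();")
  end
end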

nelsonic commented 5 years ago

@RobStallion looking great so far! πŸŽ‰ We can skip the step of cascading DELETE ops through the address_history table because we don't want an accidental DELETE to destroy all the history. πŸ€”

This trigger is looking good. Keen to catch up in the morning. πŸ‘

RobStallion commented 5 years ago

The update function doesn't have to update the history table after the original table has been updated, as the id to update will already be known.

I don't know if it will make any difference in how efficient things are if we "batch" the calls but just wanted to capture the thought somewhere.

nelsonic commented 5 years ago

@RobStallion great question. Batching will improve perf from a PostgreSQL scalability perspective for sure, but it will also mean the user has to wait slightly longer for their response. I strongly prefer the batch (or TRIGGER to be more precise) approach because it means the insert/update op is a transaction and there is less likelihood of a failure to store the history.

RobStallion commented 5 years ago

Have been able to get the stored procedure working, following what was mentioned in this comment.

I haven't tested it much at this point but so far I have been able to call Repo.insert and log the results of the insert in the terminal.

handle_info...

  def handle_info({:notification, _pid, _ref, "accounts_changed", payload}, _state) do
    with {:ok, data} <- Jason.decode(payload, keys: :atoms) do
      IO.inspect(data)
      {:noreply, :event_handled}
    else
      error ->
        {:stop, error, []}
    end
  end

In iex I am calling the following...

App.Repo.insert(%App.Accounts.Account{username: "test"})

The log in the terminal

%{operation: "INSERT", record: %{id: 4, username: "test"}}

Still need to look into this a little more to better understand it and to see how it handles other situations, like attempts to insert invalid data, but so far so good.

With this step we should be able to replace the function call being used here.
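
For context, the listener boils down to something like this (a minimal sketch assuming Postgrex and Jason; the module name is made up, and the process still needs to be added to the application's supervision tree):

defmodule App.Listener do
  use GenServer

  @channel "accounts_changed"

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(_opts) do
    # a dedicated notifications connection, reusing the repo's config
    {:ok, pid} = Postgrex.Notifications.start_link(App.Repo.config())
    {:ok, _ref} = Postgrex.Notifications.listen(pid, @channel)
    {:ok, pid}
  end

  def handle_info({:notification, _pid, _ref, @channel, payload}, state) do
    case Jason.decode(payload, keys: :atoms) do
      {:ok, data} -> IO.inspect(data)
      error -> IO.inspect(error)
    end

    {:noreply, state}
  end
end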

RobStallion commented 5 years ago

Have removed the call to create_address_history from create_address and update_address. This caused the 2 tests I created to fail but all other tests pass.

Going to attempt to replace the create_address_history function using the pubsub method.

RobStallion commented 5 years ago

I have been able to insert into the addresses_history table when inserting into the addresses table; however, the tests are still failing.

  1) test addresses update_address/2 with valid data inserts new address into address history (App.AccountsTest)
     test/app/accounts/accounts_test.exs:56
     Assertion with == failed
     code:  assert length(Repo.all(AddressHistory)) == 2
     left:  0
     right: 2
     stacktrace:
       test/app/accounts/accounts_test.exs:63: (test)

  2) test addresses create_address/1 with valid data also creates address_history (App.AccountsTest)
     test/app/accounts/accounts_test.exs:38
     Expected false or nil, got true
     code: refute is_nil(address_history)
     stacktrace:
       test/app/accounts/accounts_test.exs:42: (test)

The logs make it look like the history table is not being inserted into, which is not what I see when testing manually. Looking into what could be causing this (one suspicion: the test sandbox wraps each test in a transaction that is rolled back, and pg_notify only delivers notifications on commit, so the listener may never receive the event during tests).

RobStallion commented 5 years ago

A little confused as to why the tests are not inserting data into the history table. Going to push up the code with failing tests for now.

https://github.com/dwyl/ecto-postgres-pubsub-spike/pull/5

nelsonic commented 5 years ago

@RobStallion you should have admin rights to add Travis-CI to the repo. (if unable, please ping on Gitter)

RobStallion commented 5 years ago

In the initial comment's Boss Level section it mentions having code that updates the history table when the original table is updated.

I think that we should also look into a way of allowing users to create the history table and history schema "automatically". Looking into possible ways that this can be done.

RobStallion commented 5 years ago

So far I have a migration file that looks like this...

  def change do
    create table(:tests) do
      add :a, :string
    end

    flush()

    create table(:tests_history) do
      add :ref_id, references(:tests, column: :id, type: :id, on_delete: :nothing)
      add :a, :string
    end
  end

This works fine when I run

mix ecto.migrate
[info] == Running 20190502162628 App.Repo.Migrations.CreateTestTables.change/0 forward
[info] create table tests
[info] create table tests_history
[info] == Migrated 20190502162628 in 0.0s

However, it is throwing an error when I try to rollback

➜  app git:(auto-create-history-table) βœ— mix ecto.rollback
[info] == Running 20190502162628 App.Repo.Migrations.CreateTestTables.change/0 backward
[info] drop table tests
** (Postgrex.Error) ERROR 2BP01 (dependent_objects_still_exist) cannot drop table tests because other objects depend on it

    hint: Use DROP ... CASCADE to drop the dependent objects too.

constraint tests_history_ref_id_fkey on table tests_history depends on table tests

I thought that flush/0 ran the commands inside of the change function in reverse when doing a rollback.

(screenshot of the flush/0 docs omitted)

Going to look into why this didn't work (maybe for a pomodoro). If I cannot find a solution, I will look for a different approach (like the hint that was logged in the terminal).

RobStallion commented 5 years ago

Wanted to confirm my thoughts on how the migration worked. I temp replaced the change function with up and down functions...

  def up do
    create table(:tests) do
      add :a, :string
    end

    flush()

    create table(:tests_history) do
      add :ref_id, references(:tests, column: :id, type: :id, on_delete: :nothing)
      add :a, :string
    end
  end

  def down do
    drop table(:tests_history)
    drop table(:tests)
  end

I can run migrations and rollbacks with this without any issue. I thought that flush/0 essentially did this in the change function.

Looking into this some more

RobStallion commented 5 years ago

This seems to work just fine when running migrate and rollback....

  def change do
    create table(:tests) do
      add :a, :string
    end

    create table(:tests_history) do
      add :ref_id, references(:tests, column: :id, type: :id, on_delete: :nothing)
      add :a, :string
    end
  end

I think that I am misunderstanding how flush/0 works.

RobStallion commented 5 years ago
  # flush/0 at the start of change/0...
  def change do
    flush()

    create table(:tests) do
      add :a, :string
    end

    create table(:tests_history) do
      add :ref_id, references(:tests, column: :id, type: :id, on_delete: :nothing)
      add :a, :string
    end
  end

  # ...and flush/0 at the end of change/0
  def change do
    create table(:tests) do
      add :a, :string
    end

    create table(:tests_history) do
      add :ref_id, references(:tests, column: :id, type: :id, on_delete: :nothing)
      add :a, :string
    end

    flush()
  end

Both of these also work. I'm not sure when/how exactly flush/0 should be used, but I am going to leave it out for the time being as everything seems to be working as expected.
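
For reference, the case where flush/0 genuinely matters seems to be when later code in the same migration depends on the queued commands having already run, e.g. (a sketch):

  def up do
    create table(:tests) do
      add :a, :string
    end

    # the INSERT below runs immediately, so the :tests table has to exist
    # already; flush/0 guarantees that by executing the queued commands first
    flush()

    execute("INSERT INTO tests (a) VALUES ('seed')")
  end

  def down do
    drop table(:tests)
  end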

RobStallion commented 5 years ago

Now that I have the one migration file reliably creating both tables I want to see if we can extract the code to create the history table into a function and call that function in the change/0 function

defmodule App.Repo.Migrations.CreateTestTables do
  use Ecto.Migration

  def change do
    create table(:tests) do
      add :a, :string
    end

    create_history()
  end

  def create_history() do
    create table(:tests_history) do
      add :ref_id, references(:tests, column: :id, type: :id, on_delete: :nothing)
      add :a, :string
    end
  end
end
RobStallion commented 5 years ago

This works πŸŽ‰. This means that we should be able to inject code into the change function to create our history table.

Before I move on to this step I want to see if there is some way for us to access the fields that have been used to create the original table, e.g. :a

add :a, :string

(⬆️these fields)

If there is, then we should be able to automatically add them to our create_history function.

Looking into this now

RobStallion commented 5 years ago

Logging the result of create table(:tests) when calling mix ecto.migrate...

    create table(:tests) do
      add :a, :string
    end
    |> IO.inspect(label: "===> ")
===> : %Ecto.Migration.Table{
  comment: nil,
  engine: nil,
  name: "tests",
  options: nil,
  prefix: nil,
  primary_key: true
}

Unfortunately, this does not contain the keys being added to the table. Need to look into other possible ways of getting this info.

RobStallion commented 5 years ago

Have a somewhat hacky-feeling solution...

  def change do
    create table(:tests) do
      add :a, :string
    end

    create_history(:tests, [:a])
  end

  def create_history(table_name, list) do
    history_table =
      table_name
      |> Atom.to_string()
      |> Kernel.<>("_history")
      |> String.to_atom()

    create table(history_table) do
      add :ref_id, references(table_name, column: :id, type: :id, on_delete: :nothing)
      for column <- list, do: add column, :string
    end
  end

This is not a "final solution" but it shows that we can at least create a function that makes it fairly straightforward to create a history table and add fields to it.

Ideally we want the user to only have to type create_history() and have the table created with all the fields, but this is a start.

RobStallion commented 5 years ago

Small refactor which would allow a user to pipe the create table call into the create_history function...

  def change do
    create table(:tests) do
      add :a, :string
    end
    |> create_history([:a])
  end

  def create_history(migration_table, list) do
    history_table =
      migration_table.name
      |> Kernel.<>("_history")
      |> String.to_atom()

    create table(history_table) do
      add :ref_id, references(migration_table.name, column: :id, type: :id, on_delete: :nothing)
      for column <- list, do: add column, :string
    end
  end
RobStallion commented 5 years ago

In order for us to gain access to what is being done to the table (e.g. add :a, :string) we will need to recreate/edit this module. I am not currently sure whether this is worthwhile; it could end up adding a lot more complexity without actually reducing the code that a user has to write.

The create macro is defined as follows...

  defmacro create(object, do: block) do
    expand_create(object, :create, block)
  end

  defp expand_create(object, command, block) do
    quote do
      table = %Table{} = unquote(object)
      Runner.start_command({unquote(command), Ecto.Migration.__prefix__(table)})

      if table.primary_key do
        opts = Runner.repo_config(:migration_primary_key, [])
        opts = Keyword.put(opts, :primary_key, true)

        {name, opts} = Keyword.pop(opts, :name, :id)
        {type, opts} = Keyword.pop(opts, :type, :bigserial)

        add(name, type, opts)
      end

      unquote(block)
      Runner.end_command
      table
    end
  end

log of object and block from defmacro create

{:table, [line: 42], [:tests]}
{:add, [line: 43], [:a, :string]}

object is the table name passed in (:tests in our case) and block is everything after the do.
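
The same AST shapes can be reproduced in iex (the [line: 42] metadata above comes from quoting inside a source file):

iex> quote do: table(:tests)
{:table, [], [:tests]}
iex> quote do: add(:a, :string)
{:add, [], [:a, :string]}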

RobStallion commented 5 years ago

I don't really understand what is going on with Runner.start_command and Runner.end_command. The block, which contains all the columns to add, is only used just before the Runner.end_command call and doesn't appear to do much, yet somehow the db knows to add these columns to the table.

Need to look into this in more detail.

RobStallion commented 5 years ago

My understanding so far is that our migration file calls the create macro in Ecto.Migration...

Simple migration file with create table being called...

defmodule App.Repo.Migrations.CreateTestTables do
  use Ecto.Migration

  def change do
    create table(:tests) do
      add :a, :string
    end
  end
end

Calls...

defmodule Ecto.Migration do
  ...

  defmacro create(object, do: block) do
    expand_create(object, :create, block)
  end

  ...
end

which in turn calls...

  defp expand_create(object, command, block) do
    quote do
      table = %Table{} = unquote(object)
      Runner.start_command({unquote(command), Ecto.Migration.__prefix__(table)})

      if table.primary_key do
        opts = Runner.repo_config(:migration_primary_key, [])
        opts = Keyword.put(opts, :primary_key, true)

        {name, opts} = Keyword.pop(opts, :name, :id)
        {type, opts} = Keyword.pop(opts, :type, :bigserial)

        add(name, type, opts)
      end

      unquote(block)
      Runner.end_command
      table
    end
  end

This function's arguments are as follows...

[
  block: {:add, [line: 6], [:a, :string]},
  command: :create,
  object: {:table, [line: 5], [:tests]}
]

Unquoting object creates the table variable...

  defp expand_create(object, command, block) do
    quote do
      table = %Table{} = unquote(object)
      ...
  end

This is what the table variable looks like...

%Ecto.Migration.Table{
  comment: nil,
  engine: nil,
  name: "tests",
  options: nil,
  prefix: nil,
  primary_key: true
}

Runner.start_command is then called with a tuple of :create and the table variable.

Looking at the Ecto.Migration.Runner module, there are comments at the top of the file that say the following...

defmodule Ecto.Migration.Runner do
  # A GenServer responsible for running migrations
  # in either `:forward` or `:backward` directions.

This seems to be where the adding of fields etc. actually happens, then.

The start_command function is defined as follows...

  @doc """
  Starts a command.
  """
  def start_command(command) do
    reply =
      Agent.get_and_update(runner(), fn
        %{command: nil} = state ->
          {:ok, %{state | command: command}}
        %{command: _} = state ->
          {:error, %{state | command: command}}
      end)

    case reply do
      :ok ->
        :ok
      :error ->
        raise Ecto.MigrationError, "cannot execute nested commands"
    end
  end

(pretty unhelpful doc 😞)

I don't know anywhere near enough about Agents (or GenServers for that matter), but from what I understand they are processes that allow other processes to view and update shared state.

(screenshot from the Agent docs omitted)
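
As a quick refresher (not from the Ecto source), an Agent is just a process holding state that callers can read and update:

# start an Agent with an initial state shaped like the Runner's
{:ok, pid} = Agent.start_link(fn -> %{command: nil, commands: []} end)
:ok = Agent.update(pid, fn state -> %{state | command: {:create, "tests"}} end)
Agent.get(pid, & &1.command)
#=> {:create, "tests"}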

The command seems to manage the bit of state that says what the migration is doing. If you log this line...

{:ok, %{state | command: command}}

from the function, you will see something like this...

{:ok,
%{
   command: {:create,
    %Ecto.Migration.Table{
      comment: nil,
      engine: nil,
      name: "tests",
      options: nil,
      prefix: nil,
      primary_key: true
    }},
   commands: [],
   config: [
     telemetry_prefix: [:app, :repo],
     otp_app: :app,
     timeout: 15000,
     username: "postgres",
     password: "postgres",
     database: "app_dev",
     hostname: "localhost",
     pool_size: 10
   ],
   direction: :forward,
   log: %{level: :info, sql: false},
   migrator_direction: :up,
   repo: App.Repo,
   subcommands: []
 }}

I don't fully understand everything that is going on in this function yet, but I am moving on for now as it does not appear to handle the actual addition of table columns etc.

Continuing with looking through expand_create we next get this...

      if table.primary_key do
        opts = Runner.repo_config(:migration_primary_key, [])
        opts = Keyword.put(opts, :primary_key, true)

        {name, opts} = Keyword.pop(opts, :name, :id)
        {type, opts} = Keyword.pop(opts, :type, :bigserial)

        add(name, type, opts)
      end

If the user has not defined a primary key for their table, then id is created as the primary key. Here is a log of what is passed to add/3 (the same add/3 we call in the migration file)...

[name: :id, type: :bigserial, opts: [primary_key: true]]

So this ends up calling add/3 in pretty much the same way we call it in the migration. The next line is

unquote(block)

remember that block is {:add, [line: 6], [:a, :string]}. The unquote line appears to be the line that calls the add/3 function for us.

Taking a look at the add/3 function now.

  def add(column, type, opts \\ []) when is_atom(column) and is_list(opts) do
    IO.inspect(binding(), label: "binding log in the add function")
    validate_precision_opts!(opts, column)
    validate_type!(type)
    Runner.subcommand {:add, column, type, opts}
  end

Logging the arguments passed into add logs the following...

binding log in the add function: [column: :id, opts: [primary_key: true], type: :bigserial]
binding log in the add function: [column: :a, opts: [], type: :string]

We can see the first add call is actually for the id column and has the same arguments that we logged above. The second call to the add function is for the field that we added in the migration file.

add/3 also calls a function from the Ecto.Migration.Runner module, so let's look at that next...

  @doc """
  Adds a subcommand to the current command. Must call `start_command/1` first.
  """
  def subcommand(subcommand) do
    reply =
      Agent.get_and_update(runner(), fn
        %{command: nil} = state ->
          {:error, state}
        state ->
          {:ok, update_in(state.subcommands, &[subcommand|&1])}
      end)

    case reply do
      :ok ->
        :ok
      :error ->
        raise Ecto.MigrationError, message: "cannot execute command outside of block"
    end
  end

(slightly more helpful doc but still not amazing πŸ˜•)

RobStallion commented 5 years ago

I have added a few more logs and think I have a better understanding of what is going on now.

expand_create calls Runner.start_command, which in turn gets and updates an initial state (using the Agent module) with the command being run on the db (e.g. :create)

# initial state in start_command
%{
  command: nil,
  commands: [],
  config: [
    telemetry_prefix: [:app, :repo],
    otp_app: :app,
    timeout: 15000,
    username: "postgres",
    password: "postgres",
    database: "app_dev",
    hostname: "localhost",
    pool_size: 10
  ],
  direction: :forward,
  log: %{level: :info, sql: false},
  migrator_direction: :up,
  repo: App.Repo,
  subcommands: []
}

# updated state in start_command
# updates command field
%{
 command: {:create,
  %Ecto.Migration.Table{
    comment: nil,
    engine: nil,
    name: "tests",
    options: nil,
    prefix: nil,
    primary_key: true
  }},
 commands: [],
 config: [
   telemetry_prefix: [:app, :repo],
   otp_app: :app,
   timeout: 15000,
   username: "postgres",
   password: "postgres",
   database: "app_dev",
   hostname: "localhost",
   pool_size: 10
 ],
 direction: :forward,
 log: %{level: :info, sql: false},
 migrator_direction: :up,
 repo: App.Repo,
 subcommands: []
}

expand_create then calls the add function twice, as we mentioned above. add/3 calls Runner.subcommand, and subcommand then updates the same state that start_command created...

# state after first subcommand call. Adds id to subcommands
%{
  command: {:create,
    %Ecto.Migration.Table{
      comment: nil,
      engine: nil,
      name: "tests",
      options: nil,
      prefix: nil,
      primary_key: true
    }},
  commands: [],
  config: [
    telemetry_prefix: [:app, :repo],
    otp_app: :app,
    timeout: 15000,
    username: "postgres",
    password: "postgres",
    database: "app_dev",
    hostname: "localhost",
    pool_size: 10
  ],
  direction: :forward,
  log: %{level: :info, sql: false},
  migrator_direction: :up,
  repo: App.Repo,
  subcommands: [{:add, :id, :bigserial, [primary_key: true]}]
}

# state after second subcommand call. Adds field :a to subcommands
%{
  command: {:create,
    %Ecto.Migration.Table{
      comment: nil,
      engine: nil,
      name: "tests",
      options: nil,
      prefix: nil,
      primary_key: true
    }},
  commands: [],
  config: [
     telemetry_prefix: [:app, :repo],
     otp_app: :app,
     timeout: 15000,
     username: "postgres",
     password: "postgres",
     database: "app_dev",
     hostname: "localhost",
     pool_size: 10
  ],
  direction: :forward,
  log: %{level: :info, sql: false},
  migrator_direction: :up,
  repo: App.Repo,
  subcommands: [
    {:add, :a, :string, []},
    {:add, :id, :bigserial, [primary_key: true]}
  ]
}

Next expand_create calls Runner.end_command...

  def end_command do
    Agent.update runner(), fn state ->
      {operation, object} = state.command
      command = {operation, object, Enum.reverse(state.subcommands)}
      %{state | command: nil, subcommands: [], commands: [command|state.commands]}
    end
  end

This function gets the command and subcommands fields and adds them to the commands list field...

%{
  command: nil,
  commands: [
    {:create,
     %Ecto.Migration.Table{
       comment: nil,
       engine: nil,
       name: "tests",
       options: nil,
       prefix: nil,
       primary_key: true
     }, [{:add, :id, :bigserial, [primary_key: true]}, {:add, :a, :string, []}]}
  ],
  config: [
    telemetry_prefix: [:app, :repo],
    otp_app: :app,
    timeout: 15000,
    username: "postgres",
    password: "postgres",
    database: "app_dev",
    hostname: "localhost",
    pool_size: 10
  ],
  direction: :forward,
  log: %{level: :info, sql: false},
  migrator_direction: :up,
  repo: App.Repo,
  subcommands: []
}

The commands field has all the info that we need for our table to be created.

If we want to create a way for our users to easily create a history version of the table they are creating (or to apply changes made to the original table, like removing a column, to the history table), being able to access the state that is being updated in the Agent, and to add to its commands list, is key.

The next step is to figure out how we can access this state. The Runner module accesses it by calling the private function runner...

  defp runner do
    case Process.get(:ecto_migration) do
      %{runner: runner} -> runner
      _ -> raise "could not find migration runner process for #{inspect self()}"
    end
  end

If we can access the state in similar way we should be able to copy and update a command to also run on a history version of the table.

e.g...

[
    {:create,
     %Ecto.Migration.Table{
       comment: nil,
       engine: nil,
       name: "tests",
       options: nil,
       prefix: nil,
       primary_key: true
     }, [{:add, :id, :bigserial, [primary_key: true]}, {:add, :a, :string, []}]}
  ]

would become something like...

[
    {:create,
     %Ecto.Migration.Table{
       comment: nil,
       engine: nil,
       name: "tests",
       options: nil,
       prefix: nil,
       primary_key: true
     }, [{:add, :id, :bigserial, [primary_key: true]}, {:add, :a, :string, []}]},
    {:create,
     %Ecto.Migration.Table{
       comment: nil,
       engine: nil,
       name: "tests_history",
       options: nil,
       prefix: nil,
       primary_key: true
     }, [{:add, :id, :bigserial, [primary_key: true]}, {:add, :a, :string, []}, {:add, :ref_id, :bigserial, []}]}
  ]
# The {:add, :ref_id, :bigserial, []} entry would be a little more complex than this in reality; it is just to illustrate the point.

This should be completely doable. We should just need to follow the functions in the runner module.

RobStallion commented 5 years ago

Created a couple of functions based on the ones found in Runner

  # Adds the test_history tuple to the list of commands in the state
  defp add_history_of_tests do
    Agent.update runner(), fn state ->
      %{state | commands: [tests_history()|state.commands]}
    end
  end

  # literally copied the function from Runner in this case
  defp runner do
    case Process.get(:ecto_migration) do
      %{runner: runner} -> runner
      _ -> raise "could not find migration runner process for #{inspect self()}"
    end
  end

  # returns a tuple to create a tests_history table
  defp tests_history do
    {:create,
     %Ecto.Migration.Table{
       comment: nil,
       engine: nil,
       name: "tests_history",
       options: nil,
       prefix: nil,
       primary_key: true
     },
     [
       {:add, :id, :bigserial, [primary_key: true]},
       {:add, :ref_id,
        %Ecto.Migration.Reference{
          column: :id,
          name: nil,
          on_delete: :nothing,
          on_update: :nothing,
          prefix: nil,
          table: "tests",
          type: :id
        }, []},
       {:add, :a, :string, []}
     ]}
  end

Then called add_history_of_tests in the change function like so...

  def change do
    create table(:tests) do
      add :a, :string
    end
    add_history_of_tests()
  end

This works perfectly...

➜  app git:(auto-create-history-table) βœ— mix ecto.migrate
Compiling 19 files (.ex)
Generated app app
[info] == Running 20190502162628 App.Repo.Migrations.CreateTestTables.change/0 forward
[info] create table tests
[info] create table tests_history
[info] == Migrated 20190502162628 in 0.0s
➜  app git:(auto-create-history-table) βœ— mix ecto.rollback
[info] == Running 20190502162628 App.Repo.Migrations.CreateTestTables.change/0 backward
[info] drop table tests_history
[info] drop table tests
[info] == Migrated 20190502162628 in 0.0s

Obviously the functions I created are very simplified versions of what would be needed, but this proves that we would be able to create a function that a user could call in their migration to create a history table πŸŽ‰

RobStallion commented 5 years ago
defmodule App.Repo.Migrations.CreateTestTables do
  use Ecto.Migration

  def change do
    create table(:tests) do
      add :a, :string
    end

    history()
  end

  def history do
    Agent.update runner(), fn state ->
      %{state | commands: create_update_history(state.commands)}
    end
  end

  defp create_update_history(commands) do
    Enum.reduce(commands, [], fn
      # if the create command was called then we create a history version of
      # the table that create was called on.
      {:create, table, subcommands} = command, acc ->
        history_table = Map.update!(table, :name, &(&1 <> "_history"))

        add_ref_id = [{:add, :ref_id,
         %Ecto.Migration.Reference{
           column: :id,
           name: nil,
           on_delete: :nothing,
           on_update: :nothing,
           prefix: nil,
           table: table.name,
           type: :id
         }, []}]

        subcommands = subcommands ++ add_ref_id
        history = {:create, history_table, subcommands}

        acc = [command | acc]
        [history | acc]

      # currently if the command is anything other than create we return the
      # list of commands
      # This function can be extended to handle other cases, like tables being
      # dropped etc.
      {_command, _table, _subcommands} = t, acc ->
        [t | acc]
    end)
  end

  defp runner do
    case Process.get(:ecto_migration) do
      %{runner: runner} -> runner
      _ -> raise "could not find migration runner process for #{inspect self()}"
    end
  end
end

@nelsonic Is the above the sort of solution you had in mind? Of course we would separate the function logic so all a user would have to do is type history() at the end of their change/up/down function call.

nelsonic commented 5 years ago

@RobStallion this is a really clear implementation. πŸ‘

RobStallion commented 5 years ago

Going to continue with the acceptance criteria's "Boss Level πŸ‘Ύ" section and try to extend the current history/0 function so that it also works for altering an existing table.

RobStallion commented 5 years ago

I have created a new migration file which alters the tests table to add a new column to it...

defmodule App.Repo.Migrations.AlterTestTables do
  use Ecto.Migration

  def change do
    alter table(:tests) do
      add :b, :string
    end
  end
end

When I run the migration and log the :commands key from the command map that is created this is what it looks like...

commands: [
    {:alter,
     %Ecto.Migration.Table{
       comment: nil,
       engine: nil,
       name: "tests",
       options: nil,
       prefix: nil,
       primary_key: true
     }, [{:add, :b, :string, []}]}
  ]

We can see that this is very similar to what we logged above when we created a table.
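
Given that shape, the reducer from the previous comment could plausibly be extended with an :alter clause that mirrors the subcommands onto the history table (a sketch, not the exact code from the PR):

  defp create_update_history(commands) do
    Enum.reduce(commands, [], fn
      # :create clause as in the previous comment, building the
      # "<name>_history" table with an extra :ref_id reference
      {:create, table, subcommands} = command, acc ->
        history_table = Map.update!(table, :name, &(&1 <> "_history"))

        ref_id =
          {:add, :ref_id,
           %Ecto.Migration.Reference{column: :id, table: table.name, type: :id}, []}

        [{:create, history_table, subcommands ++ [ref_id]}, command | acc]

      # new clause: apply the same :alter subcommands to the history table
      {:alter, table, subcommands} = command, acc ->
        history_table = Map.update!(table, :name, &(&1 <> "_history"))
        [{:alter, history_table, subcommands}, command | acc]

      {_command, _table, _subcommands} = t, acc ->
        [t | acc]
    end)
  end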

RobStallion commented 5 years ago

Updated the history function so that it also creates a postgres trigger...

  def history do
    Agent.update runner(), fn state ->
      %{state | commands: create_update_history(state.commands)}
    end
    create_pg_notify_function(%{name: "tests"})
    create_drop_trigger(%{name: "tests"})
  end

This worked when we were only creating a table, but it throws an error when we try to alter a table, as the table already has the trigger on it. Looking into ways that we can add the trigger only when the table is created.
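
One way around it (a sketch; the helper and trigger names here are assumptions): make create_drop_trigger/1 idempotent by dropping any existing trigger before re-creating it, since Postgres has no CREATE TRIGGER IF NOT EXISTS:

  defp create_drop_trigger(%{name: name}) do
    # safe to run repeatedly: remove the old trigger if it exists
    execute("DROP TRIGGER IF EXISTS #{name}_changed ON #{name};")

    execute("""
    CREATE TRIGGER #{name}_changed
    AFTER INSERT OR UPDATE ON #{name}
    FOR EACH ROW EXECUTE PROCEDURE notify_#{name}_changes();
    """)
  end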

RobStallion commented 5 years ago

@nelsonic I think we have been able to make good progress on this spike.

For features that focus on the migration, we can now automatically create and alter history tables, which mirror the original table being created/altered. We also create the postgres trigger automatically in this step. All the user would need to do is bring this module into their application and then call the history/0 function in their migration...

  def change do
    alter table(:tests) do
      add :b, :string
    end

    history() # <----- will also add the :b column to the tests_history table
  end

For the features that focus on the schema, we have been able to create functions that automatically insert records into the history table when the original is inserted into or updated. However, this currently requires a fair amount of code in the user's application: the listener module, the listen function in the repo module, and the history schema module.

I believe that it would be possible to abstract a lot of this code away so that it would be a similar experience to how it is with the migrations but I have not looked into this yet.

What I think will be the biggest hurdle here is that we currently require a history version of the schema, e.g. address_history.ex. If we don't want users to have to create a history version of the schema every time they use the module, we need to automate its creation or change the way that we insert into the table.
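
One option for the latter (a sketch; field names assumed from the addresses example): insert into the history table by name with Repo.insert_all/3, which needs no schema module. Note that insert_all does not autogenerate timestamps, so inserted_at is set explicitly:

  # build a plain map of history params from the address struct
  params =
    address
    |> Map.from_struct()
    |> Map.take([:name, :address_line_1, :address_line_2, :city, :postcode, :tel])
    |> Map.put(:ref_id, address.id)
    |> Map.put(:inserted_at, NaiveDateTime.utc_now())

  # schemaless insert: the table is referenced by name only
  App.Repo.insert_all("addresses_history", [params])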

There is a LOT more that could be added to improve/extend this functionality (ensuring that the history function works with up and down, as it currently only works with change; ensuring that it doesn't error when id is not the primary key of the original table, or doesn't exist at all; the history schema issue mentioned above; and many more, I'm sure), but I think we have accomplished what we set out to do in this spike.

nelsonic commented 5 years ago

@RobStallion agree that you've made great progress on this! πŸŽ‰ We need to dive into using it in an example to tease out more requirements.

RobStallion commented 5 years ago

@nelsonic When you say we should create an example, do you mean that we should create an example showing people...

  1. how to create their own version of this*
  2. how to use a module we create in their own application?**

* This is pretty much done in the readme of this repo. The readme doesn't use the exact same functionality that we are using, but it does show how to create a version of this (we are using postgres triggers, for instance, whereas the example just chains inserts together)

** This will require us to build a module first. I think that we have most of the code needed to build the module, but we will certainly need to work on a way to streamline the schema code for users, as mentioned in this comment

nelsonic commented 5 years ago

@RobStallion ultimately, the more examples we create, the easier it is to maintain our projects, if we can show people how to record history in Phoenix from "first principles". Basically, we need to either update https://github.com/dwyl/phoenix-ecto-append-only-log-example or write a new example from scratch. I think the existing append-only-log-example is fine as it is; several people in the Elixir community have found it useful ... https://github.com/dwyl/phoenix-ecto-append-only-log-example/stargazers

Please complete the PRs for this spike and assign them to me and I will dive into the "phoenix-ecto-history-example" when the time comes.

RobStallion commented 5 years ago
(A query to inspect a table's columns via information_schema; presumably noted as a way to discover the original table's fields.)

select column_name, data_type, character_maximum_length
from INFORMATION_SCHEMA.COLUMNS where table_name = 'addresses'
AND table_catalog = 'app_dev'
RobStallion commented 5 years ago

https://stablekernel.com/storing-historical-data-with-postgresql-and-automatic-partitions/

RobStallion commented 5 years ago

https://stackoverflow.com/questions/38954139/implementing-history-of-postgresql-table

nelsonic commented 5 years ago

Rob's question: https://stackoverflow.com/questions/56295703/how-to-store-table-history-in-postgresql