dwyl / alog

🌲 alog (Append-only Log) is an easy way to start using the Lambda/Kappa architecture in your Elixir/Phoenix Apps while still using PostgreSQL (with Ecto).

Insert - Ecto.Adapters.SQL.Connection #45

Danwhy opened this issue 5 years ago

Danwhy commented 5 years ago

Part of #38

https://hexdocs.pm/ecto_sql/Ecto.Adapters.SQL.Connection.html#c:insert/6

For this function, we need to generate a cid based on the given data, as well as create an "entry id" for ease of lookup.
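
For reference, a hedged illustration of that CID generation, using the Cid.cid/1 function that appears later in this thread (output shortened, illustrative only):

  # Cid.cid/1 derives a deterministic content ID from the given data,
  # so the same input always yields the same CID
  Cid.cid(%{comment: "hi", comment_id_no: "1"})
  # => "zb2rh..." (a CID string like the ones in the logs below)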

RobStallion commented 5 years ago

Comment schema in app

  schema "comments" do
    field :comment, :string
    field :comment_id_no, :string
    field :show, :boolean

    timestamps()
  end

Calling Repo.insert in the app (purposefully leaving :show unset)

Repo.insert(%UsingAlogAdapter.Comments{comment: "hi", comment_id_no: "1"})

Newly defined insert/6 in AlogAdapter module

  def insert(adapter_meta, %{source: source, prefix: prefix}, params, on_conflict, returning, opts) do
    params = params ++ [show: true]  # <---- adding show: true in the adapter

    {kind, conflict_params, _} = on_conflict
    {fields, values} = :lists.unzip(params)
    sql = @conn.insert(prefix, source, fields, [fields], on_conflict, returning)
    Ecto.Adapters.SQL.struct(adapter_meta, @conn, sql, :insert, source, [], values ++ conflict_params, kind, returning, opts)
  end

Logs from the terminal

INSERT INTO "comments" ("comment","comment_id_no","inserted_at","updated_at","show") VALUES ($1,$2,$3,$4,$5) RETURNING "id" ["hi", "1", ~N[2019-02-19 20:01:40], ~N[2019-02-19 20:01:40], true]

Changeset returned from Repo.insert

{:ok,
 %UsingAlogAdapter.Comments{
   __meta__: #Ecto.Schema.Metadata<:loaded, "comments">,
   comment: "hi",
   comment_id_no: "1",
   id: 51, # <----- ID no. entered into the db
   inserted_at: ~N[2019-02-19 20:01:40],
   show: nil,
   updated_at: ~N[2019-02-19 20:01:40]
 }}

# Notice that the returned struct has show: nil. I think this is actually the behaviour
# we will want in the adapter. If we are going to use the adapter to manually add the
# field entry_id (what I am doing with show here), then it will not be part of the
# user's schema, meaning that they do not need to see it in their changeset.

Log of Repo.get(Comments, 51)

iex()> Repo.get(Comments, 51)
[debug] QUERY OK source="comments" db=3.8ms queue=1.4ms
SELECT DISTINCT ON (c0."comment_id_no") c0."id", c0."comment", c0."comment_id_no", c0."show", c0."inserted_at", c0."updated_at" FROM "comments" AS c0 WHERE (c0."id" = $1) [51]
%UsingAlogAdapter.Comments{
  __meta__: #Ecto.Schema.Metadata<:loaded, "comments">,
  comment: "hi",
  comment_id_no: "1",
  id: 51,
  inserted_at: ~N[2019-02-19 20:01:40],
  show: true,  # <--------- Now showing true
  updated_at: ~N[2019-02-19 20:01:40]
}

We can see that when we select the new entry from the db, it has show set to true.

This shows that we can add/manipulate values before they get entered into the db using an adapter. 🎉😄

RobStallion commented 5 years ago

For more info on how I got to this point see https://github.com/RobStallion/alog_adapter/issues/2

RobStallion commented 5 years ago

Next steps

Make sure the entry_id is unique

I think this will be the trickiest part, as it means we will somehow need to query the database from within the adapter. The difficulty is creating a query struct that allows us to do that: something that normally works in an app, e.g. Repo.all(User), will not work here because the query struct is created incorrectly. I suspect it will be a similar issue to the one we encountered with all and subquery.

Danwhy commented 5 years ago

We should be able to make sure the CID is unique using a unique index. In the execute_ddl functions, when we create a table we define the cid column, so we can also add a unique index for it there.

All we need to do then is make sure the entry id is unique, which might require some querying
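
A hedged sketch of what that could look like in the adapter; the delegation to Ecto.Adapters.Postgres.Connection and the direct Ecto.Migration.Index construction are assumptions, not an agreed implementation:

  # a sketch only: append a unique index on :cid whenever a table is created
  def execute_ddl({:create, %Ecto.Migration.Table{name: name} = table, columns}) do
    index = %Ecto.Migration.Index{
      table: name,
      name: "#{name}_cid_index",
      columns: [:cid],
      unique: true
    }

    Ecto.Adapters.Postgres.Connection.execute_ddl({:create, table, columns}) ++
      Ecto.Adapters.Postgres.Connection.execute_ddl({:create, index})
  end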

Danwhy commented 5 years ago

@RobStallion Another thing to note is that we need to make sure the timestamp fields are present in the params when we insert.

I automatically create the columns during the migration, but they will just be null if timestamps() is not in the schema. We just need to check the params and insert the timestamps if the user hasn't included them.
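
Something like the following could do that check (a sketch only; the helper name is ours, and it assumes params arrive as a keyword list, as in the insert/6 examples above):

  # a sketch: fill in timestamps only when the user's schema has not set them
  defp ensure_timestamps(params) do
    now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)

    params
    |> Keyword.put_new(:inserted_at, now)
    |> Keyword.put_new(:updated_at, now)
  end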

RobStallion commented 5 years ago

@Danwhy Thanks for responding on this. I realised that I made a typo sorry 😞

I meant to say "Make sure the entry_id is unique". I will update my earlier comment. You mentioned what I thought was the issue (regardless of me being an egg 🥚):

All we need to do then is make sure the entry id is unique, which might require some querying

@Danwhy @SimonLab Do you have any thoughts on how we can do the querying to check if the entry_id is unique?

RobStallion commented 5 years ago

@Danwhy Regarding this comment, good thinking 🧠

I'll make sure I add a check for that. Should be a fairly simple step 👍

Danwhy commented 5 years ago

The query should be quite simple, so we might be able to just create a sql string: SELECT entry_id FROM #{source} WHERE entry_id = 'f' for example, then pass it to the execute function.

We can do this recursively until we find a non-existing entry id: SELECT entry_id FROM #{source} WHERE entry_id = 'fy', then SELECT entry_id FROM #{source} WHERE entry_id = 'fyZ', etc.

RobStallion commented 5 years ago

@Danwhy Can we write a query like that in the adapter?

I thought that it would give us the error we were getting when trying to use subquery. I will test this now and get back to you

Danwhy commented 5 years ago

The error we were getting in all is because we were trying to manipulate an existing ecto query using lots of existing functions.

For this we just need a simple sql command with only one parameter, so there's no reason to use an ecto query.

RobStallion commented 5 years ago

Did the same Repo.insert call as in the tests above.

Came through to the insert function I added to the adapter.

  import Ecto.Query, only: [from: 2]
  def insert(adapter_meta, %{source: source, prefix: prefix}, params, on_conflict, returning, opts) do
    params = params ++ [show: true]

    IO.inspect(source, label: "source ===>") # logs: source ===>: "comments"

    AlogAdapter.Connection.all(from c in source, select: c.id) # Causing the error below
    |> IO.inspect(label: "-----> Result of repo all")

    {kind, conflict_params, _} = on_conflict
    {fields, values} = :lists.unzip(params)
    sql = @conn.insert(prefix, source, fields, [fields], on_conflict, returning)
    Ecto.Adapters.SQL.struct(adapter_meta, @conn, sql, :insert, source, [], values ++ conflict_params, kind, returning, opts)
  end

Error message

** (exit) an exception was raised:
    ** (ArgumentError) argument error
        :erlang.tuple_size(nil)
        (ecto_sql) lib/ecto/adapters/postgres/connection.ex:655: Ecto.Adapters.Postgres.Connection.create_names/1
        (ecto_sql) lib/ecto/adapters/postgres/connection.ex:107: Ecto.Adapters.Postgres.Connection.all/1
        (alog_adapter) lib/alog_adapter.ex:55: AlogAdapter.insert/6
        (ecto) lib/ecto/repo/schema.ex:651: Ecto.Repo.Schema.apply/4
        (ecto) lib/ecto/repo/schema.ex:264: anonymous fn/15 in Ecto.Repo.Schema.do_insert/3
        (using_alog_adapter) lib/using_alog_adapter_web/controllers/page_controller.ex:15: UsingAlogAdapterWeb.PageController.index/2
        (using_alog_adapter) lib/using_alog_adapter_web/controllers/page_controller.ex:1: UsingAlogAdapterWeb.PageController.action/2
        (using_alog_adapter) lib/using_alog_adapter_web/controllers/page_controller.ex:1: UsingAlogAdapterWeb.PageController.phoenix_controller_pipeline/2
        (using_alog_adapter) lib/using_alog_adapter_web/endpoint.ex:1: UsingAlogAdapterWeb.Endpoint.instrument/4
        (phoenix) lib/phoenix/router.ex:275: Phoenix.Router.__call__/1
        (using_alog_adapter) lib/using_alog_adapter_web/endpoint.ex:1: UsingAlogAdapterWeb.Endpoint.plug_builder_call/2
        (using_alog_adapter) lib/plug/debugger.ex:122: UsingAlogAdapterWeb.Endpoint."call (overridable 3)"/2
        (using_alog_adapter) lib/using_alog_adapter_web/endpoint.ex:1: UsingAlogAdapterWeb.Endpoint.call/2
        (phoenix) lib/phoenix/endpoint/cowboy2_handler.ex:34: Phoenix.Endpoint.Cowboy2Handler.init/2
        (cowboy) /Users/robertfrancis/Code/spike/using_alog_adapter/deps/cowboy/src/cowboy_handler.erl:41: :cowboy_handler.execute/2
        (cowboy) /Users/robertfrancis/Code/spike/using_alog_adapter/deps/cowboy/src/cowboy_stream_h.erl:296: :cowboy_stream_h.execute/3
        (cowboy) /Users/robertfrancis/Code/spike/using_alog_adapter/deps/cowboy/src/cowboy_stream_h.erl:274: :cowboy_stream_h.request_process/3
        (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3

I figured this would be the simplest query I could write to test the theory.

code in app

    Repo.all(from c in "comments", select: c.id)

This query in my application works fine if the adapter is left like so...

  def insert(adapter_meta, %{source: source, prefix: prefix}, params, on_conflict, returning, opts) do
    params = params ++ [show: true]
    {kind, conflict_params, _} = on_conflict
    {fields, values} = :lists.unzip(params)
    sql = @conn.insert(prefix, source, fields, [fields], on_conflict, returning)
    Ecto.Adapters.SQL.struct(adapter_meta, @conn, sql, :insert, source, [], values ++ conflict_params, kind, returning, opts)
  end

If anyone sees anything obviously wrong with this then please comment.

Danwhy commented 5 years ago

AlogAdapter.Connection.all will just return a sql string that we'll then have to execute. There's no need for us to call it as the string is so simple. Try just executing a sql string:

Ecto.Adapters.SQL.query(adapter_meta, "SELECT id FROM #{source}", [])

RobStallion commented 5 years ago

@Danwhy Worked like a charm thanks 👍

RobStallion commented 5 years ago

  def insert(adapter_meta, %{source: source, prefix: prefix}, params, on_conflict, returning, opts) do
    params_map = Enum.into(params, %{})
    # timestamps are dropped so that identical data always hashes to the same CID
    map_for_cid = Map.drop(params_map, [:inserted_at, :updated_at])

    cid = Cid.cid(map_for_cid)
    # start from the first 3 characters of the CID and lengthen until unique
    entry_id = create_entry_id(source, adapter_meta, cid, 2)

    params =
      params_map
      |> Map.put(:cid, cid)
      |> Map.put(:entry_id, entry_id)
      |> Enum.into([])

    {kind, conflict_params, _} = on_conflict
    {fields, values} = :lists.unzip(params)
    sql = @conn.insert(prefix, source, fields, [fields], on_conflict, returning)
    Ecto.Adapters.SQL.struct(adapter_meta, @conn, sql, :insert, source, [], values ++ conflict_params, kind, returning, opts)
  end

  defp create_entry_id(source, adapter_meta, cid, n) do
    entry_id = String.slice(cid, 0..n)
    entry_id_query = "SELECT * FROM #{source} where entry_id='#{entry_id}'"
    {:ok, results} = Ecto.Adapters.SQL.query(adapter_meta, entry_id_query, [])

    # no rows means this prefix is unused; otherwise try one more character
    if results.num_rows == 0 do
      entry_id
    else
      create_entry_id(source, adapter_meta, cid, n+1)
    end
  end

  # fills timestamps when the user's schema omits them (not yet called from insert/6 above)
  defp add_timestamps(params) do
    params
    |> Enum.into(%{})
    |> Map.put_new(:inserted_at, NaiveDateTime.utc_now())
    |> Map.put_new(:updated_at, NaiveDateTime.utc_now())
  end

RobStallion commented 5 years ago

The above adds the cid and entry_id to the database and makes sure that the entry_id is unique.

The only thing I am slightly unsure of is this line... |> Map.put(:cid, cid)

...as in reality this key will be the id (not cid).

Danwhy commented 5 years ago

@RobStallion When I try to insert something into the database I'm getting the following error:

** (DBConnection.EncodeError) Postgrex expected a binary of 16 bytes, got "zb2rhmxXxMwxhqGdQvqQw1dok73xqkdyLb1gJHx1uUWBzxvZ1". Please make sure the value you are passing matches the definition in your table or in your query or convert the value accordingly.

Did you come across this at all? I've set the primary key of my schema to be the :cid, with a type of :binary_id and autogenerate: true.

Danwhy commented 5 years ago

⬆️ I had the column type in the migration also set to :binary_id, which Postgrex translates to a Postgres type of uuid, which is specifically a 16 byte binary.

I had to change the type of the column to :binary, which Postgrex turns into a Postgres type of bytea, its generic binary type.
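
In migration terms, the fix described above amounts to:

  # was: add(:cid, :binary_id, primary_key: true)  -> Postgres uuid (exactly 16 bytes)
  add(:cid, :binary, primary_key: true)             # -> Postgres bytea (variable length)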

RobStallion commented 5 years ago

@Danwhy I have created a PR with code for the insert functionality.

@Danwhy @SimonLab @nelsonic seems to be working as expected but please take a look at the pr and let me know your thoughts.

This PR has made me think of a couple of questions:

  1. When we insert, do we want to allow duplicate data to be inserted? If so, how do we want to handle the entry_id value? (See the sketch after this list.)

     I thought we may want to allow duplicate data for cases where a user creates data, then edits that data, then re-edits the data back to its original state, e.g. username=robstallion, username=robdabank, username=robstallion (this is the duplicate).

     As the CID will be the same if duplicate data is inserted, the entry id will just continue to get longer and longer until it is an exact match for the CID, at which point no more duplicates will be allowed to be inserted (as it currently exists).

  2. What do we want to do with regards to the changeset that is given back to the user? See the example below for some context, and the earlier comment above for more:
    
     {:ok,
      %UsingAlogAdapter.Comments{
        __meta__: #Ecto.Schema.Metadata<:loaded, "comments">,
        comment: "hi",
        comment_id_no: "1",
        id: 51, # <----- ID no. entered into the db
        inserted_at: ~N[2019-02-19 20:01:40],
        show: nil,
        updated_at: ~N[2019-02-19 20:01:40]
      }}

     Notice that the changeset says show is nil. I think this is actually the behaviour we will want in the adapter. If we are going to use the adapter to manually add the field entry_id (what I am doing with show here), then it will not be part of the user's schema, meaning that they do not need to see it in their changeset.
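
To illustrate the entry-id lengthening from question 1 (values taken from the create_entry_id/4 implementation above; illustrative only):

  # duplicates share a CID, so each duplicate insert takes one more
  # character of the CID as its entry_id until the full CID is reached
  cid = "zb2rhmxXxMwxhqGdQvqQw1dok73xqkdyLb1gJHx1uUWBzxvZ1"
  String.slice(cid, 0..2) # => "zb2"   (first insert)
  String.slice(cid, 0..3) # => "zb2r"  (first duplicate)
  String.slice(cid, 0..4) # => "zb2rh" (second duplicate)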

RobStallion commented 5 years ago

Getting an error when I try to run mix ecto.migrate

➜  alog git:(insert) ✗ MIX_ENV=test mix ecto.create
Compiling 11 files (.ex)
Generated alog app
The database for Alog.Repo has been created
➜  alog git:(insert) ✗ MIX_ENV=test mix ecto.migrate
10:20:57.375 [error] Could not update schema migrations. This error usually happens due to the following:

  * The database does not exist
  * The "schema_migrations" table, which Ecto uses for managing
    migrations, was defined by another library

To fix the first issue, run "mix ecto.create".

To address the second, you can run "mix ecto.drop" followed by
"mix ecto.create". Alternatively you may configure Ecto to use
another table for managing migrations:

    config :alog, Alog.Repo,
      migration_source: "some_other_table_for_schema_migrations"

The full error report is shown below.

10:20:57.378 [error] Postgrex.Protocol (#PID<0.203.0>) disconnected: ** (DBConnection.ConnectionError) client #PID<0.219.0> exited

RobStallion commented 5 years ago

Something in the insert/6 function that is created on this branch is causing the error.

Not sure what it is at the moment but if I comment out the function and then run the migrate command, it works as expected.

RobStallion commented 5 years ago

Didn't notice this error message first time round...

** (MatchError) no match of right hand side value: {:error, %Postgrex.Error{connection_id: 39622, message: nil, postgres: %{code: :undefined_column, file: "parse_relation.c", line: "3293", message: "column \"entry_id\" does not exist", pg_code: "42703", position: "39", routine: "errorMissingColumn", severity: "ERROR", unknown: "ERROR"}, query: "SELECT * FROM schema_migrations where entry_id='zb2'"}}
    lib/alog.ex:64: Alog.create_entry_id/4
    lib/alog.ex:39: Alog.insert/6
    (ecto) lib/ecto/repo/schema.ex:651: Ecto.Repo.Schema.apply/4
    (ecto) lib/ecto/repo/schema.ex:264: anonymous fn/15 in Ecto.Repo.Schema.do_insert/3
    (ecto) lib/ecto/repo/schema.ex:166: Ecto.Repo.Schema.insert!/3
    (ecto_sql) lib/ecto/migrator.ex:493: Ecto.Migrator.verbose_schema_migration/3
    (ecto_sql) lib/ecto/migrator.ex:164: Ecto.Migrator.async_migrate_maybe_in_transaction/6
    (ecto_sql) lib/ecto/migrator.ex:438: anonymous fn/5 in Ecto.Migrator.do_migrate/4

This should shed some more light on what is going wrong with the insert function

RobStallion commented 5 years ago

This error appears to be caused by the schema_migrations table, which relates to this earlier comment from @Danwhy.

Going to move on for now

Danwhy commented 5 years ago

@RobStallion you'll need to add a clause for your insert function that forwards to the original insert if the source is "schema_migrations"
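
A minimal sketch of such a clause, mirroring the default insert/6 body shown earlier in this thread (it must be defined before the general clause so it matches first):

  # let Ecto manage its own schema_migrations table untouched,
  # skipping the cid/entry_id logic
  def insert(adapter_meta, %{source: "schema_migrations" = source, prefix: prefix}, params, on_conflict, returning, opts) do
    {kind, conflict_params, _} = on_conflict
    {fields, values} = :lists.unzip(params)
    sql = @conn.insert(prefix, source, fields, [fields], on_conflict, returning)
    Ecto.Adapters.SQL.struct(adapter_meta, @conn, sql, :insert, source, [], values ++ conflict_params, kind, returning, opts)
  end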

RobStallion commented 5 years ago

Beginning to add tests to this branch.

Migration file

  def change do
    create table(:comments, primary_key: false) do
      # cid & entry_id need to be removed later as they should be handled in execute_ddl I believe
      # timestamps are needed in alog but may or may not be in the schema.
      add(:cid, :string, primary_key: true)
      add(:entry_id, :string)
      add(:comment, :string)
      add(:deleted, :boolean, default: false)

      timestamps()
    end
  end

Schema

  # I'd imagine we'll change string as the type
  @primary_key {:cid, :string, autogenerate: false}
  schema "comments" do
    field(:entry_id, :string)
    field(:comment, :string)
    field(:deleted, :boolean, default: false)
  end

I can call Repo.insert with a struct (normal ecto behaviour). See logs below...

Test

Repo.insert(%Comment{comment: "hi"})
|> IO.inspect(label: "===> Result of insert")

===> Result of insert: {:ok,
 %Alog.TestApp.Comment{
   __meta__: #Ecto.Schema.Metadata<:loaded, "comments">,
   cid: nil,
   comment: "hi",
   deleted: false,
   entry_id: nil
 }}

Repo.all(Comment)
|> IO.inspect(label: "===> Result of Repo.all")

===> Result of Repo.all: [
  %Alog.TestApp.Comment{
    __meta__: #Ecto.Schema.Metadata<:loaded, "comments">,
    cid: "zb2rhh7yN9jPXNoU95A2JJhufSHSFN7X2q45Jmu72CXjF2QJ9",
    comment: "hi",
    deleted: false,
    entry_id: "zb2"
  }
]

You can see that the record was inserted into the db. However, the struct returned from insert and all do not match.

RobStallion commented 5 years ago

If we are going to add the fields cid, entry_id and deleted ourselves when the migrations are run, then these fields will not be in the user's schema. This means that the result of insert will be (using the above as an example)...

{:ok,
 %Alog.TestApp.Comment{
   __meta__: #Ecto.Schema.Metadata<:loaded, "comments">,
   comment: "hi"
 }}

Questions

@Danwhy @nelsonic @SimonLab

Is this how you have envisioned the adapter working? What fields would you expect a user to put in their comment schema?

nelsonic commented 5 years ago

@RobStallion seems good.

Related but not urgent: I feel we may need to standardise on :id as the identifier rather than :entry_id simply on the basis that more people will expect the field to be called :id ... thoughts?

RobStallion commented 5 years ago

@nelsonic Is this a quote from another issue or is this your response to the above? (the '>' quote formatting around 'insert' made me unsure)

I'm happy to rename entry_id to id. I think it makes sense. Although an entry_id may appear in a table multiple times, it still acts in the same way as a regular id would.

nelsonic commented 5 years ago

@RobStallion apologies if that indentation was confusing. 😕 I think your progress on insert is looking good. 👍 And separately, I think we should consider having the id of the record named :id instead of :entry_id, because I don't know if the :entry_ part is worth having, unless you feel that it helps people to understand that it's different from a standard phoenix schema :id ...? 🤔

RobStallion commented 5 years ago

In my current comment table, I have made cid the primary key. If I try to insert two duplicate comments I get the following error...

    test "inserting the same comment twice fails" do
      Repo.insert(%Comment{comment: "hi"})
      Repo.insert(%Comment{comment: "hi"})
    end

  1) test Repo.insert/2 - with Comment struct: inserting the same comment twice fails (AlogTest.InsertTest)
     test/insert_test.exs:13
     ** (Ecto.ConstraintError) constraint error when attempting to insert struct:

         * comments_pkey (unique_constraint)

     If you would like to stop this constraint violation from raising an
     exception and instead add it as an error to your changeset, please
     call `unique_constraint/3` on your changeset with the constraint
     `:name` as an option.

     The changeset has not defined any constraint.

     code: Repo.insert(%Comment{comment: "hi"})
     stacktrace:
       (ecto) lib/ecto/repo/schema.ex:689: anonymous fn/4 in Ecto.Repo.Schema.constraints_to_errors/3
       (elixir) lib/enum.ex:1314: Enum."-map/2-lists^map/1-0-"/2
       (ecto) lib/ecto/repo/schema.ex:674: Ecto.Repo.Schema.constraints_to_errors/3
       (ecto) lib/ecto/repo/schema.ex:276: anonymous fn/15 in Ecto.Repo.Schema.do_insert/3
       test/insert_test.exs:15: (test)

nelsonic commented 5 years ago

@RobStallion if the comment is identical, and therefore the CID derived from the data is the same, then it should fail. We need to figure out how to return that error message to the user.

RobStallion commented 5 years ago

I have created the following two tests...

    test "inserting the same comment twice fails with changeset" do
      Repo.insert(%Comment{comment: "hi"})

      {atom, _changeset} =
        %Comment{}
        |> Comment.changeset(%{comment: "hi"})
        |> Repo.insert()

      assert atom == :error
    end

    test "inserting the same comment twice fails without changeset" do
      Repo.insert(%Comment{comment: "hi"})
      Repo.insert(%Comment{comment: "hi"})
    end

The first returns an error to the user, but the second causes the test to error and stop running. This is not behaviour we want in applications using the adapter. However, it doesn't look like the adapter is the step that returns the error to the user, so I am not sure how we would resolve this issue.

Could this be an issue with the migration & schema set up I have used for testing?

RobStallion commented 5 years ago

Just caught up with @SimonLab @Danwhy on zoom about the results of testing, see comment on this pr.

The migration file created for the tests will be changed once the execute_ddl functionality is merged in, as it will handle creating the cid, entry_id and deleted columns (we can also manage adding timestamps(), I assume).

e.g.

    create table(:comments, primary_key: false) do
      add(:cid, :string, primary_key: true)
      add(:entry_id, :string)
      add(:deleted, :boolean, default: false)
      add(:comment, :string)

      timestamps()
    end

will become

    create table(:comments, primary_key: false) do
      add(:comment, :string)
    end

We have realised, however, that users will still need to manually add these fields to the schema file, as we are not able to control this file through the adapter.

The schema for the above migration will need to look something like...

  @primary_key {:cid, :string, autogenerate: false}
  schema "comments" do
    field(:entry_id, :string)
    field(:deleted, :boolean, default: false)
    field(:comment, :string)
  end

  def changeset(comment_struct, attrs \\ %{}) do
    comment_struct
    |> cast(attrs, [:cid, :entry_id, :comment, :deleted])
    |> unique_constraint(:cid, name: :comments_pkey)
  end

This is because the schema file is 'handled' (couldn't think of a better word) by the Ecto.Schema module.

We can create macros in our alog module in the future that could override this functionality. This way users would only need to put use Alog.Schema at the top of their file and could then have a file closer to...

  use Alog.Schema

  schema "comments" do
    field(:comment, :string)
  end

  def changeset(comment_struct, attrs \\ %{}) do
    comment_struct
    |> cast(attrs, [:comment])
  end
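
A hedged sketch of what such a macro could look like; the module name and injected fields are assumptions mirroring the migration and schema above, not an agreed design:

  defmodule Alog.Schema do
    # a sketch only: wrap Ecto's schema/2 so the alog fields are always present
    defmacro __using__(_opts) do
      quote do
        use Ecto.Schema
        import Ecto.Changeset
        # take schema/2 from Alog.Schema instead of Ecto.Schema
        import Ecto.Schema, except: [schema: 2]
        import Alog.Schema, only: [schema: 2]

        @primary_key {:cid, :string, autogenerate: false}
      end
    end

    defmacro schema(source, do: block) do
      quote do
        Ecto.Schema.schema unquote(source) do
          field(:entry_id, :string)
          field(:deleted, :boolean, default: false)
          unquote(block)
        end
      end
    end
  end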

Although this would be AMAZING, I do not believe that it is a blocker. I think that we should focus on getting the adapter finished and integrated with auth. Once this is working we can then come back and add the macros needed to have a more "clean" schema file.

For now, when we use the insert function in our adapter, we will need to call the relevant changeset function beforehand and ensure that the changeset function has unique_constraint(:cid, name: :comments_pkey) added.

RobStallion commented 5 years ago

I think that the only thing we need to think about and discuss further is how we want to handle inserting duplicate data.

I will open a separate issue and link back to this.