cpursley / walex

Postgres change events (CDC) in Elixir
MIT License
276 stars 14 forks source link

Event Struct: Combine records and expose Postgres type #24

Closed cpursley closed 8 months ago

cpursley commented 8 months ago

I've been kicking around the idea of sending a single combined record (merging old_record and record) that shows each column's Postgres type, its new and old values, and whether it changed. But that would be a major breaking change. Something like this:

%WalEx.Event{
    table: :contact,
    type: :update,
    record: %{
      id: %{
        type: "uuid",
        value: "12c48306-769e-4f92-8d9c-0beca1453d29",
        old_value: "12c48306-769e-4f92-8d9c-0beca1453d29",
        changed: false
      },
      phone_number: %{
        type: "varchar",
        value: "555-444-3312",
        old_value: "555-444-3311",
        changed: true
      },
      location_geography: %{
        type: "geography",
        value: "0101000020E6100000303DAB16A21855C0585AA1A3B0E34040",
        old_value: "0101000020E6100000303DAB16A21855C0585AA1A3B0E34040",
        changed: false
      },
      appraisals_count: %{
        type: "int4",
        value: 41,
        old_value: 41,
        changed: false
      },
      updated_at: %{
        type: "timestamptz",
        value: #DateTime<2023-12-05 09:30:44.295628-05:00 -05 Etc/UTC-5>,
        old_value: #DateTime<2023-11-30 09:19:03.827733-05:00 -05 Etc/UTC-5>,
        changed: true
      }
      # ... etc
    },
    commit_timestamp: ~U[2023-12-05 14:30:44.310865Z]
}

In theory, this would allow the table to be reconstructed elsewhere with the correct Postgres types.

cpursley commented 8 months ago

For comparison, Debezium sends something like this:

{
    "schema": { 
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "PostgreSQL_server.inventory.customers.Value", 
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "PostgreSQL_server.inventory.customers.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "boolean",
                        "optional": true,
                        "default": false,
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "schema"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "table"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "txId"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "lsn"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "xmin"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.postgresql.Source", 
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            }
        ],
        "optional": false,
        "name": "PostgreSQL_server.inventory.customers.Envelope" 
    },
    "payload": { 
        "before": null, 
        "after": { 
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { 
            "version": "2.5.0.Beta1",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "snapshot": true,
            "db": "postgres",
            "sequence": "[\"24023119\",\"24023128\"]",
            "schema": "public",
            "table": "customers",
            "txId": 555,
            "lsn": 24023128,
            "xmin": null
        },
        "op": "c", 
        "ts_ms": 1559033904863 
    }
}

^ I like the additional source metadata, such as db, schema, etc.

cpursley commented 8 months ago

Consider adding more table info (as Hasura Events does):

  "id": "a6a17d74-ee99-44e0-8243-1f0d3ab6d9b7", 
  "table": {
    "name": "contact",
    "schema": "public"
  },

Either flat like this:

  %WalEx.Event{
    schema: :public,
    table: :contact,
    type: :update,
    ...

or nested under a source map, similar to Debezium:

  %WalEx.Event{ 
    type: :update,
    source: %{
      db: "postgres",
      schema: "public",
      table: "contact",
      # etc
    ...

cpursley commented 8 months ago

Consider allowing users to opt out of particular data (see https://github.com/supabase/realtime/issues/66):

%WalEx.Event{
  type: :update,
  source: %{
    db: "todos",
    schema: "public",
    table: "user"
  },
  # optional: off by default, but can be turned on
  columns: %{
    id: "int4",
    email: "varchar",
    first_name: "varchar",
    last_name: "varchar",
    phone_number: "varchar",
    created_at: "timestamptz",
    updated_at: "timestamptz"
  },
  # optional: on by default
  # new_record would be nil in a delete scenario
  new_record: %{
    id: 1,
    email: "andrias.bachera@example.com",
    first_name: "Andrias",
    last_name: "Bachera",
    phone_number: "555-444-3311",
    created_at: "#DateTime<2023-08-18 14:09:05.988369-04:00 -04 Etc/UTC-4>",
    updated_at: "#DateTime<2023-12-08 17:24:11.13435-05:00 -05 Etc/UTC-5>"
  },
  # optional: on by default
  # old_record would be nil in an insert scenario
  old_record: %{
    id: 1,
    email: "andrias.bachera@example.com",
    first_name: "Andrias",
    last_name: "Bacher",
    phone_number: "555-444-3311",
    created_at: "#DateTime<2023-08-18 14:09:05.988369-04:00 -04 Etc/UTC-4>",
    updated_at: "#DateTime<2023-12-08 17:23:27.62339-05:00 -05 Etc/UTC-5>"
  },
  # optional: on by default 
  changes: %{
    last_name: %{
      new_value: "Bachera",
      old_value: "Bacher"
    },
    updated_at: %{
      new_value: "#DateTime<2023-12-08 17:24:11.13435-05:00 -05 Etc/UTC-5>",
      old_value: "#DateTime<2023-12-08 17:23:27.62339-05:00 -05 Etc/UTC-5>"
    }
  },
  commit_timestamp: ~U[2023-12-08 22:24:11.147662Z]
}