simonw / sqlite-chronicle

Use triggers to track when rows in a SQLite table were updated or deleted

Problems if 1,000 rows share the same version number #4

Closed simonw closed 12 months ago

simonw commented 1 year ago

The approaches I'm experimenting with in:

Assume that you can grab batches of 1,000 items at a time, ordered by version, and process those in sequence.

This will break in surprising ways if you have over 1,000 items with the same version number, which could happen after the table is first activated (when it has more than 1,000 existing rows) or potentially if you do an update that affects more than 1,000 rows at a time.
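The failure mode can be reproduced with a small sketch. The `chronicle` table, the `fetch_all_in_batches` helper and the `version > ?` paging query below are illustrative assumptions, not the library's actual code; the batch size is reduced to 3 so the effect is easy to see.

```python
import sqlite3


def fetch_all_in_batches(conn, batch_size):
    """Hypothetical consumer that pages through rows ordered by version."""
    seen = []
    since = 0
    while True:
        rows = conn.execute(
            "SELECT id, version FROM chronicle "
            "WHERE version > ? ORDER BY version LIMIT ?",
            (since, batch_size),
        ).fetchall()
        if not rows:
            return seen
        seen.extend(row[0] for row in rows)
        # Resume after the highest version seen in this batch
        since = rows[-1][1]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE chronicle (id INTEGER PRIMARY KEY, version INTEGER)")
# Seven rows that all share version 1, as after a first activation
conn.executemany("INSERT INTO chronicle VALUES (?, 1)", [(i,) for i in range(1, 8)])

seen = fetch_all_in_batches(conn, batch_size=3)
print(len(seen))  # 3: the other four rows are silently skipped
```

The first batch ends on version 1, so the next query asks for `version > 1` and finds nothing, even though four rows were never returned.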

simonw commented 1 year ago

Two approaches I can think of for dealing with this:

That second option appeals to me more. I'm not sure how to implement it for the update trigger though - it might be possible with some kind of window function.

simonw commented 1 year ago

After extensive experimentation with my weird coding intern (ChatGPT Code Interpreter / Advanced Data Analysis) I think the solution here is to use a separate _version_chronicle table to track the max version for each table, such that it can be incremented as part of the loop in the update trigger to give each row a unique version number.

Experiments here: https://chat.openai.com/share/1bbe492d-28f9-4b6b-97d7-23375e35773b

My own modified version of the trigger design is:

CREATE TABLE IF NOT EXISTS _version_chronicle (
    table_name TEXT PRIMARY KEY,
    current_version INTEGER
);

CREATE TRIGGER creatures_bu_set_current_version
BEFORE UPDATE ON creatures
BEGIN
    INSERT OR REPLACE INTO _version_chronicle (table_name, current_version) 
    VALUES ('creatures', coalesce ((SELECT MAX(version) FROM creatures), 1));
END;

CREATE TRIGGER creatures_au_update_version_after_each_row
AFTER UPDATE ON creatures
FOR EACH ROW
BEGIN
    -- Set version to current_version + 1
    UPDATE creatures 
    SET version = (SELECT current_version + 1 FROM _version_chronicle WHERE table_name = 'creatures')
    WHERE id = NEW.id;

    -- Increment the current_version in _version_chronicle
    UPDATE _version_chronicle
    SET current_version = current_version + 1
    WHERE table_name = 'creatures';
END;
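
To see these triggers in action, here is a minimal sketch that runs them against an in-memory database using Python's `sqlite3` module. The `creatures` schema (with a `version` column on the table itself) and the sample rows are assumptions made for the demo. It relies on SQLite's default setting of `recursive_triggers` being off, so the `UPDATE` inside the after-update trigger does not re-fire that same trigger.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Assumed schema: the version column lives on the tracked table
    CREATE TABLE creatures (
        id INTEGER PRIMARY KEY,
        name TEXT,
        version INTEGER DEFAULT 1
    );

    CREATE TABLE IF NOT EXISTS _version_chronicle (
        table_name TEXT PRIMARY KEY,
        current_version INTEGER
    );

    CREATE TRIGGER creatures_bu_set_current_version
    BEFORE UPDATE ON creatures
    BEGIN
        INSERT OR REPLACE INTO _version_chronicle (table_name, current_version)
        VALUES ('creatures', coalesce((SELECT MAX(version) FROM creatures), 1));
    END;

    CREATE TRIGGER creatures_au_update_version_after_each_row
    AFTER UPDATE ON creatures
    FOR EACH ROW
    BEGIN
        UPDATE creatures
        SET version = (
            SELECT current_version + 1 FROM _version_chronicle
            WHERE table_name = 'creatures'
        )
        WHERE id = NEW.id;

        UPDATE _version_chronicle
        SET current_version = current_version + 1
        WHERE table_name = 'creatures';
    END;
    """
)

conn.executemany(
    "INSERT INTO creatures (name) VALUES (?)",
    [("cleo",), ("pancakes",), ("mango",), ("azi",), ("sandy",)],
)

# A single statement that touches every row
conn.execute("UPDATE creatures SET name = upper(name)")

versions = [
    row[0] for row in conn.execute("SELECT version FROM creatures ORDER BY id")
]
# Every row should end up with its own version number
print(len(versions), len(set(versions)))
```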

simonw commented 1 year ago

Then for the initial population of the table with version numbers, I currently do this:

https://github.com/simonw/sqlite-chronicle/blob/4adac9632e51d4600d550b407d5375ea76f3c3e3/sqlite_chronicle.py#L66-L73

But I want to increment the version number for each of those rows too.

simonw commented 1 year ago

Window functions were added to SQLite in version 3.25.0, released on 2018-09-15.

I did a quick spot check of /-/versions on 10 Datasette instances - found using https://www.googleapis.com/customsearch/v1?key=API-KEY&cx=84ec3c54dca9646ff&q=%22powered+by+datasette%22%20-site:github.com%20-site:simonwillison.net%20-site:datasette.io%20-site:pypi.org - and the earliest SQLite version I found was 3.27.

So relying on window functions is probably OK for this.

TIL about that search API: https://til.simonwillison.net/google/json-api-programmable-search-engine
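
If a runtime guard is wanted rather than relying on the spot check, something like the following sketch could work. The `supports_window_functions` helper is a hypothetical name, not part of sqlite-chronicle; it compares the SQLite version that Python is linked against with 3.25.0.

```python
import sqlite3

# Window functions arrived in SQLite 3.25.0
WINDOW_FUNCTIONS_MIN_VERSION = (3, 25, 0)


def supports_window_functions() -> bool:
    """Return True if the linked SQLite library is new enough."""
    return sqlite3.sqlite_version_info >= WINDOW_FUNCTIONS_MIN_VERSION


print(sqlite3.sqlite_version, supports_window_functions())
```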

simonw commented 1 year ago

Here's the window function alternative for that first populating query:

INSERT INTO "_chronicle_creatures" (id, added_ms, updated_ms, version)
SELECT 
    id, 
    CAST((julianday('now') - 2440587.5) * 86400 * 1000 AS INTEGER), 
    CAST((julianday('now') - 2440587.5) * 86400 * 1000 AS INTEGER), 
    ROW_NUMBER() OVER (ORDER BY id)
FROM "creatures";
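
A quick way to check what that query produces is to run it against an in-memory database. This sketch assumes a single-column `id` primary key, made-up sample rows, and a SQLite build with window function support (3.25.0 or later).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE creatures (id INTEGER PRIMARY KEY, name TEXT);
    INSERT INTO creatures (id, name)
    VALUES (10, 'Cleo'), (20, 'Pancakes'), (30, 'Mango');

    CREATE TABLE "_chronicle_creatures" (
        id INTEGER PRIMARY KEY,
        added_ms INTEGER,
        updated_ms INTEGER,
        version INTEGER DEFAULT 0,
        deleted INTEGER DEFAULT 0
    );
    """
)

# Same populating query as above: one distinct version per existing row
conn.execute(
    """
    INSERT INTO "_chronicle_creatures" (id, added_ms, updated_ms, version)
    SELECT
        id,
        CAST((julianday('now') - 2440587.5) * 86400 * 1000 AS INTEGER),
        CAST((julianday('now') - 2440587.5) * 86400 * 1000 AS INTEGER),
        ROW_NUMBER() OVER (ORDER BY id)
    FROM "creatures";
    """
)

rows = conn.execute(
    'SELECT id, version FROM "_chronicle_creatures" ORDER BY id'
).fetchall()
print(rows)  # [(10, 1), (20, 2), (30, 3)]
```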

simonw commented 1 year ago

I'm going to try this in a branch.

simonw commented 12 months ago

Here's an incomplete prototype of an alternative approach:

diff --git a/sqlite_chronicle.py b/sqlite_chronicle.py
index 1825673..56d3de3 100644
--- a/sqlite_chronicle.py
+++ b/sqlite_chronicle.py
@@ -38,24 +38,42 @@ def enable_chronicle(conn: sqlite3.Connection, table_name: str):
     pk_def = ", ".join(
         [f'"{col_name}" {col_type}' for col_name, col_type in primary_key_columns]
     )
+    pks = ", ".join([col[0] for col in primary_key_columns])

     current_time_expr = "CAST((julianday('now') - 2440587.5) * 86400 * 1000 AS INTEGER)"
-    next_version_expr = (
-        f'COALESCE((SELECT MAX(version) FROM "_chronicle_{table_name}"), 0) + 1'
-    )
+    next_version_expr = f"coalesce((SELECT version FROM __chronicle_versions WHERE table_name = '{table_name}'), 0) + 1"
+
+    increment_version_sql = f"""
+    UPDATE __chronicle_versions
+    SET version = version + 1
+    WHERE table_name = '{table_name}';
+    """

     with conn:
+        # Ensure __chronicle_versions table exists
         c.execute(
-            f"""
+            textwrap.dedent(
+                """
+            CREATE TABLE IF NOT EXISTS __chronicle_versions (
+                table_name TEXT PRIMARY KEY,
+                version INTEGER
+            );
+            """
+            )
+        )
+        c.execute(
+            textwrap.dedent(
+                f"""
             CREATE TABLE "_chronicle_{table_name}" (
                 {pk_def},
                 added_ms INTEGER,
                 updated_ms INTEGER,
                 version INTEGER DEFAULT 0,
                 deleted INTEGER DEFAULT 0,
-                PRIMARY KEY ({', '.join([f'"{col[0]}"' for col in primary_key_columns])})
+                PRIMARY KEY ({pks})
             );
             """
+            )
         )

         # Index on version column
@@ -64,13 +82,15 @@ def enable_chronicle(conn: sqlite3.Connection, table_name: str):
         )

         # Populate the _chronicle_ table with existing rows from the original table
-        c.execute(
-            f"""
-            INSERT INTO "_chronicle_{table_name}" ({', '.join([col[0] for col in primary_key_columns])}, added_ms, updated_ms, version)
-            SELECT {', '.join([f'"{col[0]}"' for col in primary_key_columns])}, {current_time_expr}, {current_time_expr}, 1
-            FROM "{table_name}";
-            """
-        )
+        insert_sql = f"""
+        INSERT INTO "_chronicle_{table_name}" ({pks}, added_ms, updated_ms, version)
+        SELECT {pks},
+            {current_time_expr},
+            {current_time_expr},
+            ROW_NUMBER() OVER (ORDER BY {pks})
+        FROM "{table_name}";
+        """
+        c.execute(insert_sql)

         # Create the after insert trigger
         after_insert_sql = textwrap.dedent(
@@ -79,32 +99,55 @@ def enable_chronicle(conn: sqlite3.Connection, table_name: str):
         AFTER INSERT ON "{table_name}"
         FOR EACH ROW
         BEGIN
-            INSERT INTO "_chronicle_{table_name}" ({', '.join([f'"{col[0]}"' for col in primary_key_columns])}, added_ms, updated_ms, version)
-            VALUES ({', '.join(['NEW.' + f'"{col[0]}"' for col in primary_key_columns])}, {current_time_expr}, {current_time_expr}, {next_version_expr});
+            INSERT INTO "_chronicle_{table_name}" ({pks}, added_ms, updated_ms, version)
+            VALUES (
+                {', '.join(['NEW.' + f'"{col[0]}"' for col in primary_key_columns])},
+                {current_time_expr},
+                {current_time_expr},
+                {next_version_expr}
+            );
+            {increment_version_sql}
         END;
         """
         )
         c.execute(after_insert_sql)

+        # Before update trigger sets __chronicle_versions version to max(version)
+        c.execute(
+            textwrap.dedent(
+                f"""
+            CREATE TRIGGER "_chronicle_{table_name}_bu"
+            BEFORE UPDATE ON "{table_name}"
+            BEGIN
+                INSERT OR REPLACE INTO __chronicle_versions (table_name, version) 
+                VALUES ('{table_name}', coalesce ((SELECT MAX(version) FROM "_chronicle_{table_name}"), 1));
+            END;
+            """
+            )
+        )
+
         # Create the after update trigger
         c.execute(
-            f"""
+            textwrap.dedent(
+                f"""
             CREATE TRIGGER "_chronicle_{table_name}_au"
             AFTER UPDATE ON "{table_name}"
             FOR EACH ROW
             BEGIN
-                UPDATE "_chronicle_{table_name}"
-                SET updated_ms = {current_time_expr},
-                    version = {next_version_expr},
-                    {', '.join([f'"{col[0]}" = NEW."{col[0]}"' for col in primary_key_columns])}
+                -- Set version to version + 1
+                UPDATE "_chronicle_{table_name}" 
+                SET version = (SELECT version FROM __chronicle_versions WHERE table_name = '{table_name}') + 1
                 WHERE { ' AND '.join([f'"{col[0]}" = OLD."{col[0]}"' for col in primary_key_columns]) };
+                {increment_version_sql}
             END;
             """
+            )
         )

         # Create the after delete trigger
         c.execute(
-            f"""
+            textwrap.dedent(
+                f"""
             CREATE TRIGGER "_chronicle_{table_name}_ad"
             AFTER DELETE ON "{table_name}"
             FOR EACH ROW
@@ -114,8 +157,10 @@ def enable_chronicle(conn: sqlite3.Connection, table_name: str):
                     version = {next_version_expr},
                     deleted = 1
                 WHERE { ' AND '.join([f'"{col[0]}" = OLD."{col[0]}"' for col in primary_key_columns]) };
+                {increment_version_sql}
             END;
             """
+            )
         )

diff --git a/tests/test_sqlite_chronicle.py b/tests/test_sqlite_chronicle.py
index 375efb4..b47c6c4 100644
--- a/tests/test_sqlite_chronicle.py
+++ b/tests/test_sqlite_chronicle.py
@@ -28,20 +28,8 @@ def test_enable_chronicle(table_name, pks):
     assert db[chronicle_table].indexes[0].columns == ["version"]
     if pks == ["id"]:
         expected = [
-            {
-                "id": 1,
-                "added_ms": ANY,
-                "updated_ms": ANY,
-                "version": 1,
-                "deleted": 0,
-            },
-            {
-                "id": 2,
-                "added_ms": ANY,
-                "updated_ms": ANY,
-                "version": 1,
-                "deleted": 0,
-            },
+            {"id": 1, "added_ms": ANY, "updated_ms": ANY, "version": 1, "deleted": 0},
+            {"id": 2, "added_ms": ANY, "updated_ms": ANY, "version": 2, "deleted": 0},
         ]
     else:
         expected = [
@@ -58,7 +46,7 @@ def test_enable_chronicle(table_name, pks):
                 "name": "Pancakes",
                 "added_ms": ANY,
                 "updated_ms": ANY,
-                "version": 1,
+                "version": 2,
                 "deleted": 0,
             },
         ]
@@ -75,7 +63,7 @@ def test_enable_chronicle(table_name, pks):
             "id": 3,
             "added_ms": ANY,
             "updated_ms": ANY,
-            "version": 2,
+            "version": 3,
             "deleted": 0,
         }

@@ -85,7 +73,7 @@ def test_enable_chronicle(table_name, pks):
             "name": "Mango",
             "added_ms": ANY,
             "updated_ms": ANY,
-            "version": 2,
+            "version": 3,
             "deleted": 0,
         }

diff --git a/tests/test_updates_since.py b/tests/test_updates_since.py
index f820fdc..f952e21 100644
--- a/tests/test_updates_since.py
+++ b/tests/test_updates_since.py
@@ -66,7 +66,7 @@ def test_updates_since(db):
             pks=(2,),
             added_ms=ANY,
             updated_ms=ANY,
-            version=1,
+            version=2,
             row={
                 "id": 2,
                 "name": "The disappearance of the Amber Room",
@@ -83,7 +83,7 @@ def test_updates_since(db):
             pks=(3,),
             added_ms=ANY,
             updated_ms=ANY,
-            version=2,
+            version=3,
             row={"id": 3, "name": "The lost city of Atlantis", "year": "360 BC"},
             deleted=0,
         )
@@ -96,7 +96,7 @@ def test_updates_since(db):
             pks=(1,),
             added_ms=ANY,
             updated_ms=ANY,
-            version=3,
+            version=4,
             row={
                 "id": 1,
                 "name": "The fate of the crew on the Mary Celeste",
@@ -108,7 +108,7 @@ def test_updates_since(db):
             pks=(3,),
             added_ms=ANY,
             updated_ms=ANY,
-            version=4,
+            version=5,
             row={"id": 3, "name": "The lost city of Atlantis", "year": "unknown"},
             deleted=0,
         ),
@@ -121,7 +121,7 @@ def test_updates_since(db):
             pks=(2,),
             added_ms=ANY,
             updated_ms=ANY,
-            version=5,
+            version=6,
             row={"id": 2, "name": None, "year": None},
             deleted=1,
         )

simonw commented 12 months ago

OK, if the table has more rows than batch_size when you first call enable_chronicle() on it, things work OK, because the initial populating code now uses a window function to assign incrementing integers.

But... it's possible that updating more than batch_size rows in a single go will still break things if those rows all end up assigned the same version number.

simonw commented 12 months ago

No, that seems to work OK - it looks like the update trigger is causing incremented version IDs. From a debug run:

 {'added_ms': 1701836570679,
  'deleted': 0,
  'id': 199,
  'updated_ms': 1701836570679,
  'version': 401},
 {'added_ms': 1701836570679,
  'deleted': 0,
  'id': 200,
  'updated_ms': 1701836570679,
  'version': 402}]

Adding a test to demonstrate this.
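
A self-contained sketch of what such a check might look like follows. It does not use the sqlite-chronicle API. Instead it rebuilds a simplified version of the counter-table design from the prototype diff (a reduced `_chronicle_creatures` table, a `__chronicle_versions` counter and one after-update trigger), updates 2,500 rows in one statement, then pages through them in batches of 1,000. The table contents, the simplified schema and the paging loop are all assumptions made for illustration.

```python
import sqlite3

BATCH_SIZE = 1000
ROW_COUNT = 2500

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE creatures (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE _chronicle_creatures (
        id INTEGER PRIMARY KEY,
        version INTEGER DEFAULT 0
    );
    CREATE TABLE __chronicle_versions (
        table_name TEXT PRIMARY KEY,
        version INTEGER
    );

    CREATE TRIGGER _chronicle_creatures_au
    AFTER UPDATE ON creatures
    FOR EACH ROW
    BEGIN
        -- Give this row the next version, then bump the counter
        UPDATE _chronicle_creatures
        SET version = (
            SELECT version FROM __chronicle_versions
            WHERE table_name = 'creatures'
        ) + 1
        WHERE id = OLD.id;
        UPDATE __chronicle_versions
        SET version = version + 1
        WHERE table_name = 'creatures';
    END;
    """
)

conn.executemany(
    "INSERT INTO creatures (id, name) VALUES (?, ?)",
    [(i, f"creature {i}") for i in range(1, ROW_COUNT + 1)],
)
# Initial population: a distinct version per row via a window function
conn.execute(
    "INSERT INTO _chronicle_creatures (id, version) "
    "SELECT id, ROW_NUMBER() OVER (ORDER BY id) FROM creatures"
)
# Seed the counter with the highest version assigned so far
conn.execute(
    "INSERT INTO __chronicle_versions (table_name, version) "
    "SELECT 'creatures', MAX(version) FROM _chronicle_creatures"
)

# Update more than BATCH_SIZE rows in a single statement
conn.execute("UPDATE creatures SET name = name || '!'")

seen_ids = []
since = 0
while True:
    batch = conn.execute(
        "SELECT id, version FROM _chronicle_creatures "
        "WHERE version > ? ORDER BY version LIMIT ?",
        (since, BATCH_SIZE),
    ).fetchall()
    if not batch:
        break
    seen_ids.extend(row[0] for row in batch)
    since = batch[-1][1]

# No rows are skipped when every row has a distinct version
print(len(seen_ids), len(set(seen_ids)))
```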