DATA-DOG / go-txdb

Immutable transaction isolated sql driver for golang
Other
667 stars 48 forks source link

Savepoint does not exists if using several transactions #55

Open AlwxSin opened 1 year ago

AlwxSin commented 1 year ago

In my project we prepare several different transactions before executing and committing them

func MultipleTx() {
    db, _ := sql.Open("postgres", "dsn")

    txs := make([]*sql.Tx, 0)

    for _ = range []string{"p", "o", "s", "g", "e", "s"} {
        tx, _ := db.Begin()
        txs = append(txs, tx)
        tx.Exec("some query")
    }

    ...

    for _, tx := range txs {
        tx.Commit()
    }
}

It works in production, but fails in tests with error savepoint "tx_2" does not exist.

Here is test example

func TestPostgresMultipleTx(t *testing.T) {
    // make sure drivers are registered first
    driver := drivers()[1]
    db, err := sql.Open("psql_txdb", "multiple_tx")
    if err != nil {
        t.Fatalf("psql: failed to open a postgres connection, have you run 'make test'? err: %s", err)
    }
    defer db.Close()

    var count int

    tx, err := db.Begin()
    if err != nil {
        t.Fatalf(driver+": failed to begin transaction: %s", err)
    }
    otherTX, err := db.Begin()
    if err != nil {
        t.Fatalf(driver+": failed to begin transaction: %s", err)
    }

    {
        _, err = tx.Exec(`INSERT INTO users (username, email) VALUES('txdb', 'txdb@test2.com')`)
        if err != nil {
            t.Fatalf(driver+": failed to insert an user: %s", err)
        }
        err = tx.QueryRow("SELECT COUNT(id) FROM users").Scan(&count)
        if err != nil {
            t.Fatalf(driver+": failed to count users: %s", err)
        }
        if count != 4 {
            t.Fatalf(driver+": expected 4 users to be in database, but got %d", count)
        }

    }

    err = db.QueryRow("SELECT COUNT(id) FROM users").Scan(&count)
    if err != nil {
        t.Fatalf(driver+": failed to count users: %s", err)
    }
    if count != 4 {
        t.Fatalf(driver+": expected 4 users to be in database, but got %d", count)
    }

    {
        _, err = otherTX.Exec(`INSERT INTO users (username, email) VALUES('other txdb', 'other_txdb@test2.com')`)
        if err != nil {
            t.Fatalf(driver+": failed to insert an user: %s", err)
        }
        err = otherTX.QueryRow("SELECT COUNT(id) FROM users").Scan(&count)
        if err != nil {
            t.Fatalf(driver+": failed to count users: %s", err)
        }
        if count != 5 {
            t.Fatalf(driver+": expected 5 users to be in database, but got %d", count)
        }
    }

    if err = tx.Commit(); err != nil {
        t.Fatalf(driver+": failed to rollback transaction: %s", err)
    }
    if err = otherTX.Commit(); err != nil {
        t.Fatalf(driver+": failed to commit transaction: %s", err)
    }

    err = db.QueryRow("SELECT COUNT(id) FROM users").Scan(&count)
    if err != nil {
        t.Fatalf(driver+": failed to count users: %s", err)
    }
    if count != 5 {
        t.Fatalf(driver+": expected 5 users to be in database, but got %d", count)
    }
}
AlwxSin commented 1 year ago

I understand that we should collect statements before opening transactions, but there are a lot of data and we use .Prepare to send it to db.

RobbieZhao commented 1 year ago

It appears this is what happened

begin;
  savepoint tx_1;  -- First Begin() call creates a savepoint
  savepoint tx_2;  -- Second Begin() call create another savepoint
  release savepoint tx_1;  -- First Commit() call releases savepoint tx_1
  release savepoint tx_2;  -- Second Commit() call tries to release savepoint tx_2, errs
commit;

According to psql doc,

RELEASE SAVEPOINT releases the named savepoint and all active savepoints that were created after the named savepoint, and frees their resources.

So release savepoint tx_1 will release both tx_1 and tx_2. Then when the code tries to release tx_2, it errs saying it can't find tx_2, because it's already released.

Seems to me that you could avoid this issue if you don't have begin() and commit() calls intertwined:

// instead of
Begin()
Begin()
Commit()
Commit()
// do this
Begin()
Commit()
Begin()
Commit()

not sure if this is practical for you though :)

peterldowns commented 1 month ago

@AlwxSin My project pgtestdb handles opening multiple transactions before committing them as you'd expect, no issues. You don't need to worry about committing transactions in any specific order. Consider giving it a shot!

This example requires a postgres server available at postgres://postgres:password@localhost:5433, which you can do with Docker pretty easily:

# file: docker-compose.yml
version: "3.6"
services:
  testdb:
    image: postgres:15
    environment:
      POSTGRES_PASSWORD: password
    restart: unless-stopped
    volumes:
      # Uses a tmpfs volume to make tests extremely fast. The data in test
      # databases is not persisted across restarts, nor does it need to be.
      - type: tmpfs
        target: /var/lib/postgresql/data/
    command:
      - "postgres"
      - "-c"
      - "fsync=off"
      - "-c"
      - "shared_buffers=1024MB"
      - "-c"
      - "synchronous_commit=off"
      - "-c"
      - "full_page_writes=off"
      - "-c"
      - "log_statement=all"
      - "-c"
      - "max_connections=1000"
    ports:
      - "5433:5432"
-- file: 01_initial.sql
create table users (
    id bigint primary key generated always as identity,
    username text unique not null,
    email text unique not null
);
// file: main_test.go
package main_test

import (
    "database/sql"
    "embed"
    "fmt"
    "testing"

    _ "github.com/lib/pq" // "postgres" driver

    "github.com/peterldowns/pgmigrate"
    "github.com/peterldowns/pgtestdb"
    "github.com/peterldowns/pgtestdb/migrators/pgmigrator"
    "github.com/peterldowns/testy/assert"
)

// Embeds the migration file
//go:embed *.sql
var migrationsFS embed.FS

func TestPostgresMultipleTx(t *testing.T) {
    t.Parallel()
    logger := pgmigrate.NewTestLogger(t)
    pgm, err := pgmigrator.New(migrationsFS, pgmigrator.WithLogger(logger))
    assert.Nil(t, err)

    db := pgtestdb.New(t, pgtestdb.Config{
        DriverName: "postgres",
        Host:       "localhost",
        User:       "postgres",
        Password:   "password",
        Port:       "5433",
        Options:    "sslmode=disable",
    }, pgm)

    // Begin 6 transactions, each inserting a user, but don't commit.
    txs := make([]*sql.Tx, 0)
    for i, letter := range []string{"p", "o", "s", "g", "e", "s"} {
        tx, _ := db.Begin()
        txs = append(txs, tx)
        username := fmt.Sprintf("user-%d-%s", i, letter)
        email := fmt.Sprintf("%s@example-%d-%s.com", username, i, letter)
        _, err := tx.Exec(`INSERT INTO users (username, email) VALUES( $1, $2 )`, username, email)
        assert.Nil(t, err)
    }

    // Confirm that there are 0 users in the database.
    var beforeCommittedCount int
    err = db.QueryRow("SELECT COUNT(id) FROM users").Scan(&beforeCommittedCount)
    assert.Nil(t, err)
    assert.Equal(t, 0, beforeCommittedCount)

    // shuffle the array txs so the elements are now in random order, they can
    // be committed in any order without a problem.
    rand.Shuffle(len(txs), func(i, j int) { txs[i], txs[j] = txs[j], txs[i] })

    // Commit each of the transactions, one-by-one.
    for i, tx := range txs {
        err := tx.Commit()
        assert.Nil(t, err)

        var count int
        err = db.QueryRow("SELECT COUNT(id) FROM users").Scan(&count)
        assert.Nil(t, err)
        assert.Equal(t, i+1, count)
    }

    // Confirm the final count is 6 users in the database.
    var finalCount int
    err = db.QueryRow("SELECT COUNT(id) FROM users").Scan(&finalCount)
    assert.Nil(t, err)
    assert.Equal(t, 6, finalCount)
}