GoogleCloudPlatform / cloud-spanner-emulator

An open source emulator for Cloud Spanner.
Apache License 2.0
265 stars 43 forks source link

Inserts can fail for tables using Spanner generated sequence default values for primary key/uniquely constrained columns #179

Open drewthor opened 1 month ago

drewthor commented 1 month ago

The emulator appears to use a static absl::flat_hash_map to track sequences and produce the next value. That data structure is not thread-safe, so because of a race condition, inserts that rely on the next sequence value as the default for a column with a unique constraint (e.g. a primary key) may fail.

For example, when the following DDL is applied and the DML below is then executed concurrently, a race condition may cause the insert to fail:

DDL

CREATE SEQUENCE IF NOT EXISTS test_id OPTIONS (sequence_kind='bit_reversed_positive');
CREATE TABLE IF NOT EXISTS test_sequence (
    id INT64 NOT NULL DEFAULT (GET_NEXT_SEQUENCE_VALUE(SEQUENCE test_id)),
    value INT64 NOT NULL
) PRIMARY KEY(id);

DML

INSERT INTO test_sequence (value) VALUES (1), (2), (3), (4)

Error (as returned from https://github.com/googleapis/go-sql-spanner)

spanner: code = "AlreadyExists", desc = "Failed to insert row with primary key ({pk#id:7205759403792793600}) due to previously existing row"

Note: This issue is isolated to just the emulator; Cloud Spanner does not have this issue.

drewthor commented 1 month ago

Here is my attempt at a minimal reproducible example in Go:

package main

import (
    "cloud.google.com/go/spanner"
    database "cloud.google.com/go/spanner/admin/database/apiv1"
    "cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
    instance "cloud.google.com/go/spanner/admin/instance/apiv1"
    "cloud.google.com/go/spanner/admin/instance/apiv1/instancepb"
    "context"
    "database/sql"
    "errors"
    "fmt"
    _ "github.com/googleapis/go-sql-spanner"
    "google.golang.org/grpc/codes"
    "log/slog"
    "os"
    "os/signal"
    "sync"
)

// main runs the reproduction. If it fails, main prints the error to stderr
// and exits with status 1.
func main() {
	if err := run(); err != nil {
		// Fprintln (not Fprint) so the message is newline-terminated and the
		// shell prompt does not end up on the same line.
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

// run is the first reproduction program. It does the following:
//   - creates (or reuses) the emulator instance and database;
//   - submits the test_sequence schema DDL;
//   - inserts numVals rows from numWorkers concurrent goroutines, logging
//     every insert that fails.
//
// AlreadyExists from the instance/database create calls is treated as success
// so the program can be re-run. The schema DDL (including
// CREATE SEQUENCE IF NOT EXISTS) is submitted on every run, even when the
// database already existed.
func run() error {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	const (
		ProjectID, InstanceID, DatabaseID, DatabaseNameEscaped = "your-project-id", "test-instance", "test-db", "`test-db`"

		Database = "projects/" + ProjectID + "/instances/" + InstanceID + "/databases/" + DatabaseID
	)

	{ // create a new instance; AlreadyExists means it is already there
		admin, err := instance.NewInstanceAdminClient(ctx)
		if err != nil {
			return fmt.Errorf("failed to create instance admin: %w", err)
		}
		op, err := admin.CreateInstance(ctx, &instancepb.CreateInstanceRequest{
			Parent:     "projects/" + ProjectID,
			InstanceId: InstanceID,
		})
		switch {
		case err == nil:
			if _, err := op.Wait(ctx); err != nil {
				_ = admin.Close() // best effort: the wait error is the one to report
				return fmt.Errorf("failed to wait instance creation: %w", err)
			}
		case spanner.ErrCode(err) != codes.AlreadyExists:
			_ = admin.Close() // best effort: the create error is the one to report
			return fmt.Errorf("create instance failed: %w", err)
		}
		if err := admin.Close(); err != nil {
			return fmt.Errorf("failed to close instance admin: %w", err)
		}
	}

	{ // create the database (AlreadyExists is fine), then apply the schema
		admin, err := database.NewDatabaseAdminClient(ctx)
		if err != nil {
			return fmt.Errorf("failed to create database admin: %w", err)
		}

		ddl, err := admin.CreateDatabase(ctx, &databasepb.CreateDatabaseRequest{
			Parent:          "projects/" + ProjectID + "/instances/" + InstanceID,
			CreateStatement: "CREATE DATABASE " + DatabaseNameEscaped,
			ExtraStatements: []string{},
			DatabaseDialect: databasepb.DatabaseDialect_GOOGLE_STANDARD_SQL,
		})
		switch {
		case err == nil:
			if _, err := ddl.Wait(ctx); err != nil {
				_ = admin.Close() // best effort: the wait error is the one to report
				return fmt.Errorf("failed to wait database creation: %w", err)
			}
		case spanner.ErrCode(err) != codes.AlreadyExists:
			_ = admin.Close() // best effort: the create error is the one to report
			return fmt.Errorf("failed to create database: %w", err)
		}

		updateReq := &databasepb.UpdateDatabaseDdlRequest{
			Database: Database,
			Statements: []string{
				`CREATE SEQUENCE IF NOT EXISTS test_id OPTIONS (sequence_kind='bit_reversed_positive')`,
				`CREATE TABLE IF NOT EXISTS test_sequence (
                id INT64 NOT NULL DEFAULT (GET_NEXT_SEQUENCE_VALUE(SEQUENCE test_id)),
                value INT64 NOT NULL
            ) PRIMARY KEY(id)`,
			},
		}

		updateDDL, err := admin.UpdateDatabaseDdl(ctx, updateReq)
		if err != nil {
			_ = admin.Close() // best effort: the DDL error is the one to report
			return fmt.Errorf("failed to update database ddl: %w", err)
		}
		if err := updateDDL.Wait(ctx); err != nil {
			_ = admin.Close() // best effort: the wait error is the one to report
			return fmt.Errorf("failed to wait for database ddl: %w", err)
		}

		if err := admin.Close(); err != nil {
			return fmt.Errorf("failed to close database admin: %w", err)
		}
	}

	db, err := openDatabaseConn(ctx, Database)
	if err != nil {
		return fmt.Errorf("failed to open database connection: %w", err)
	}
	defer db.Close()

	numVals := 100000
	numWorkers := 100

	worker := func(ctx context.Context, wg *sync.WaitGroup, valChan <-chan int, errorChan chan<- error) {
		defer wg.Done()
		for val := range valChan {
			// After an interrupt every remaining insert would just fail with
			// context.Canceled; stop instead of draining the queue that way.
			if ctx.Err() != nil {
				return
			}
			if _, err := db.ExecContext(ctx, "INSERT INTO test_sequence (value) VALUES (?)", val); err != nil && !errors.Is(err, context.Canceled) {
				errorChan <- err
			}
		}
	}

	wg := &sync.WaitGroup{}
	wg.Add(numWorkers)

	// valChan holds every value, so the producer loop below never blocks even
	// though errors are only consumed after it finishes.
	valChan := make(chan int, numVals)
	errorChan := make(chan error)

	for i := 0; i < numWorkers; i++ {
		go worker(ctx, wg, valChan, errorChan)
	}

	for i := 0; i < numVals; i++ {
		valChan <- i
	}
	close(valChan)

	// Close errorChan once all workers are done so the range below ends.
	go func() {
		wg.Wait()
		close(errorChan)
	}()

	for err := range errorChan {
		slog.Error("could not insert value", slog.Any("err", err.Error()))
	}

	return nil
}

// openDatabaseConn opens a database/sql handle to the given database path using
// the "spanner" driver (presumably registered by the blank go-sql-spanner
// import). It then pings the database so that an unreachable emulator or a bad
// path is reported here, rather than once per insert. The caller must Close the
// returned *sql.DB. On error the returned handle is nil.
func openDatabaseConn(ctx context.Context, databaseConn string) (*sql.DB, error) {
	db, err := sql.Open("spanner", databaseConn)
	if err != nil {
		return nil, fmt.Errorf("failed to open connection to db: %w", err)
	}
	// sql.Open only validates its arguments; PingContext actually connects.
	if err := db.PingContext(ctx); err != nil {
		_ = db.Close() // best effort: the ping error is the one to report
		return nil, fmt.Errorf("failed to ping db: %w", err)
	}
	return db, nil
}
olavloite commented 1 month ago

A fix for this problem will be included in the next emulator release.

drewthor commented 1 week ago

@olavloite Testing this on the latest emulator (both the Docker image and the latest binary), we are still seeing frequent AlreadyExists errors: err="spanner: code = \"AlreadyExists\", desc = \"Failed to insert row with primary key ({pk#id:5728578726015270912}) due to previously existing row\""

drewthor commented 1 week ago

The AlreadyExists errors get more and more frequent on new database connections.

Here is an improved reproduction. Run it several times (2 or 3 runs should do it); the errors become more and more frequent with each run. Note: testing with more than 2 workers causes the database to lock up.

package main

import (
    "cloud.google.com/go/spanner"
    database "cloud.google.com/go/spanner/admin/database/apiv1"
    "cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
    instance "cloud.google.com/go/spanner/admin/instance/apiv1"
    "cloud.google.com/go/spanner/admin/instance/apiv1/instancepb"
    "context"
    "database/sql"
    "errors"
    "fmt"
    _ "github.com/googleapis/go-sql-spanner"
    "google.golang.org/grpc/codes"
    "log/slog"
    "os"
    "os/signal"
    "sync"
    "time"
)

// main runs the sequence reproduction. If it fails, main prints the error to
// stderr and exits with status 1.
func main() {
	if err := runSequence(); err != nil {
		// Fprintln (not Fprint) so the message is newline-terminated.
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

// runSequence is the improved reproduction. It does the following:
//   - ensures the instance, database and schema exist;
//   - inserts numVals rows from numWorkers goroutines, each insert in its own
//     read/write transaction;
//   - prints every id returned by THEN RETURN and logs every failure.
func runSequence() error {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	const (
		ProjectID, InstanceID, DatabaseName = "your-project-id", "test-instance", "test-db"

		Database = "projects/" + ProjectID + "/instances/" + InstanceID + "/databases/" + DatabaseName
	)

	if err := createInstanceAndDatabase(ctx, ProjectID, InstanceID, DatabaseName); err != nil {
		return fmt.Errorf("failed to create instance and database: %w", err)
	}

	db, err := openDatabaseConnSequence(ctx, Database)
	if err != nil {
		return fmt.Errorf("failed to open database connection: %w", err)
	}
	defer db.Close()

	numVals := 10000
	// NOTE: the issue report says more than 2 workers locks up the emulator.
	numWorkers := 2

	worker := func(ctx context.Context, wg *sync.WaitGroup, valChan <-chan int, errorChan chan<- error) {
		defer wg.Done()
		for val := range valChan {
			// After an interrupt every remaining insert would just fail with
			// context.Canceled; stop instead of draining the queue that way.
			if ctx.Err() != nil {
				return
			}
			id, err := insertSequenceValue(ctx, db, val)
			if err != nil {
				if !errors.Is(err, context.Canceled) {
					errorChan <- err
				}
				continue
			}
			fmt.Println("inserted sequence id:", id)
		}
	}

	wg := &sync.WaitGroup{}
	wg.Add(numWorkers)

	// valChan holds every value, so the producer loop below never blocks even
	// though errors are only consumed after it finishes.
	valChan := make(chan int, numVals)
	errorChan := make(chan error)

	for i := 0; i < numWorkers; i++ {
		go worker(ctx, wg, valChan, errorChan)
	}

	for i := 0; i < numVals; i++ {
		valChan <- i
	}
	close(valChan)

	// Close errorChan once all workers are done so the range below ends.
	go func() {
		wg.Wait()
		close(errorChan)
	}()

	for err := range errorChan {
		slog.Error("could not insert value", slog.Any("err", err.Error()))
	}

	return nil
}

// insertSequenceValue inserts one row into test_sequence inside its own
// transaction, bounded by a 3 second timeout, and returns the id reported by
// THEN RETURN. The id column's default comes from GET_NEXT_SEQUENCE_VALUE.
//
// The transaction is rolled back if the insert fails, and a failed Commit is
// returned as an error. Previously, a canceled insert fell through to Commit
// and was reported as "inserted", and Commit errors were dropped.
func insertSequenceValue(ctx context.Context, db *sql.DB, val int) (int64, error) {
	txCtx, txCancel := context.WithTimeout(ctx, 3*time.Second)
	defer txCancel()

	tx, err := db.BeginTx(txCtx, nil)
	if err != nil {
		return 0, fmt.Errorf("failed to start transaction: %w", err)
	}

	var id int64
	if err := tx.QueryRowContext(txCtx, "INSERT INTO test_sequence (value) VALUES (?) THEN RETURN id", val).Scan(&id); err != nil {
		_ = tx.Rollback() // best effort: the insert error is the one to report
		return 0, err
	}
	if err := tx.Commit(); err != nil {
		return 0, fmt.Errorf("failed to commit transaction: %w", err)
	}
	return id, nil
}

// createInstanceAndDatabase makes sure the emulator instance and database
// exist, then submits the test_sequence schema DDL. AlreadyExists from either
// create call is treated as success so the program can be re-run.
//
// NOTE(review): the schema DDL, including CREATE SEQUENCE IF NOT EXISTS, is
// submitted on every call, even when the database already existed.
func createInstanceAndDatabase(ctx context.Context, projectID, instanceID, databaseName string) error {
	if err := ensureInstanceExists(ctx, projectID, instanceID); err != nil {
		return err
	}
	return ensureDatabaseSchema(ctx, projectID, instanceID, databaseName)
}

// ensureInstanceExists creates the instance unless it already exists.
// On failure paths it closes the admin client and ignores the close error;
// on success it returns the close error.
func ensureInstanceExists(ctx context.Context, projectID, instanceID string) error {
	admin, err := instance.NewInstanceAdminClient(ctx)
	if err != nil {
		return fmt.Errorf("failed to create instance admin: %w", err)
	}

	op, createErr := admin.CreateInstance(ctx, &instancepb.CreateInstanceRequest{
		Parent:     "projects/" + projectID,
		InstanceId: instanceID,
	})
	alreadyThere := createErr != nil && spanner.ErrCode(createErr) == codes.AlreadyExists
	if createErr != nil && !alreadyThere {
		admin.Close()
		return fmt.Errorf("create instance failed: %w", createErr)
	}
	if !alreadyThere {
		if _, waitErr := op.Wait(ctx); waitErr != nil {
			admin.Close()
			return fmt.Errorf("failed to wait instance creation: %w", waitErr)
		}
	}

	if closeErr := admin.Close(); closeErr != nil {
		return fmt.Errorf("failed to close instance admin: %w", closeErr)
	}
	return nil
}

// ensureDatabaseSchema creates the database unless it already exists, then
// applies the sequence and table DDL. Admin-client cleanup follows the same
// rules as ensureInstanceExists.
func ensureDatabaseSchema(ctx context.Context, projectID, instanceID, databaseName string) error {
	admin, err := database.NewDatabaseAdminClient(ctx)
	if err != nil {
		return fmt.Errorf("failed to create database admin: %w", err)
	}

	// fail closes the admin client (ignoring its error) and passes err through.
	fail := func(err error) error {
		admin.Close()
		return err
	}

	instancePath := "projects/" + projectID + "/instances/" + instanceID

	createOp, err := admin.CreateDatabase(ctx, &databasepb.CreateDatabaseRequest{
		Parent:          instancePath,
		CreateStatement: "CREATE DATABASE " + "`" + databaseName + "`",
		ExtraStatements: []string{},
		DatabaseDialect: databasepb.DatabaseDialect_GOOGLE_STANDARD_SQL,
	})
	switch {
	case err == nil:
		if _, err := createOp.Wait(ctx); err != nil {
			return fail(fmt.Errorf("failed to wait database creation: %w", err))
		}
	case spanner.ErrCode(err) != codes.AlreadyExists:
		return fail(fmt.Errorf("failed to create database: %w", err))
	}

	updateOp, err := admin.UpdateDatabaseDdl(ctx, &databasepb.UpdateDatabaseDdlRequest{
		Database: instancePath + "/databases/" + databaseName,
		Statements: []string{
			`CREATE SEQUENCE IF NOT EXISTS test_id OPTIONS (sequence_kind='bit_reversed_positive')`,
			`CREATE TABLE IF NOT EXISTS test_sequence (
                id INT64 NOT NULL DEFAULT (GET_NEXT_SEQUENCE_VALUE(SEQUENCE test_id)),
                value INT64 NOT NULL
            ) PRIMARY KEY(id)`,
		},
	})
	if err != nil {
		return fail(fmt.Errorf("failed to update database ddl: %w", err))
	}
	if err := updateOp.Wait(ctx); err != nil {
		return fail(fmt.Errorf("failed to wait for database ddl: %w", err))
	}

	if err := admin.Close(); err != nil {
		return fmt.Errorf("failed to close database admin: %w", err)
	}
	return nil
}

// openDatabaseConnSequence opens a database/sql handle to the given database
// path using the "spanner" driver (presumably registered by the blank
// go-sql-spanner import). It then pings the database so that an unreachable
// emulator or a bad path is reported here, rather than once per insert. The
// caller must Close the returned *sql.DB. On error the returned handle is nil.
func openDatabaseConnSequence(ctx context.Context, databaseConn string) (*sql.DB, error) {
	db, err := sql.Open("spanner", databaseConn)
	if err != nil {
		return nil, fmt.Errorf("failed to open connection to db: %w", err)
	}
	// sql.Open only validates its arguments; PingContext actually connects.
	if err := db.PingContext(ctx); err != nil {
		_ = db.Close() // best effort: the ping error is the one to report
		return nil, fmt.Errorf("failed to ping db: %w", err)
	}
	return db, nil
}
hengfengli commented 1 week ago

Hi @drewthor, thanks a lot for providing the reproduction code. With it, we have identified the root cause, and this is not a concurrency issue. There is a bug where, if the sequence test_id already exists, calling CREATE SEQUENCE IF NOT EXISTS test_id OPTIONS (sequence_kind='bit_reversed_positive') deletes the recorded last value (the state) of the sequence test_id. Therefore, if you do:

// Update DDL (reset the last value to the default value)
CREATE SEQUENCE IF NOT EXISTS test_id OPTIONS (sequence_kind='bit_reversed_positive')

// Inserts
insert data

Every time you re-run the program, it resets the recorded value to 1 and then starts inserting data. If you avoid updating the DDL on every run (calling CREATE SEQUENCE only once), you should avoid the above error. In the meantime, we are working on a fix for this issue.