cockroachdb / cockroach

CockroachDB — the cloud native, distributed SQL database designed for high availability, effortless scale, and control over data placement.
https://www.cockroachlabs.com

`select for update` with `skip locked` is slow #97135

Open pcsegal opened 1 year ago

pcsegal commented 1 year ago

Describe the problem

`select for update` with `skip locked` is slow when many queries run concurrently.

To Reproduce

To make the issue easier to reproduce, I am posting a Go example.

In this example, I am trying to model the following situation: there is a set of items, and a set of workers. Each worker can claim an item, and each item can be claimed by only one worker at a time. Once a worker claims an item, the claim expires after a fixed amount of time (e.g., 5 minutes). After the claim for a given item expires, other workers can claim that item.

The `items` table models the items. To claim an item, each worker runs a `select for update ... skip locked` query followed by an `update` query, both in one transaction.

The sample code is below. It creates 1000 items and spins up 500 workers.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/jackc/pgx/v4/pgxpool"
)

func createTable(ctx context.Context, dbpool *pgxpool.Pool) error {
    createTableStmt := `
        create table if not exists items (
            item_id int64 primary key,
            worker_id int64 null,
            expires_at timestamp not null default timestamp 'epoch'
        );

        create index on items ("expires_at");
    `
    if _, err := dbpool.Exec(ctx, createTableStmt); err != nil {
        return fmt.Errorf("create tables: %w", err)
    }
    return nil
}

func insertItems(ctx context.Context, dbpool *pgxpool.Pool) error {
    _, err := dbpool.Exec(ctx, `delete from items`)
    if err != nil {
        return err
    }

    // Insert rows into items table.
    c := 1000
    var wg sync.WaitGroup
    wg.Add(c)
    for i := 0; i < c; i++ {
        i := i
        go func() {
            defer wg.Done()
            stmt := `insert into items(item_id) values ($1)`
            _, err := dbpool.Exec(ctx, stmt, i)
            if err != nil {
                panic(err)
            }
        }()
    }
    wg.Wait()
    return nil
}

func main() {
    pgURL := "postgresql://root@0.0.0.0:26257/testdb?sslmode=disable&pool_max_conns=200"
    var dbpool *pgxpool.Pool
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    pgPoolConfig, err := pgxpool.ParseConfig(pgURL)
    if err != nil {
        panic(err)
    }
    pgPoolConfig.MaxConns = int32(200)

    dbpool, err = pgxpool.ConnectConfig(ctx, pgPoolConfig)
    if err != nil {
        panic(err)
    }

    if err := createTable(ctx, dbpool); err != nil {
        panic(err)
    }

    // Clear items table and insert items.
    start := time.Now()
    if err := insertItems(ctx, dbpool); err != nil {
        panic(err)
    }
    fmt.Println("Inserted items in", time.Since(start))

    start = time.Now()

    var wg sync.WaitGroup

    // Launch concurrent workers. Each worker will claim one item.
    numWorkers := 500
    wg.Add(numWorkers)
    for i := 0; i < numWorkers; i++ {
        i := i
        go func() {
            defer wg.Done()
            tx, err := dbpool.Begin(ctx)
            if err != nil {
                panic(err)
            }
            defer tx.Rollback(ctx)

            var tid int64
            var t time.Time

            if err := tx.QueryRow(ctx, `
            select
                item_id, expires_at
            from items
            where expires_at < $1
            order by expires_at asc
            limit 1
            for update
            skip locked`, time.Now().UTC()).Scan(&tid, &t); err != nil {
                panic(err)
            }

            _, err = tx.Exec(ctx, `update items set worker_id=$1, expires_at=$2 where item_id=$3`, i, time.Now().UTC().Add(5*time.Minute), tid)
            if err != nil {
                panic(err)
            }

            if err := tx.Commit(ctx); err != nil {
                panic(err)
            }
        }()
    }
    wg.Wait()
    fmt.Println("Workers took", time.Since(start))
}

Observed behavior:

Running the above example on my local machine produced the following output:

Inserted items in 277.690375ms
Workers took 2.082947129s

Additionally, the CockroachDB WebUI's "SQL Activity" tab shows the following:

Expected behavior:

I would have expected a lower mean latency, on the order of tens of milliseconds rather than hundreds of milliseconds.

Environment:

Jira issue: CRDB-24536

blathers-crl[bot] commented 1 year ago

Hello, I am Blathers. I am here to help you get the issue triaged.

Hoot - a bug! Though bugs are the bane of my existence, rest assured the wretched thing will get the best of care here.


:owl: Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

rytaft commented 1 year ago

Hi @pcsegal, thanks for the detailed reproduction steps! I can easily reproduce the issue you're describing.

I'm going to pass this over to the kv team, since looking at the trace for one of the queries taking over 300ms, it seems like all the time is spent "waiting to acquire write latch":

[Image: Screenshot 2023-02-22 at 7 16 20 PM]

Here is the statement bundle I collected on my machine, in case it's helpful: stmt-bundle-842282716587982849.zip

yuzefovich commented 5 months ago

cc @michae2 in case this has changed with the read committed work