marcboeker / go-duckdb

go-duckdb provides a database/sql driver for the DuckDB database engine.
MIT License
705 stars 105 forks source link

Queries fail w/ "context cancelled" if more than 1 go-routine are issueing queries to duckdb. [v1.5.3] #132

Closed msf closed 10 months ago

msf commented 11 months ago

First off, thank you for building this, enjoying playing with it..

I have two go routines separately submitting queries using the Query() API and very frequently but not always I hit this case where one of the two returns "context cancelled" after a few microseconds (example: elapsed=184.941µ). This even happens after wrapping all calls to conn.Query behind a lock:

mu.Lock()
res, err := db.Query(stmt)
mu.Unlock()

I'm using access_mode=READ_WRITE, version: v1.5.3

This is running in a multicore linux server, Go v1.21, Ubuntu 22.04.

Thanks in advance!

marcboeker commented 11 months ago

Hi @msf, could you please provide a complete test case, so that I can reproduce the issue. Thanks!

msf commented 11 months ago

This code below reproduces the bug.. (also available here: https://github.com/msf/go-duckdb/pull/1) It is possible that I'm just not understanding how I should use the library, but I definitely don't understand why I get these queries erroring w/ context cancelled... but only under concurrent workloads..

package main

import (
    "context"
    "database/sql"
    "database/sql/driver"
    "fmt"
    "log"
    "math/rand"
    "path/filepath"
    "strings"
    "testing"
    "time"

    "github.com/go-errors/errors"
    "github.com/marcboeker/go-duckdb"
    "golang.org/x/exp/slog"
)

type (
    Result struct {
        Columns     []string          `json:"columns,omitempty"`
        ColumnTypes []*sql.ColumnType `json:"column_types,omitempty"`
        Rows        []interface{}     `json:"rows,omitempty"`
        NextOffset  int               `json:"next_offset,omitempty"`
    }
)

type newdb struct {
    db  *sql.DB
    cfg DuckConfig
}

func (ndb *newdb) Close() error { return ndb.db.Close() }

type DuckConfig struct {
    Filename        string
    ParallelQueries bool
}

func NewDuckDB(cfg *DuckConfig) (*newdb, error) {
    ctx := context.Background()
    ctxConn, cancelConn := context.WithTimeout(ctx, 1*time.Second)
    defer cancelConn()
    absolutePath, err := filepath.Abs(cfg.Filename)
    if err != nil {
        return nil, err
    }
    connStr := fmt.Sprintf("%v?access_mode=READ_WRITE", absolutePath)
    connector, err := duckdb.NewConnector(connStr, func(execer driver.ExecerContext) error {
        bootQueries := []string{
            "INSTALL 'httpfs'",
            "LOAD 'httpfs'",
        }
        for _, qry := range bootQueries {
            _, err := execer.ExecContext(ctxConn, qry, nil)
            if err != nil {
                return err
            }
        }
        return nil
    })
    if err != nil {
        return nil, err
    }

    db := sql.OpenDB(connector)

    err = db.PingContext(ctxConn)
    if err != nil {
        return nil, err
    }

    setting := db.QueryRow("SELECT current_setting('access_mode')")
    var am string
    err = setting.Scan(&am)
    if err != nil {
        return nil, err
    }
    slog.Info("NewDB",
        "access_mode", am,
        "cfg", fmt.Sprintf("%+v", *cfg),
    )

    ndb := &newdb{
        db:  db,
        cfg: *cfg,
    }
    return ndb, err
}

func ConfigForTests() *DuckConfig {
    cfg := &DuckConfig{
        Filename: fmt.Sprintf("duckdb%v.db", rand.Int()),
    }
    return cfg
}

func (ndb *newdb) Query(ctx context.Context, sql string, res *Result) error {
    rows, err := ndb.db.QueryContext(ctx, sql)
    if err != nil {
        err = errors.Errorf("failed on Query(%v), err:%w", sql, err)
        return err
    }
    defer rows.Close()
    res.Columns, err = rows.Columns()
    if err != nil {
        return err
    }
    res.ColumnTypes, err = rows.ColumnTypes()
    if err != nil {
        return err
    }
    for rows.Next() {
        rowPointers := make([]interface{}, len(res.ColumnTypes))
        for i := range rowPointers {
            rowPointers[i] = new(interface{})
        }
        err = rows.Scan(rowPointers...)
        if err != nil {
            return err
        }
        res.Rows = append(res.Rows, rowPointers)
    }
    if err = rows.Err(); err != nil {
        return err
    }
    return err
}

func reproBug(b *testing.B, ndb *newdb) {
    slog.Info("starting test..")

    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            ctx := context.TODO()
            res := &Result{
                Rows: make([]interface{}, 0),
            }
            var err error
            stmt := `SELECT * FROM lineitem LIMIT 100 offset 100;`
            err = ndb.Query(ctx, stmt, res)
            if err != nil {
                bugRepro := strings.Contains(err.Error(), "context canceled")
                slog.Error("REPRODUCED BUG?", "error", err, "bugRepro", bugRepro)
                if bugRepro {
                    b.FailNow()
                } else {
                    b.Errorf("test failed w/ unexpected error: %w", err)
                }
            }
        }
    })
    b.StopTimer()
    slog.Info("testing finished")
    b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "rps")
}

func setupDuckDB() *newdb {
    slog.Info("connecting to DB")
    ndb, err := NewDuckDB(ConfigForTests())
    if err != nil {
        log.Fatal(err)
    }

    slog.Info("creating tables")
    _, err = ndb.db.Exec(`CREATE TABLE IF NOT EXISTS lineitem AS SELECT * FROM 'https://shell.duckdb.org/data/tpch/0_01/parquet/lineitem.parquet' LIMIT 10000;`)
    if err != nil {
        log.Fatal(err)
    }
    return ndb
}

func main() {
    ndb := setupDuckDB()
    defer ndb.Close()
    time.Sleep(400 * time.Millisecond)
    testing.Benchmark(func(b *testing.B) {
        reproBug(b, ndb)
    })

}

go run repro.go should reproduce the problem. this is the go.mod:

module repro-duckdb-bug

go 1.21.4

require (
    github.com/go-errors/errors v1.5.1
    github.com/marcboeker/go-duckdb v1.5.4
    golang.org/x/exp v0.0.0-20231127185646-65229373498e
)

require github.com/mitchellh/mapstructure v1.5.0 // indirect
begelundmuller commented 10 months ago

@msf When you make concurrent queries, Go's database/sql package lazily initializes new connections for the pool it manages under the hood. It will call the callback you pass to duckdb.NewConnector when lazily initializing a new connection.

You're using the ctxConn created by NewDuckDB inside that callback. By the time the second concurrent query comes around, the ctxConn will have been cancelled, so the ExecContext inside the callback fails.

Try initializing a new context with timeout inside the callback (or just use context.Background()).

msf commented 10 months ago

thanks a lot @begelundmuller , this was my mistake, closing the issue :-)