marcboeker / go-duckdb

go-duckdb provides a database/sql driver for the DuckDB database engine.
MIT License

How to manually commit the contents of the WAL file to the main database file before the database is closed? #179

Open coloraven opened 6 months ago

coloraven commented 6 months ago

How can I manually commit the contents of the WAL file to the main database file, before the database is closed, while using the Appender to insert data in parallel?

package main

import (
    "context"
    "database/sql/driver"
    "fmt"
    "sync"
    "time"

    "github.com/marcboeker/go-duckdb"
)

func main() {
    connector, err := duckdb.NewConnector("sgk.db", func(execer driver.ExecerContext) error {
        bootQueries := []string{
            "CREATE TABLE IF NOT EXISTS info (ids int,name varchar)",
        }

        for _, query := range bootQueries {
            // Use ExecContext instead of Exec
            _, err := execer.ExecContext(context.Background(), query, nil)
            if err != nil {
                return err
            }
        }
        return nil
    })
    if err != nil {
        fmt.Println(err)
    }
    // defer connector.Close()

    conn, err := connector.Connect(context.Background())
    if err != nil {
        fmt.Println(err)
    }
    // defer conn.Close()
    var wg sync.WaitGroup
    for i := int32(0); i < 10000; i++ {
        wg.Add(1)
        go func(i int32) {
            defer wg.Done()
            appender, err := duckdb.NewAppenderFromConn(conn, "", "info")
            if err != nil {
                fmt.Println(err)
                return // avoid a nil-pointer dereference on appender below
            }
            defer appender.Close()
            if err := appender.AppendRow(i, "d"); err != nil {
                fmt.Println(err)
            }
        }(i)
    }
    wg.Wait()
    fmt.Println("pause 3s then close conn")
    time.Sleep(3 * time.Second)
    conn.Close()
    fmt.Println("pause 3s then close connector")
    time.Sleep(3 * time.Second)
    connector.Close() // Only now are the contents of the WAL file committed to the main database file
    fmt.Println("pause 5s then exit program")
    time.Sleep(5 * time.Second)
}
marcboeker commented 6 months ago

You do not need to create an appender for every row you want to insert. Please create the appender once, beforehand, and use it to append all rows.

appender, err := duckdb.NewAppenderFromConn(conn, "", "info")
if err != nil {
    fmt.Println(err)
}
defer appender.Close()

var wg sync.WaitGroup
for i := int32(0); i < 10000; i++ {
    wg.Add(1)
    go func(i int32) {
        defer wg.Done()   
        if err := appender.AppendRow(i, "d"); err != nil {
            fmt.Println(err)
        }
    }(i)
}
wg.Wait()
coloraven commented 6 months ago

Following your suggestion to create the appender beforehand and share it across all goroutines, as in the snippet above, the program panics with:

panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xc0000005 code=0x0 addr=0x0 pc=0x7ff785d49ff4]

goroutine 7 [running]:
github.com/marcboeker/go-duckdb.setPrimitive[...](0x38?, 0x0?, 0x0)
        C:/Users/white/go/pkg/mod/github.com/marcboeker/go-duckdb@v1.6.1/appender.go:452 +0x34
github.com/marcboeker/go-duckdb.initPrimitive[...].func1(0x7ff785e1b1c0?, 0x7ff785d7e560?, {0x7ff785d7e560?, 0x7ff785d58aa0?})
        C:/Users/white/go/pkg/mod/github.com/marcboeker/go-duckdb@v1.6.1/appender.go:252 +0x2f
github.com/marcboeker/go-duckdb.(*Appender).appendRowArray(0xc00013e090, {0xc000071f70?, 0x2, 0x0?})
        C:/Users/white/go/pkg/mod/github.com/marcboeker/go-duckdb@v1.6.1/appender.go:567 +0x1b2
github.com/marcboeker/go-duckdb.(*Appender).AppendRow(0xc00013e090, {0xc000071f70, 0x2, 0x2})
        C:/Users/white/go/pkg/mod/github.com/marcboeker/go-duckdb@v1.6.1/appender.go:209 +0xd5
main.main.func2(0x0?)
        C:/Users/white/Desktop/go-duckdb/main.go:50 +0xa7
created by main.main in goroutine 1
        C:/Users/white/Desktop/go-duckdb/main.go:48 +0x1c9

Thanks for your reply!

coloraven commented 6 months ago

My real question is how to commit the contents of the WAL file to the main database file before closing the database connection. When inserting a large amount of data (a 10+ GB CSV) into DuckDB, I want to commit the WAL contents to the main database file each time a certain amount of data has been inserted, rather than all at once at the end, so that data is not lost if the program exits unexpectedly.

marcboeker commented 6 months ago

What happens when you call appender.Flush()? Does that solve your problem? Normally, though, the appended rows should be flushed to disk automatically.

coloraven commented 6 months ago


Unfortunately, neither appender.Close() nor appender.Flush() commits the data from the WAL file to the main database file; I also tried appender.Flush() on its own, and it had no visible effect. To clarify, my concern is moving data from the WAL file into the main database file, not from memory to disk.

michaelmdresser commented 6 months ago

@coloraven try CHECKPOINT; https://duckdb.org/docs/sql/statements/checkpoint.html

coloraven commented 5 months ago


It has no effect; you can test it yourself:

package main

import (
    "context"
    "database/sql"
    "database/sql/driver"
    "fmt"
    "sync"
    "time"

    "github.com/marcboeker/go-duckdb"
)

func main() {
    connector, err := duckdb.NewConnector("test.db", func(execer driver.ExecerContext) error {
        bootQueries := []string{
            "CREATE TABLE IF NOT EXISTS info (ids int,name varchar)",
        }

        for _, query := range bootQueries {
            // Use ExecContext instead of Exec
            _, err := execer.ExecContext(context.Background(), query, nil)
            if err != nil {
                return err
            }
        }
        return nil
    })
    if err != nil {
        fmt.Println(err)
    }
    // defer connector.Close()

    conn, err := connector.Connect(context.Background())
    if err != nil {
        fmt.Println(err)
    }
    // defer conn.Close()
    var wg sync.WaitGroup
    for i := int32(0); i < 10000; i++ {
        wg.Add(1)
        go func(i int32) {
            defer wg.Done()
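            appender, err := duckdb.NewAppenderFromConn(conn, "", "info")
            if err != nil {
                fmt.Println(err)
                return // avoid a nil-pointer dereference on appender below
            }
            defer appender.Close()
            if err := appender.AppendRow(i, "d"); err != nil {
                fmt.Println(err)
            }
            if err := appender.Flush(); err != nil {
                fmt.Println(err)
            }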
        }(i)
    }
    db := sql.OpenDB(connector)
    defer db.Close()
    fmt.Println("\n[Please observe the changes in the WAL file and database file before and after.]\n")
    fmt.Println("Pausing for 3 seconds, Then exec CHECKPOINT.\n")
    if _, err := db.Exec("CHECKPOINT"); err != nil {
        fmt.Println(err)
    }
    wg.Wait()
    fmt.Println("Pausing for 3 seconds, Then close conn\n")
    time.Sleep(3 * time.Second)
    conn.Close()
    fmt.Println("Pausing for 3 seconds, Then close connector\n")
    time.Sleep(3 * time.Second)
    connector.Close() // Only now are the contents of the WAL file committed to the main database file
    fmt.Println("Pausing for 5 seconds, Then exit program\n")
    time.Sleep(5 * time.Second)
}

I also tested it in the CLI, where it does take effect:

CREATE TABLE IF NOT EXISTS info (
  ids INT,
  name VARCHAR
);

INSERT INTO info (ids, name)
SELECT
  gs,
  'Name_' || CAST(gs AS VARCHAR)
FROM
  generate_series(1, 10000) AS t(gs);
-- now the wal file size : 137kb
-- and the main db file size: 12kb
-- then we exec checkpoint
checkpoint;
-- after that 
-- the wal file size : 0kb
-- and the main db file size: 524kb

But if I change the INSERT statement to:

INSERT INTO info (ids, name)
SELECT
  gs,
  'Name_' || CAST(gs AS VARCHAR)
FROM
  generate_series(1, 100000000) AS t(gs);

then there is no need to execute CHECKPOINT; DuckDB syncs the data from the WAL to the database file by itself.

taniabogatsch commented 5 months ago


When inserting a lot of data, DuckDB triggers AutomaticCheckpoint (there is a threshold on how much we write to the WAL). So even though you're not calling CHECKPOINT explicitly in your last example, a CHECKPOINT still happens, and that is why the WAL shrinks: we only truncate the WAL after a successful checkpoint.

Your scenario suggests that we're not checking this automatic checkpoint threshold when using the Appender. Can you reproduce this in C with the C API? If so, you can file a reproduction issue/bug report directly with DuckDB. My hunch is that this issue is not caused by go-duckdb. This example probably works in go-duckdb if you iteratively run INSERT INTO without the appender, since that mimics the CLI behavior.