marcboeker / go-duckdb

go-duckdb provides a database/sql driver for the DuckDB database engine.
MIT License
646 stars 97 forks source link

interrupt query on ctx cancel/timeout #143

Closed k-anshul closed 8 months ago

k-anshul commented 8 months ago

It seems we need to interrupt the query as well when context is cancelled or timed out to properly stop the query. Ref : https://discord.com/channels/909674491309850675/921073327009853451/1087993635997491241

k-anshul commented 8 months ago

Consider the below code snippet where I only complete a single task for a pending result and than destroy pending to mock query cancellation i.e. call the same set of APIs that gets called in the driver during query cancellation.

#include "duckdb.h"
#include "stdio.h"
#include "string.h"

void REQUIRE(bool input) {
    if (!input) {
        printf("FAILED\n");
        exit(1);
    }
}

int main() {
    const char *file_path = "db_test.db";
    remove(file_path);
    duckdb_database database;
    duckdb_connection connection;
    REQUIRE(duckdb_open_ext(file_path, &database, NULL, NULL) == DuckDBSuccess);
    REQUIRE(duckdb_connect(database, &connection) == DuckDBSuccess);

    duckdb_prepared_statement prepared;
    REQUIRE(duckdb_prepare(connection, "CREATE OR REPLACE TABLE test AS SELECT * FROM read_parquet('~/large_data/data/**/*.parquet')", &prepared) == DuckDBSuccess);

    duckdb_pending_result pendingRes;
    REQUIRE(duckdb_pending_prepared(prepared, &pendingRes) == DuckDBSuccess);

    duckdb_pending_state state = duckdb_pending_execute_task(pendingRes);
    REQUIRE(state != DUCKDB_PENDING_ERROR);

    // duckdb_interrupt(connection);

    printf("duckdb_destroy_pending\n");
    duckdb_destroy_pending(&pendingRes);

    printf("duckdb_destroy_prepare\n");
    duckdb_destroy_prepare(&prepared);

    printf("duckdb_disconnect\n");
    duckdb_disconnect(&connection);

    printf("duckdb_close\n");
    duckdb_close(&database);
}

I am seeing that duckdb_disconnect only returns when duckDB has processed entire query(looking at the size of the db file and the time it takes to return). If I uncomment duckdb_interrupt(connection); the behaviour is as expected.

marcboeker commented 8 months ago

@k-anshul Great find, thanks for the PR!

marcboeker commented 8 months ago

@k-anshul Hm, tests passed on Github Actions but on my Mac the TestQueryTimeout indefinitely hangs. It seems the -race -count=1 introduces the problem. I'll try to debug this, maybe you have an idea?

Update: Has nothing to do with -race.

marcboeker commented 8 months ago

@k-anshul It's getting weirder:

func main() {
    var err error
    db, err = sql.Open("duckdb", "?access_mode=READ_WRITE")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*250)
    defer cancel()

    _, err = db.ExecContext(ctx, "CREATE TABLE test AS SELECT * FROM range(10000000) t1, range(1000000) t2;")
}

Never finishes. But adding a time.Sleep(time.Millisecond) to the for loop, fixes the problem.

for {
    select {
    // if context is cancelled or deadline exceeded, don't execute further
    case <-ctx.Done():
        fmt.Println("context cancelled", ctx.Err())
        // also need to interrupt to cancel the query
        C.duckdb_interrupt(*s.c.con)
        return nil, ctx.Err()
    default:
        // continue
        time.Sleep(time.Millisecond)
    }

    state := C.duckdb_pending_execute_task(pendingRes)
    if state == C.DUCKDB_PENDING_ERROR {
        dbErr := C.GoString(C.duckdb_pending_error(pendingRes))
        return nil, errors.New(dbErr)
    }
    if C.duckdb_pending_execution_is_finished(state) {
        break
    }
}

Can you reproduce the problem?

marcboeker commented 8 months ago

@k-anshul It seems that the for loop, waiting for either a context cancel or duckdb_pending_execution_is_finished polls the duckdb_pending_execute_task too often, which then starts to block. The sleep prevents the for loop from looping too often and reduces the CPU load, which then gives the duckdb_pending_execute_task enough time to finish.

I'm not sure yet if a time.Sleep(time.Millisecond) is the proper approach to solve this.

k-anshul commented 8 months ago

Hey @marcboeker

I added some logs to debug this further:

    iter := 0
    t := time.Now()
    for ; ; iter++ {
        select {
        // if context is cancelled or deadline exceeded, don't execute further
        case <-ctx.Done():
            fmt.Printf("Total iterations %v, last iteration at %v, now %v\n", iter, t.Unix(), time.Now().Unix())
            // also need to interrupt to cancel the query
            C.duckdb_interrupt(*s.c.con)
            return nil, ctx.Err()
        default:
            t = time.Now()
            // continue
        }
        state := C.duckdb_pending_execute_task(pendingRes)
        if state == C.DUCKDB_PENDING_ERROR {
            dbErr := C.GoString(C.duckdb_pending_error(pendingRes))
            return nil, errors.New(dbErr)
        }
        if C.duckdb_pending_execution_is_finished(state) {
            break
        }
    }

I see that when it takes very long for this query to get cancelled, duckDB runs very few tasks with last task taking much longer to run: Total iterations 2, last iteration at 1705052041, now 1705052054 whereas when it gets killed fast it is running very high number of tasks Total iterations 1207559, last iteration at 1705052160, now 1705052160

Ideally duckDB need to return fast within tasks but not sure why it isn't happening here.

Another approach we can take here is to wait on ctx in a separate goroutine and call duckdb_interrupt from that (which is also thread safe).

        done := make(chan bool)
    defer close(done)

    now := time.Now()
    go func() {
        select {
        case <-ctx.Done():
            // also need to interrupt to cancel the query
            C.duckdb_interrupt(*s.c.con)
            return
        case <-done:
            return
        }
    }()

    var res C.duckdb_result
    if state := C.duckdb_execute_pending(pendingRes, &res); state == C.DuckDBError {
        if ctx.Err() != nil {
            fmt.Printf("interrupted in %v milliseconds\n", time.Since(now).Milliseconds())
            return nil, ctx.Err()
        }

        dbErr := C.GoString(C.duckdb_result_error(&res))
        C.duckdb_destroy_result(&res)
        return nil, errors.New(dbErr)
    }
    return &res, nil

This avoids the for loop problem but has an additional overhead of a separate goroutine which also calls a C API. Based on my limited testing this seems to return in expected cancellation time as compared to first approach. What do you think about this approach ? I am also discussing the possible solutions for query cancellations with duckDB team.

marcboeker commented 8 months ago

Hi @k-anshul thanks for your suggestion with the goroutine. Without the for-loop we're missing the check for any pending tasks. Or have I missed something?

state := C.duckdb_pending_execute_task(pendingRes)
if state == C.DUCKDB_PENDING_ERROR {
    dbErr := C.GoString(C.duckdb_pending_error(pendingRes))
    return nil, errors.New(dbErr)
}
if C.duckdb_pending_execution_is_finished(state) {
    break
}
k-anshul commented 8 months ago

Hey @marcboeker

So as I understand we can either call duckdb_pending_execute_task which executes one task under query and then call duckdb_pending_execution_is_finished to check if there are more tasks pending or duckdb_execute_pending which fully executes the pending query.

marcboeker commented 8 months ago

@k-anshul Ah okay, so we're switching from executing the query step by step in little tasks to executing it at once using duckdb_execute_pending?

Which means that getting rid of the incremental execution should also speed things up?

k-anshul commented 8 months ago

@k-anshul Ah okay, so we're switching from executing the query step by step in little tasks to executing it at once using duckdb_execute_pending?

Yes @marcboeker.

Which means that getting rid of the incremental execution should also speed things up?

It avoids looping and continuously checking the status via C APIs but adds an an overhead of making a CGO call in a separate goroutine which can also spawn more thread(although as I understand duckdb_interrupt should return fast since it just sets a flag atomically) so both has pros and cons. But yeah overall feels this is a much better approach.

marcboeker commented 8 months ago

@k-anshul Okay great, I've changed it in main and will make a release shortly.

Thanks for debugging this!

k-anshul commented 8 months ago

@k-anshul Okay great, I've changed it in main and will make a release shortly.

Thanks for debugging this!

Should we also wait for duckDB team's inputs in case we are missing something here ?

marcboeker commented 8 months ago

@k-anshul Yeah, probably we should do this 🙂

Hi @taniabogatsch could you please do us a favour and check this your colleagues if our approach on only using duckdb_execute_pending instead of running and checking the result of duckdb_pending_execute_task in a loop is valid? Thank you 🙂

k-anshul commented 8 months ago

Hey @marcboeker

We got the following reply from duckDB team :

When you open a pending query result, background threads will already start working on the actual query. The task execution of the pending query result through duckdb_pending_execute_task only concerns the main thread - the background threads will work on the query continuously. duckdb_interrupt actually interrupts a query and cancels work done by all background threads. When you close a database, the system waits for all background threads to complete before shutting down.

I do agree that it would make sense for duckdb_destroy_pending and duckdb_disconnect to call duckdb_interrupt internally to prevent having to wait for a query whose result will not be used anyway, however. Currently that does not happen and the user needs to call duckdb_interrupt manually.

Based on this input I think its better to call duckdb_interrupt from a background go-routine. I am also testing the changes within our application. I will update you here.

Thanks

marcboeker commented 8 months ago

@k-anshul Thanks for the update!