ClickHouse / clickhouse-go

Golang driver for ClickHouse
Apache License 2.0
2.87k stars 550 forks source link

There is no way to cancel a running query using the context's cancel function. #1388

Open tinybit opened 3 weeks ago

tinybit commented 3 weeks ago

Observed

Suppose we have a query that takes more than 10s to run.

Example 1: the query only times out after 10s, even though cancel() is called after 5s.

// Example 1: the context has a 10s deadline, and cancel() is additionally
// called after 5s. The issue expects Exec to return right after cancel().
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

go func() {
	// ...
	err = conn.Exec(ctx, query)
	// ...
}() // the function literal must be called for `go` to start it

go func() {
	time.Sleep(5 * time.Second)
	cancel()
}()

Example 2: the query only times out after 5m, even though cancel() is called after 5s.

// Example 2: the context has no deadline; cancel() is called after 5s.
// The issue expects Exec to return right after cancel().
ctx, cancel := context.WithCancel(context.Background())

go func() {
	// ...
	err = conn.Exec(ctx, query)
	// ...
}() // the function literal must be called for `go` to start it

go func() {
	time.Sleep(5 * time.Second)
	cancel()
}()

The full code example is below. It outputs:

2024/08/29 15:51:19 Connected.
2024/08/29 15:51:26 Running heavy query...
2024/08/29 15:51:27 Context cancelled after 1s.
2024/08/29 15:51:32 Query took: 6.051290167s
2024/08/29 15:51:32 Done.

Note that query execution takes 6+ seconds, when it should take about 1s given the context cancellation request.

Expected behaviour

Query execution should stop immediately after the context is cancelled.

Results after applying the patch. Note that the query now takes 1.001692375s, i.e. it returns right after the 1s cancellation:

2024/08/29 15:58:16 Connected.
2024/08/29 15:58:22 Running heavy query...
2024/08/29 15:58:23 Context cancelled after 1s.
2024/08/29 15:58:23 Finished with error: context canceled
2024/08/29 15:58:23 Query took: 1.001692375s
2024/08/29 15:58:23 Done.

Code example

package main

import (
    "context"
    "crypto/tls"
    "log"
    "time"

    "github.com/ClickHouse/clickhouse-go/v2"
)

// Setup statements executed in order by main before the cancellation test.
var (
	// q1 creates the scratch database used by the reproducer.
	q1 string = `CREATE DATABASE IF NOT EXISTS test_query_cancellation`

	// q2 drops any trips table left over from a previous run.
	q2 string = `DROP TABLE IF EXISTS test_query_cancellation.trips`

	// q3 creates the MergeTree trips table.
	q3 string = `CREATE TABLE test_query_cancellation.trips (
        trip_id             UInt32,
        pickup_datetime     DateTime,
        dropoff_datetime    DateTime,
        pickup_longitude    Nullable(Float64),
        pickup_latitude     Nullable(Float64),
        dropoff_longitude   Nullable(Float64),
        dropoff_latitude    Nullable(Float64),
        passenger_count     UInt8,
        trip_distance       Float32,
        fare_amount         Float32,
        extra               Float32,
        tip_amount          Float32,
        tolls_amount        Float32,
        total_amount        Float32,
        payment_type        Enum('CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4, 'UNK' = 5),
        pickup_ntaname      LowCardinality(String),
        dropoff_ntaname     LowCardinality(String)
    )
    ENGINE = MergeTree
    PRIMARY KEY (pickup_datetime, dropoff_datetime);`

	// q4 loads the table from the public NYC taxi dataset via the s3 table
	// function, so the later OPTIMIZE ... FINAL has data to work on.
	q4 string = `INSERT INTO test_query_cancellation.trips
    SELECT
        trip_id,
        pickup_datetime,
        dropoff_datetime,
        pickup_longitude,
        pickup_latitude,
        dropoff_longitude,
        dropoff_latitude,
        passenger_count,
        trip_distance,
        fare_amount,
        extra,
        tip_amount,
        tolls_amount,
        total_amount,
        payment_type,
        pickup_ntaname,
        dropoff_ntaname
    FROM s3(
        'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_{0..2}.gz',
        'TabSeparatedWithNames'
    );`
)

// main reproduces clickhouse-go issue #1388.
//
// It connects to a ClickHouse server and (re)creates and fills
// test_query_cancellation.trips with q1..q4. It then runs
// "OPTIMIZE TABLE ... FINAL" in a goroutine under a context that is
// cancelled one second later, and logs how long conn.Exec took to return.
//
// Open and Ping failures are logged and terminate the process with a
// non-zero status. A failing preparation query is logged, the connection is
// closed and main returns.
func main() {
	prepareQueries := []string{q1, q2, q3, q4}

	// connect
	options := &clickhouse.Options{
		Addr: []string{"o2q8185ekk.us-east-2.aws.clickhouse-staging.com:9440"},
		Auth: clickhouse.Auth{
			Database: "default",
			Username: "default",
			// NOTE(review): this credential was published with the issue; it
			// should be rotated and supplied via configuration instead.
			Password: "pg0H6o0ikVyJFbu9KgYKXTII1u5HHdLLsvCg18MI7rSsf4ukLFUZcNzMoMjFfQaY",
		},
		Compression: &clickhouse.Compression{
			Method: clickhouse.CompressionLZ4,
		},
		TLS: &tls.Config{
			InsecureSkipVerify: false,
		},
		MaxOpenConns: 1,
		MaxIdleConns: 1,
	}

	conn, err := clickhouse.Open(options)
	if err != nil {
		// A bare return here used to exit 0 without any diagnostic.
		log.Fatalf("Open failed: %v", err)
	}

	if err = conn.Ping(context.Background()); err != nil {
		// Close explicitly: log.Fatalf exits without running deferred calls.
		_ = conn.Close()
		log.Fatalf("Ping failed: %v", err)
	}

	log.Println("Connected.")

	// prepare table
	for _, query := range prepareQueries {
		if err = conn.Exec(context.Background(), query); err != nil {
			log.Printf("Finished with error: %v\n", err)
			_ = conn.Close()
			return
		}
	}

	// prepare context
	ctx, cancelCtx := context.WithCancel(context.Background())
	defer cancelCtx()

	// done is closed after the query goroutine has logged its duration.
	done := make(chan struct{})

	// run query in background
	go func() {
		log.Println("Running heavy query...")

		start := time.Now()

		defer close(done) // registered first, so it runs after the log below
		defer func() {
			log.Printf("Query took: %v\n", time.Since(start))
		}()

		// Goroutine-local err so this goroutine does not write main's err.
		if err := conn.Exec(ctx, "OPTIMIZE TABLE test_query_cancellation.trips FINAL"); err != nil {
			log.Printf("Finished with error: %v\n", err)
		}
	}()

	// Cancel the query's context after a delay. The timer is stopped on exit
	// so it does not fire if the query finishes first.
	const cancelBackoff = 1 * time.Second
	cancelTimer := time.AfterFunc(cancelBackoff, func() {
		cancelCtx()
		log.Printf("Context cancelled after %v.", cancelBackoff)
	})
	defer cancelTimer.Stop()

	<-done

	if err = conn.Close(); err != nil {
		log.Printf("Close failed: %v", err)
	}
	log.Println("Done.")
}

Details

Environment

zdyj3170101136 commented 5 days ago

Does this mean that, before the modification, neither clickhouse.conn nor *sql.DB could have its timeout controlled by cancelling the context?