influxdata / influxdb-client-go

InfluxDB 2 Go Client
MIT License

Large queries and intermittent internet connectivity cause truncated dataset to be returned instead of error #398

Closed ToothlessDaydreamer closed 3 months ago

ToothlessDaydreamer commented 9 months ago

Specifications

Steps to reproduce

  1. Run the code below, substituting in bucket credentials for a bucket containing a large dataset, and use a query that will take a few seconds to run.
  2. After a few seconds, before the query completes, switch off internet connectivity.

Sample script:

package main

import (
    "context"
    "encoding/csv"
    "fmt"
    "log"
    "os"
    "time"

    influx "github.com/InfluxCommunity/influxdb3-go/influxdb3"
    "github.com/apache/arrow/go/v13/arrow"
)

func main() {
    var database string
    var token string
    var query string
    var resolution string
    currentTime := time.Now()

    database = "databaseID"
    token = "databaseReadToken"
    query = `SELECT * FROM "queryTest" WHERE time >= 1708819200000ms and time <= 1709023793000ms ORDER BY time DESC`
    resolution = "ms"

    url := "databaseURL"

    // Create a new client using an InfluxDB server base URL and an authentication token
    client, err := influx.New(influx.ClientConfig{
        Host:     url,
        Token:    token,
        Database: database,
    })
    if err != nil {
        log.Fatal(err)
    }

    // Close client at the end and escalate error if present
    defer func(client *influx.Client) {
        err := client.Close()
        if err != nil {
            log.Fatal(err)
        }
    }(client)

    queryType := influx.InfluxQL

    options := influx.QueryOptions{QueryType: queryType}

    iterator, err := client.QueryWithOptions(context.Background(), &options, query)
    if err != nil {
        log.Fatal(err)
    }

    // Write data to file section
    var headerWritten = false
    dateTimeString := currentTime.Format("2006_01_02-15_04_05") // Example format: YYYY_MM_DD-HH_MM_SS

    csvFilePath := "query_" + dateTimeString + ".csv"
    file, err := os.Create(csvFilePath)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    writer := csv.NewWriter(file)
    defer writer.Flush()
    var header []string

    for iterator.Next() {
        // Iterate over query data row by row
        mapData := iterator.Value()

        if !headerWritten {
            for key := range mapData {
                header = append(header, key)
            }
            if err := writer.Write(header); err != nil {
                fmt.Println(csvFilePath)
                log.Fatal(err)
            }
            headerWritten = true
        }

        // Create a slice of empty strings with the same length as the header
        row := make([]string, len(header))

        // Fill in the row slice with values based on the header
        for colIndex, col := range header {
            // Retrieve the value from mapData if it exists
            if value, ok := mapData[col]; ok {
                if col == "time" {
                    switch resolution {
                    case "ms":
                        value = value.(arrow.Timestamp) / 1e6
                    case "us":
                        value = value.(arrow.Timestamp) / 1e3
                    }
                }
                if value == nil {
                    value = ""
                }
                row[colIndex] = fmt.Sprintf("%v", value)
            }
        }

        err := writer.Write(row)
        if err != nil {
            fmt.Println(csvFilePath)
            log.Fatal("Failed to write to CSV file, please re-run query")
        }

    }

    writer.Flush() // Ensure data is flushed before the end in normal operation
    fmt.Println(csvFilePath)

} 
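To confirm truncation after the script above finishes, the last exported timestamp can be compared against the query's bound. Since the query uses ORDER BY time DESC, the oldest requested timestamp (the lower bound, 1708819200000 ms) should appear in the last row. The helper below is a hypothetical sketch, not part of the client, and it assumes the bucket actually holds a point at or near the bound:

```go
package main

import "fmt"

// isTruncated reports whether an export that should run down to lowerBoundMs
// (the query's oldest requested timestamp, which ORDER BY time DESC places in
// the last row) stopped early. Hypothetical helper; assumes the bucket holds
// a point at or near the bound.
func isTruncated(lastExportedMs, lowerBoundMs int64) bool {
	return lastExportedMs > lowerBoundMs
}

func main() {
	// Bounds taken from the query in this report.
	const lowerBound = 1708819200000
	fmt.Println(isTruncated(1708900000000, lowerBound)) // stopped early
	fmt.Println(isTruncated(lowerBound, lowerBound))    // reached the bound
}
```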

Query used and screenshot of final field of CSV used to replicate bug:

SELECT * FROM "queryTest" WHERE time >= 1708819200000ms and time <= 1709023793000ms ORDER BY time DESC

(screenshot omitted)

Expected behavior

The script should produce a timeout error, or an equivalent error informing the user that the connection was interrupted and the query was not completed. This would match the behaviour of the Python InfluxDB 3 library, which returns the error stack:

…
File "pyarrow\_flight.pyx", line 1081, in pyarrow._flight.FlightStreamReader.read_all
File "pyarrow\_flight.pyx", line 55, in pyarrow._flight.check_flight_status
pyarrow._flight.FlightInternalError: Flight returned internal error, with message: unexpected EOF.
…

Actual behavior

The script completes without any errors, and the CSV file is written to the script folder. Checking the last timestamp in the CSV shows that it does not match the final timestamp requested in the query.

Note that this also occurs when querying a very large amount of data (~500MB). The function hangs for a while before eventually returning, and in testing a partial dataset was returned about 30% of the time.

Additional info

No response

vlastahajek commented 3 months ago

@ToothlessDaydreamer, thanks for reporting the issue. This is the repository for the InfluxDB 2 Go client. I've copied the info to the InfluxDB 3 Go client repository. Track it there, please.