influxdata / docs-v2

InfluxData Documentation that covers InfluxDB Cloud, InfluxDB OSS 2.x, InfluxDB OSS 1.x, InfluxDB Enterprise, Telegraf, Chronograf, Kapacitor, and Flux.
https://docs.influxdata.com
MIT License
72 stars 266 forks source link

website code samples: Go Flight SQL #4941

Open jstirnaman opened 1 year ago

jstirnaman commented 1 year ago
package main

import (
    "context"
    "crypto/x509"
    "encoding/json"
    "fmt"
    "os"

    "github.com/apache/arrow/go/v12/arrow/flight/flightsql"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/metadata"
)

// dbQuery runs a fixed SQL aggregation query over Arrow Flight SQL and prints
// every returned record batch to stdout as indented JSON, each preceded by a
// "RECORD BATCH" line.
//
// Connection settings come from the environment:
//   - CLOUD_SERVERLESS_HOST: host to dial; ":443" is appended. Must be set.
//   - CLOUD_SERVERLESS_READ_WRITE_TOKEN: sent as the "authorization" gRPC
//     metadata value, prefixed with "Token ".
//   - CLOUD_SERVERLESS_BUCKET_NAME: sent as the "database" gRPC metadata value.
//
// ctx is used for the Execute and DoGet calls. A non-nil error is returned if
// the host is unset, the system certificate pool cannot be loaded, the client
// cannot be created, the query or fetch fails, the server returns no
// endpoints, a record cannot be marshaled, or the stream ends with an error.
func dbQuery(ctx context.Context) error {
    host := os.Getenv("CLOUD_SERVERLESS_HOST")
    if host == "" {
        // Without this check the client would silently dial ":443".
        return fmt.Errorf("CLOUD_SERVERLESS_HOST is not set")
    }
    url := host + ":443"
    token := os.Getenv("CLOUD_SERVERLESS_READ_WRITE_TOKEN")
    database := os.Getenv("CLOUD_SERVERLESS_BUCKET_NAME")

    // Second argument of credentials.NewClientTLSFromCert; left empty so the
    // server name is not overridden.
    const serverNameOverride = ""

    sql := `SELECT DATE_BIN(INTERVAL '2 hours', time, '1970-01-01T00:00:00Z'::TIMESTAMP) AS time,
                            room,
                            selector_max(temp, time)['value'] AS 'max temp',
                            selector_min(temp, time)['value'] AS 'min temp',
                            avg(temp) AS 'average temp'
                        FROM home
                        GROUP BY DATE_BIN(INTERVAL '2 hours', time, '1970-01-01T00:00:00Z'::TIMESTAMP),
                            room
                        ORDER BY room, time`

    // Create a gRPC transport secured with the system root certificates.
    pool, err := x509.SystemCertPool()
    if err != nil {
        return fmt.Errorf("x509: %w", err)
    }
    transport := grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(pool, serverNameOverride))
    opts := []grpc.DialOption{
        transport,
    }

    // Create query client.
    client, err := flightsql.NewClient(url, nil, nil, opts...)
    if err != nil {
        return fmt.Errorf("flightsql: %w", err)
    }
    // Close the underlying connection on every return path. The close error
    // is intentionally dropped: nothing useful can be done with it here.
    defer client.Close()

    // Credentials and target database travel as per-call gRPC metadata.
    ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Token "+token)
    ctx = metadata.AppendToOutgoingContext(ctx, "database", database)

    info, err := client.Execute(ctx, sql)
    if err != nil {
        return fmt.Errorf("flightsql flight info: %w", err)
    }
    // Guard the index below; an empty endpoint list would otherwise panic.
    if len(info.Endpoint) == 0 {
        return fmt.Errorf("flightsql flight info: no endpoints returned")
    }
    // Only the first endpoint's ticket is redeemed.
    reader, err := client.DoGet(ctx, info.Endpoint[0].Ticket)
    if err != nil {
        return fmt.Errorf("flightsql do get: %w", err)
    }
    defer reader.Release()

    // Print results as JSON, one record batch at a time.
    for reader.Next() {
        b, err := json.MarshalIndent(reader.Record(), "", "  ")
        if err != nil {
            return fmt.Errorf("flightsql marshal record: %w", err)
        }
        fmt.Println("RECORD BATCH")
        fmt.Println(string(b))
    }
    // Next returns false both at end of stream and on failure, so the error
    // must be checked after the loop rather than inside it.
    if err := reader.Err(); err != nil {
        return fmt.Errorf("flightsql reader: %w", err)
    }

    return nil
}

// main runs dbQuery with a background context. On failure it writes the
// error to stderr and exits with status 1.
func main() {
    queryErr := dbQuery(context.Background())
    if queryErr == nil {
        return
    }

    fmt.Fprintf(os.Stderr, "error: %v\n", queryErr)
    os.Exit(1)
}