influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License
14.63k stars 5.58k forks source link

Output plugin that supports sending metrics into Google CloudRun #7913

Closed raider111111 closed 2 years ago

raider111111 commented 4 years ago

Feature Request

Opening a feature request kicks off a discussion.

Proposal:

An output plugin that supports sending metrics into a Wavefront proxy located in the Google CloudRun environment including handling the access token handshakes.

Current behavior:

Not aware that this is available.

Desired behavior:

An output plugin that supports sending metrics into a Wavefront proxy located in the Google CloudRun environment including handling the access token handshakes.

Use case:

This doesn't appear to be available from Telegraf. In order to send metrics into the Cloud Run Wavefront proxy, you need to write custom code to handle the handshakes.

raider111111 commented 4 years ago

For reference, this is what I'm using now:

package main

import (
    "crypto/rsa"
    "crypto/x509"
    "encoding/json"
    "encoding/pem"
    "errors"
    "fmt"
    "io"
    "io/ioutil"
    "net/http"
    "net/url"
    "time"

    "cloud.google.com/go/compute/metadata"
    "golang.org/x/oauth2/google"
    "golang.org/x/oauth2/jws"
)

// makeGetRequest makes a GET request to the specified Cloud Run endpoint in
// serviceURL (must be a complete URL), authenticating with an ID token
// obtained from the metadata API.
//
// On success the caller owns the returned response and must close its Body.
func makeGetRequest(serviceURL string) (*http.Response, error) {
    // Ask for an ID token whose audience is the service URL. The value is
    // query-escaped so that a '?', '&' or '#' inside serviceURL cannot be
    // misread as part of the metadata request itself.
    tokenURL := "/instance/service-accounts/default/identity?audience=" + url.QueryEscape(serviceURL)
    idToken, err := metadata.Get(tokenURL)
    if err != nil {
        return nil, fmt.Errorf("metadata.Get: failed to query id_token: %w", err)
    }
    req, err := http.NewRequest(http.MethodGet, serviceURL, nil)
    if err != nil {
        return nil, err
    }
    req.Header.Set("Authorization", "Bearer "+idToken)
    return http.DefaultClient.Do(req)
}

// generateJWT builds and signs a JWT assertion for a service account.
//
// saKeyfile is the path to the service account's JSON key file, saEmail is
// used as both the issuer and the subject of the assertion, audience is placed
// in the "target_audience" private claim, and expiryLength is the lifetime of
// the assertion in seconds.
//
// It returns the encoded, RS256-signed JWT, or an error if the key file cannot
// be read or parsed, or does not contain a PKCS#8 RSA private key.
func generateJWT(saKeyfile, saEmail, audience string, expiryLength int64) (string, error) {
    now := time.Now().Unix()
    gcpauth := "https://www.googleapis.com/oauth2/v4/token"

    // Build the JWT payload.
    jwt := &jws.ClaimSet{
        Iat: now,
        // Expires after 'expiryLength' seconds.
        Exp: now + expiryLength,
        // Iss and Sub are both set to the service account's email address.
        Iss: saEmail,
        // Aud is the token endpoint URL; getGoogleID posts the signed
        // assertion to this same URL.
        Aud: gcpauth,
        Sub: saEmail,
        // NOTE(review): target_audience is presumably the audience of the ID
        // token that the endpoint issues (here the Cloud Run URL) - confirm
        // against Google's documentation.
        PrivateClaims: map[string]interface{}{"target_audience": audience},
    }
    jwsHeader := &jws.Header{
        Algorithm: "RS256",
        Typ:       "JWT",
    }

    // Extract the RSA private key from the service account keyfile.
    sa, err := ioutil.ReadFile(saKeyfile)
    if err != nil {
        return "", fmt.Errorf("Could not read service account file: %v", err)
    }
    conf, err := google.JWTConfigFromJSON(sa)
    if err != nil {
        return "", fmt.Errorf("Could not parse service account JSON: %v", err)
    }

    // pem.Decode returns a nil block when the input holds no PEM data;
    // dereferencing it unchecked would panic.
    block, _ := pem.Decode(conf.PrivateKey)
    if block == nil {
        return "", errors.New("private key is not PEM encoded")
    }
    parsedKey, err := x509.ParsePKCS8PrivateKey(block.Bytes)
    if err != nil {
        return "", fmt.Errorf("private key parse error: %v", err)
    }

    // PKCS#8 can carry non-RSA keys; RS256 signing needs an RSA key.
    rsaKey, ok := parsedKey.(*rsa.PrivateKey)
    if !ok {
        return "", errors.New("private key failed rsa.PrivateKey type assertion")
    }

    // Sign the JWT with the service account's private key.
    return jws.Encode(jwsHeader, jwt, rsaKey)
}

func getGoogleID(jwtToken string) (string, error) {
    var accessToken GoogleID
    googleidurl := "https://www.googleapis.com/oauth2/v4/token"
    responseBody, err := callAPIEndpoint("POST", googleidurl, jwtToken, nil)
    if err != nil {
        return "", err
    }
    err = json.Unmarshal(responseBody, &accessToken)
    if err != nil {
        return "", err
    }
    return accessToken.Token, err
}
// main signs a JWT assertion for the service account, exchanges it for an ID
// token and prints that token. The string arguments and TOKEN_DURATION are
// placeholders to be replaced with real values.
func main() {
    token, err := generateJWT("PATH_TO_dev-svc-wf-proxy-cr.json", "CLOUD_RUN_SERVICE_ACCOUNT",
        "URL_TO_CLOUD_RUN", TOKEN_DURATION)
    if err != nil {
        // Stop here: without a signed assertion the exchange below can only fail.
        println(err.Error())
        return
    }
    //fmt.Printf("Token: %s\n", token)

    accessToken, err := getGoogleID(token)
    if err != nil {
        // Stop here rather than printing an empty token as if it were valid.
        println(err.Error())
        return
    }
    fmt.Printf("From GoogleToken: %s\n", accessToken)
}

// callAPIEndpoint sends token as a JWT-bearer grant assertion to urls using a
// form-encoded POST and returns the raw response body.
//
// The method and payload parameters are accepted for compatibility with
// existing callers but are not used: the request is always a POST whose form
// carries "grant_type" and "assertion".
//
// An error is returned if the request fails, if the response body cannot be
// read, or if the server answers with a status code of 400 or above; in the
// last case the status line and response body are included in the error.
func callAPIEndpoint(method string, urls string, token string, payload io.Reader) ([]byte, error) {
    const grantType = "urn:ietf:params:oauth:grant-type:jwt-bearer"

    res, err := http.PostForm(urls, url.Values{"grant_type": {grantType}, "assertion": {token}})
    if err != nil {
        return nil, err
    }
    defer res.Body.Close()

    body, err := ioutil.ReadAll(res.Body)
    if err != nil {
        return nil, fmt.Errorf("error reading google id token response: %w", err)
    }
    if res.StatusCode >= 400 {
        // Surface the status and body so the caller can see why the
        // exchange was rejected.
        return nil, fmt.Errorf("error generating google id token jwt: %s: %s", res.Status, body)
    }
    return body, nil
}

// GoogleID captures the one field read from the token endpoint's JSON
// response: Token is decoded from the "id_token" key.
type GoogleID struct {
    Token string `json:"id_token"`
}

I can use the token in outputs.http but the problem is that the token expires after an hour (maximum time it allows for).

# A plugin that can transmit metrics over HTTP
[[outputs.http]]
  ## URL is the address to send metrics to
    url = 'CLOUD_RUN_URL'

  ## Timeout for HTTP message
    timeout = "20s"

  ## HTTP method, one of: "POST" or "PUT"
    method = "POST"
    data_format = "wavefront"
   #convert_paths = false
  ## HTTP Content-Encoding for write request body, can be set to "gzip" to
  ## compress body or "identity" to apply no encoding.
  # content_encoding = "identity"

  ## Additional HTTP headers
    [outputs.http.headers]
  #   # Should be set manually to "application/json" for json data_format
    Content-Type = "application/octet-stream"
    Accept = "application/json"
    Authorization = "Bearer TOKEN"

Thoughts?

Thanks!

crflanigan commented 2 years ago

Current code can be found here: https://github.com/homedepot/telegraf