signalfx / signalfx-go

Go client library and instrumentation bindings for SignalFx
https://www.signalfx.com
Apache License 2.0

comp.Done() not called? #112

Closed stevenvachon closed 3 years ago

stevenvachon commented 3 years ago

After logging a few data points and "no data available", the program loops forever, logging "for 1", "default", and "for 2":

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/signalfx/signalfx-go/signalflow"
)

func main() {
    client, err := signalflow.NewClient(
        signalflow.StreamURLForRealm("us1"),
        signalflow.AccessToken("<EDITED>"), // expires in 3 days
    )
    if err != nil {
        return
    }

    comp, err := client.Execute(&signalflow.ExecuteRequest{
        Program: "data(\"cpu.utilization\").mean().publish()",
        Start:   time.Unix(1606243500, 0),
        Stop:    time.Unix(1606247100, 0),
    })
    if err != nil {
        return
    }

    // breaks were targeting the switch
    done := false

    for !done {
        log.Println("SFX :: for 1")
        select {
        case msg, ok := <-comp.Data():
            if !ok {
                // this is never reached
                log.Println("SFX :: closing")
                client.Close()
                done = true
            } else if len(msg.Payloads) == 0 {
                log.Println("SFX :: no data available")
            } else {
                for _, pl := range msg.Payloads {
                    meta := comp.TSIDMetadata(pl.TSID)
                    log.Println(fmt.Sprintf("SFX :: %s %v: %v", meta.OriginatingMetric, meta.CustomProperties, pl.Value()))
                }
            }
        case <-comp.Done():
            // THIS IS NEVER REACHED
            log.Println("SFX :: closing")
            client.Close()
            done = true
        case msg := <-comp.Channel().Messages():
            // this is never reached
            log.Println(fmt.Sprintf("SFX :: message :: %s :: %s", msg.Type(), msg.Base().String()))
        default:
            log.Println(fmt.Sprintf("SFX :: default :: %v", comp.IsFinished())) // never true
        }
        log.Println("SFX :: for 2")
    }

    // this is never reached
    if err := comp.Err(); err != nil {
        log.Println(fmt.Sprintf("SFX :: error: %v", err))
        return
    }

    // this is never reached
    log.Println("SFX :: job completed")
    return
}
keitwb commented 3 years ago

Hi @stevenvachon, this seems to be an oversight in that the END_OF_CHANNEL message was not properly handled. #113 should fix it.

But also you need to remove this case from your select statement:

case msg := <-comp.Channel().Messages():

That case prevents the computation from receiving the messages (each channel message is delivered to only one receiver), so the metadata won't be cached properly.

Also I would recommend using the pattern

for msg := range comp.Data() {
...
}

as once the fix is merged and you upgrade, the for loop will automatically exit when the computation is over.

stevenvachon commented 3 years ago

I originally didn't have that case, but added it because the computation was not completing. The behaviour hasn't changed since adding it. I'll try using that PR. Thanks!

stevenvachon commented 3 years ago

Your changes solve my problem! Thanks again so much!