splunk / pipelines

Concurrent processing pipelines in Go.
MIT License

ErrorSink does not seem to support multi-stage pipelines #40

Closed: rvdwijngaard closed this issue 1 year ago

rvdwijngaard commented 1 year ago

This package looks promising; the concept of an ErrorSink is exactly what I was looking for. My pipeline has multiple stages, two of which make network calls. Some errors are fatal, and the pipeline has to be canceled if one of them occurs. When I call ErrorSink.Fatal() from more than one stage, I get a panic:

panic: send on closed channel

goroutine 7 [running]:
github.com/splunk/pipelines.(*ErrorSink).Fatal(...)
        /home/ron/go/pkg/mod/github.com/splunk/pipelines@v1.0.2/pipelines.go:567
main.main.func1({0x4adc11, 0x1})
        /home/ron/go/src/bitbucket.org/innius/shared-script-pipeline/main.go:18 +0xdb
github.com/splunk/pipelines.doMap[...]({0x4cf300?, 0xc00002e080}, 0xc000070180, 0xc000014290, 0xc00002a180)
        /home/ron/go/pkg/mod/github.com/splunk/pipelines@v1.0.2/pipelines.go:78 +0x124
github.com/splunk/pipelines.Map[...].func1({0xc000012030?, 0x0?, 0x0?})
        /home/ron/go/pkg/mod/github.com/splunk/pipelines@v1.0.2/pipelines.go:62 +0x4c
github.com/splunk/pipelines.doWithConf[...].func1()
        /home/ron/go/pkg/mod/github.com/splunk/pipelines@v1.0.2/pipelines.go:444 +0x98
created by github.com/splunk/pipelines.doWithConf[...]
        /home/ron/go/pkg/mod/github.com/splunk/pipelines@v1.0.2/pipelines.go:436 +0x2ea
exit status 2
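The panic is the Go runtime rejecting a send on a channel that has already been closed. The minimal, stdlib-only sketch below reproduces that failure mode in isolation; the function name sendOnClosed is hypothetical, and this is not the library's own code. It recovers the panic value so it can be inspected:

```go
package main

import "fmt"

// sendOnClosed (hypothetical) closes a channel and then sends on it,
// which the Go runtime turns into a panic. recover() captures the
// panic value so the message can be returned instead of crashing.
func sendOnClosed() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r) // the runtime error's message
		}
	}()
	ch := make(chan error, 1)
	close(ch)
	ch <- fmt.Errorf("late error") // panics: send on closed channel
	return "no panic"
}

func main() {
	fmt.Println(sendOnClosed()) // prints "send on closed channel"
}
```

Any design in which one goroutine may close a channel while another may still send on it can fail this way.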

My test program looks like this:

package main

import (
    "context"
    "fmt"

    "github.com/splunk/pipelines"
)

func main() {
    ctx := context.Background()
    input := pipelines.Chan([]string{"a", "b", "c", "d", "e", "f"})

    ctx, errs := pipelines.NewErrorSink(ctx)

    stage1 := pipelines.Map(ctx, input, func(item string) string {
        if item == "c" {
            errs.Fatal(fmt.Errorf("%s caused an error at stage 1", item))
        }
        return item
    })

    stage2 := pipelines.Map(ctx, stage1, func(item string) string {
        if item == "b" {
            errs.Fatal(fmt.Errorf("%s caused an error at stage 2", item))
        }
        return item
    })

    pipelines.Drain(ctx, stage2)
}

Did I miss something, or should this work?

rvdwijngaard commented 1 year ago

Creating a separate ErrorSink for each stage seems to work, but it does not feel like a solid solution to me:

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/splunk/pipelines"
)

func main() {
    ctx := context.Background()
    input := pipelines.Chan([]string{"a", "b", "c", "d", "e", "f"})

    ctx1, errs1 := pipelines.NewErrorSink(ctx)
    ctx2, errs2 := pipelines.NewErrorSink(ctx)

    stage1 := pipelines.Map(ctx1, input, func(item string) string {
        if item == "c" {
            errs1.Fatal(fmt.Errorf("%s caused an error at stage 1", item))
        }
        return item
    })

    stage2 := pipelines.Map(ctx2, stage1, func(item string) string {
        if item == "b" {
            errs2.Fatal(fmt.Errorf("%s caused an error at stage 2", item))
        }
        return item
    })

    _, err := pipelines.Drain(ctx2, stage2)
    if err != nil {
        log.Println(err.Error())
    }
    for _, err := range errs1.All() {
        log.Println(err.Error())
    }
    for _, err := range errs2.All() {
        log.Println(err.Error())
    }
}

rvdwijngaard commented 1 year ago

I have created #41, which fixes this error. Alternatively, we could give ErrorSink a Close() func that closes all of the sink's open channels.

kalexmills commented 1 year ago

Thanks for the error report! I'm looking into the Close() func idea today. Sorry about the delay.

kalexmills commented 1 year ago

See #43 for another approach, which doesn't use recover at all. It adds a Close() func as you suggested and clarifies that the ErrorSink contract only guarantees eventual consistency.

rvdwijngaard commented 1 year ago

@kalexmills thanks for the fix. Your solution is better because it makes the contract clear.