uber-go / cadence-client

Framework for authoring workflows and activities running on top of the Cadence orchestration engine.
https://cadenceworkflow.io
MIT License

Goroutine leak after Worker.Stop() #1129

Open asido opened 2 years ago

asido commented 2 years ago

Describe the bug

Worker.Stop() doesn't wait for the worker's goroutines to exit, which goleak reports as a goroutine leak.

To Reproduce

Steps to reproduce the behavior:

go.mod:

module cadence-goleak

go 1.16

require (
    go.uber.org/cadence v0.17.0
    go.uber.org/fx v1.14.2
    go.uber.org/goleak v1.1.10
    go.uber.org/yarpc v1.57.1
)

main.go:

package main

import (
    "context"

    "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
    "go.uber.org/cadence/worker"
    "go.uber.org/cadence/workflow"
    "go.uber.org/fx"
    "go.uber.org/yarpc/api/transport"
    "go.uber.org/yarpc/yarpctest"
)

var opts = fx.Options(
    fx.Provide(newCadenceWorker),
    fx.Invoke(registerCadenceWorker),
)

func main() {
    fx.New(opts).Run()
}

func newCadenceWorker() worker.Worker {
    serviceClient := workflowserviceclient.New(&transport.OutboundConfig{
        CallerName: "caller-name",
        Outbounds: transport.Outbounds{
            ServiceName: "service-name",
            Unary:       yarpctest.NewFakeTransport().NewOutbound(yarpctest.NewFakePeerList()),
        },
    })

    w := worker.New(serviceClient, "cadence-domain", "task-list", worker.Options{})
    w.RegisterWorkflowWithOptions(noopWorkflow, workflow.RegisterOptions{Name: "noop-workflow"})
    return w
}

func registerCadenceWorker(worker worker.Worker, lifecycle fx.Lifecycle) {
    lifecycle.Append(fx.Hook{
        OnStart: func(context.Context) error {
            return worker.Start()
        },
        OnStop: func(context.Context) error {
            worker.Stop()
            return nil
        },
    })
}

func noopWorkflow(ctx workflow.Context) error {
    return nil
}

main_test.go:

package main

import (
    "testing"
    "time"

    "go.uber.org/fx/fxtest"
    "go.uber.org/goleak"
)

func TestMain(m *testing.M) {
    goleak.VerifyTestMain(m)
}

func Test_app_runs(t *testing.T) {
    app := fxtest.New(t, opts)
    app.RequireStart()
    // Give time for worker polling to begin.
    time.Sleep(time.Second)
    app.RequireStop()
}

$ go test .
goleak: Errors on successful test run: found unexpected goroutines:
[Goroutine 24 in state sleep, with time.Sleep on top of the stack:
goroutine 24 [sleep]:
time.Sleep(0x2035a97a4)
    /usr/local/Cellar/go/1.16.6/libexec/src/runtime/time.go:193 +0xd2
go.uber.org/cadence/internal/common/backoff.(*ConcurrentRetrier).throttleInternal(0xc000160aa0, 0xc00007cd20)
    /Users/asido/go/pkg/mod/go.uber.org/cadence@v0.17.0/internal/common/backoff/retry.go:62 +0x7e
go.uber.org/cadence/internal/common/backoff.(*ConcurrentRetrier).Throttle(...)
    /Users/asido/go/pkg/mod/go.uber.org/cadence@v0.17.0/internal/common/backoff/retry.go:48
go.uber.org/cadence/internal.(*baseWorker).pollTask(0xc0003581c0)
    /Users/asido/go/pkg/mod/go.uber.org/cadence@v0.17.0/internal/internal_worker_base.go:261 +0x4a
go.uber.org/cadence/internal.(*baseWorker).runPoller(0xc0003581c0)
    /Users/asido/go/pkg/mod/go.uber.org/cadence@v0.17.0/internal/internal_worker_base.go:227 +0xb5
created by go.uber.org/cadence/internal.(*baseWorker).Start
    /Users/asido/go/pkg/mod/go.uber.org/cadence@v0.17.0/internal/internal_worker_base.go:190 +0xbb

 Goroutine 13 in state semacquire, with sync.runtime_Semacquire on top of the stack:
goroutine 13 [semacquire]:
sync.runtime_Semacquire(0xc000358238)
    /usr/local/Cellar/go/1.16.6/libexec/src/runtime/sema.go:56 +0x45
sync.(*WaitGroup).Wait(0xc000358230)
    /usr/local/Cellar/go/1.16.6/libexec/src/sync/waitgroup.go:130 +0x65
go.uber.org/cadence/internal/common/util.AwaitWaitGroup.func1(0xc000358230, 0xc0000b2360)
    /Users/asido/go/pkg/mod/go.uber.org/cadence@v0.17.0/internal/common/util/util.go:52 +0x2b
created by go.uber.org/cadence/internal/common/util.AwaitWaitGroup
    /Users/asido/go/pkg/mod/go.uber.org/cadence@v0.17.0/internal/common/util/util.go:51 +0x6e
]
FAIL    cadence-goleak  10.567s

Expected behavior

$ go test .
ok      cadence-goleak

Additional Context

I used cadence-client v0.17.0 because, with the program above, v0.18.2 fails to build during go get (output below). The reported problem is present in both releases.

$ go get .
# go.uber.org/cadence/internal/common
../../../go/pkg/mod/go.uber.org/cadence@v0.18.2/internal/common/thrift_util.go:31:38: not enough arguments in call to thrift.NewTSerializer().Write
    have (thrift.TStruct)
    want (context.Context, thrift.TStruct)
../../../go/pkg/mod/go.uber.org/cadence@v0.18.2/internal/common/thrift_util.go:53:27: not enough arguments in call to t.Protocol.Flush
    have ()
    want (context.Context)
../../../go/pkg/mod/go.uber.org/cadence@v0.18.2/internal/common/thrift_util.go:57:28: not enough arguments in call to t.Transport.Flush
    have ()
    want (context.Context)

Groxx commented 2 years ago

Yeah, AFAICT the shutdown process just closes a channel and returns :| so there's no way to wait for a clean shutdown.

There are a few other shutdown leaks throughout the client. Unfortunately, they're definitely not all simple fixes.


As for this error:

$ go get .
# go.uber.org/cadence/internal/common
../../../go/pkg/mod/go.uber.org/cadence@v0.18.2/internal/common/thrift_util.go:31:38: not enough arguments in call to thrift.NewTSerializer().Write
    have (thrift.TStruct)
    want (context.Context, thrift.TStruct)
../../../go/pkg/mod/go.uber.org/cadence@v0.18.2/internal/common/thrift_util.go:53:27: not enough arguments in call to t.Protocol.Flush
    have ()
    want (context.Context)
../../../go/pkg/mod/go.uber.org/cadence@v0.18.2/internal/common/thrift_util.go:57:28: not enough arguments in call to t.Transport.Flush
    have ()
    want (context.Context)

^ That means you already have a version of thrift that's too new for cadence. They made a breaking change without releasing a new version, and Go modules do not allow specifying upper bounds on dependency versions, so unfortunately you need to downgrade it to something compatible. If you do a "clean" go get, it works, because that downloads the minimum versions we specify in our go.mod file.

longquanzheng commented 2 years ago

You can pin the thrift version like this: https://github.com/uber/cadence/blob/d3d06825adcf11c20ec3fc58e329f1d9560bb729/go.mod#L92
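
In go.mod terms, pinning is typically done with a replace directive. The fragment below is only a sketch: it assumes the thrift module path is github.com/apache/thrift, and the version on the right is a placeholder, not a real version. Copy the actual version from the linked line.

```
// go.mod (fragment)
// PLACEHOLDER: substitute the thrift version used on the linked go.mod line.
replace github.com/apache/thrift => github.com/apache/thrift <compatible-version>
```

A replace directive in the main module overrides whatever version the rest of the dependency graph would otherwise select, which is why it works as an upper bound here.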