lovoo / goka

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.
BSD 3-Clause "New" or "Revised" License

"panic: sync: negative WaitGroup counter" in PartitionProcessor.VisitValues #433

Closed akshatraika-moment closed 2 months ago

akshatraika-moment commented 1 year ago

Hi, I am seeing a panic loop at https://github.com/lovoo/goka/blob/master/partition_processor.go#L685. I am using version 1.1.7.

Here is the stack trace:

panic: sync: negative WaitGroup counter
goroutine 1087 [running]:
sync.(*WaitGroup).Add(0xc018024e48?, 0xc018024d7c?)
    /usr/local/go/src/sync/waitgroup.go:83 +0xda
sync.(*WaitGroup).Done(...)
    /usr/local/go/src/sync/waitgroup.go:108
github.com/lovoo/goka.(*PartitionProcessor).VisitValues.func1(...)
    /go/pkg/mod/github.com/lovoo/goka@v1.1.7/partition_processor.go:701
github.com/lovoo/goka.(*PartitionProcessor).VisitValues(0xc000a20500, {0x1607528, 0xc0178d4c40}, {0x1270c8a, 0x8}, {0x1124020?, 0xc00062d560}, 0xc0188d0550)
    /go/pkg/mod/github.com/lovoo/goka@v1.1.7/partition_processor.go:716 +0x69a
github.com/lovoo/goka.(*Processor).VisitAllWithStats.func1()
    /go/pkg/mod/github.com/lovoo/goka@v1.1.7/processor.go:934 +0x39
github.com/lovoo/goka/multierr.(*ErrGroup).Go.func1()
    /go/pkg/mod/github.com/lovoo/goka@v1.1.7/multierr/errgroup.go:48 +0x52
golang.org/x/sync/errgroup.(*Group).Go.func1()
    /go/pkg/mod/golang.org/x/sync@v0.1.0/errgroup/errgroup.go:75 +0x64
created by golang.org/x/sync/errgroup.(*Group).Go
    /go/pkg/mod/golang.org/x/sync@v0.1.0/errgroup/errgroup.go:72 +0xa5

There is possibly a race condition in the VisitValues function. Can someone please take a look?

Happy to provide more information if needed but this seems to be independent of my implementation.

mikemoment commented 1 year ago

Can someone please take a look at this issue? It caused a failure in our production env. Thanks!

asoliman20 commented 1 year ago

+1

I'm also having this issue!

frairon commented 1 year ago

Could you provide some information on how you use the visit functionality? For example, how is it initialized, and what is done inside the visit function?

akshatraika-moment commented 12 months ago

Hi @frairon, thanks for the quick response. For sure, here is how we have initialized and used it:

main =>

func main_runner() error {
    if err := config.Init(); err != nil {
        if errors.Is(err, flag.ErrHelp) {
            return nil
        }

        return fmt.Errorf("loading configuration: %w", err)
    }

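    ctx := context.Background()
    ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    // g was not declared in the posted excerpt; an errgroup bound to ctx is assumed here.
    g, ctx := errgroup.WithContext(ctx)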

    if err := database.Init(ctx); err != nil {
        return fmt.Errorf("setting up database: %w", err)
    }

    if err := app.Init(ctx, g); err != nil {
        return fmt.Errorf("in application: %w", err)
    }

    return g.Wait()
}

app.New() =>

// New creates a new instance of the stream processing application.
func New(brokers []string, opts ...Option) (*App, error) {
    // Default options
    opt := &options{
        groupName:        "servicename",
        inputTopic:       "topicname",
        logger:           zap.NewNop(),
        processCallback:  DefaultProcessCallback,
        snapshotInterval: 10 * time.Second,
    }

    // Update defaults
    opt.apply(opts...)

    app := &App{
        logger:           opt.logger,
        snapshotInterval: opt.snapshotInterval,
    }

    // Define the group graph
    graph := goka.DefineGroup(
        opt.groupName,
        goka.Input(opt.inputTopic, orderbook.DeltaCodec{}, process),
        goka.Visitor("snapshot", app.snapshot),
        goka.Persist(orderbook.Codec{}),
    )

    // Create processor
    proc, err := goka.NewProcessor(brokers, graph, opt.processorOptions...)
    if err != nil {
        return nil, err
    }

    app.processor = proc
    return app, nil
}

app.Run() =>

func (app *App) Run(ctx context.Context, db database.Interface) error {
    if err := app.setupMetrics(); err != nil {
        return fmt.Errorf("setting up metrics: %w", err)
    }

    g, ctx := errgroup.WithContext(ctx)
    sw := database.NewSafeAsyncBatchSnapshotWriter(db, 1000)
    g.Go(func() error {
        return sw.Run(ctx)
    })

    g.Go(func() error {
        return app.processor.Run(ctx)
    })

    // Snapshotter thread.
    g.Go(func() error {
        app.logger.Info("snapshotter started")
        defer app.logger.Info("snapshotter stopped")

        for {
            select {
            case <-ctx.Done():
                return nil
            default:
            }

            // Trigger the visitor callback, which writes a snapshot to the db
            visited, err := app.processor.VisitAllWithStats(ctx, "snapshot", sw)
            if err != nil {
                app.logger.Error("error while snapshotting", zap.Error(err))
            }

            // Flush the async snapshot writer between rounds of snapshotting.
            if err := sw.Flush(context.TODO()); err != nil {
                app.logger.Error("error flushing snapshot", zap.Error(err))
            }

            app.logger.Info("visit complete", zap.Int64("num_visited", visited))

            // Sleep for a few seconds before taking another round of snapshots.
            sleep, cancel := context.WithTimeout(ctx, app.snapshotInterval)
            <-sleep.Done()
            cancel()
        }
    })

    return g.Wait()
}

app.Init() =>

// Init sets up and runs the stream processing application, including the
// snapshotter thread, which will be restarted each time the underlying Kafka
// consumer group rebalances.
func Init(ctx context.Context, g *errgroup.Group) error {
    cfg := goka.DefaultConfig()
    kafka := config.GetKafkaConfig()
    if kafka.AuthMechanism == config.KafkaAuthIAM {
        cfg.Net.SASL.Enable = true
        cfg.Net.SASL.Mechanism = sarama.SASLTypeAWSMSKIAM
        cfg.Net.SASL.AWSMSKIAM = sarama.AWSMSKIAMConfig{Region: "ignored"}
    }

    if kafka.UseTLS {
        cfg.Net.TLS.Enable = true
    }

    // Set up application
    goka.ReplaceGlobalConfig(cfg)
    meter := otel.Meter(pkgName)
    tracer = otel.Tracer(pkgName)
    messagesProcessed, _ = meter.Int64Counter("input.messages_processed")
    logger := zap.L()
    db := database.GetDatabaseInstance()
    opts := []Option{
        WithLogger(logger),
        WithProcessCallback(process),
        WithProcessorOptions(
            goka.WithConsumerGroupBuilder(consumerGroupBuilder()),
            goka.WithStorageBuilder(storage.MemoryBuilder()),
            goka.WithTopicManagerBuilder(
                topicManagerBuilder(kafka.TableReplicationFactor),
            ),
        ),
    }

    if kafka.GroupName != "" {
        opts = append(opts, WithGroupName(kafka.GroupName))
    }

    if kafka.InputTopic != "" {
        opts = append(opts, WithInputTopic(kafka.InputTopic))
    }

    app, err := New(kafka.BootstrapServers, opts...)
    if err != nil {
        logger.Error("failed to create application", zap.Error(err))
        return fmt.Errorf("creating application: %w", err)
    }

    g.Go(func() error {
        if err := app.Run(ctx, db); err != nil {
            logger.Error("error in application", zap.Error(err))
            return fmt.Errorf("running application: %w", err)
        }

        return nil
    })

    return nil
}

Inside the visit function, we are just writing a snapshot of the results to Postgres. Our processor keeps an incremental record of the data we receive and does some ETL into an in-memory store struct. I am a bit hesitant to share much about the internals of those processors and visitors, since this is business IP.

But I think the issue is independent of what is happening inside that function, right? The wait group counter should be resilient to every situation except a segfault. IMO, the error lies in the way wg.Done() is called in the library's partition_processor.

Please let me know if you need more information. Appreciate the help on this!

frairon commented 12 months ago

Hey @akshatraika-moment,

thanks a lot for the detailed information! No need to share more, especially not sensitive information - don't worry :).

We did indeed find something that might have caused this behavior under two conditions:

For some background: inside the processor, the visit function drains the visit channel to clean up unvisited items. This happens if either the visit context was closed or the processor is shutting down. We think the bug was triggered when the processor was shutting down due to a panic inside the visit callback, which would close the channel. The draining function failed to check the channel state. Because a receive on a closed channel always succeeds immediately, it would call wg.Done() in an infinite loop, driving the counter negative and causing the crash.
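To illustrate the mechanism described above, here is a minimal, self-contained sketch. It is not the goka source: newBacklog, drainUnchecked, drainChecked and panicMessage are hypothetical names invented for this example, and the channel of strings merely stands in for the visit channel. It shows that draining a closed channel without checking the second receive value keeps calling Done() until the counter goes negative, while checking it stops the loop.

```go
package main

import (
	"fmt"
	"sync"
)

// newBacklog returns a closed channel that still holds n buffered items and a
// WaitGroup whose counter was raised once per item (hypothetical helper).
func newBacklog(n int) (chan string, *sync.WaitGroup) {
	ch := make(chan string, n)
	wg := &sync.WaitGroup{}
	for i := 0; i < n; i++ {
		wg.Add(1)
		ch <- fmt.Sprintf("item-%d", i)
	}
	close(ch)
	return ch, wg
}

// drainUnchecked discards pending items and marks each one as done. It never
// checks whether the channel is closed, so once the buffer is empty the
// receive keeps succeeding with zero values and Done is called too often.
func drainUnchecked(ch <-chan string, wg *sync.WaitGroup) {
	for {
		select {
		case <-ch:
			wg.Done()
		default:
			return
		}
	}
}

// drainChecked does the same but stops as soon as the receive reports that
// the channel is closed and empty.
func drainChecked(ch <-chan string, wg *sync.WaitGroup) {
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				return
			}
			wg.Done()
		default:
			return
		}
	}
}

// panicMessage runs f and returns the recovered panic text, or "" if f
// returned normally.
func panicMessage(f func()) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	f()
	return ""
}

func main() {
	ch, wg := newBacklog(2)
	fmt.Printf("unchecked drain: %q\n", panicMessage(func() { drainUnchecked(ch, wg) }))

	ch, wg = newBacklog(2)
	fmt.Printf("checked drain:   %q\n", panicMessage(func() { drainChecked(ch, wg) }))
	wg.Wait() // returns immediately because the counter is exactly zero
}
```

In the unchecked variant the default branch is never taken once the channel is closed, which is why the loop does not terminate on its own and ends in the panic from the stack trace.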

That also means there should be an error logged somewhere in your application's logs, unless it got swallowed by the panic before it could surface.

Anyway, this PR should fix the issue, hopefully in your case as well. Would you be able to test it before we release it, or are you dependent on a proper release?

Just as a note on Visit in general: this functionality is specifically meant to "tap" into the processor event loop so that the processor state can be modified while the processor is running. It's mainly used for migrating, fixing, or cleaning up something that can't be done from the outside. It's kind of an open-heart surgery, which is also why the whole processor will shut down if an error occurs in the visit callback. If your use case allows it, it's much safer to create a new View on the processor table and use view.Iterator() to do whatever needs to be done with the data.

akshatraika-moment commented 11 months ago

Thanks @frairon, I looked at the fix you made, but I am not sure it will fix the problem entirely. I found more on Friday. I think there might be a race condition somewhere in the wait groups when the CPU is overloaded. My hypothesis is that there is a correlation between the CPU killing goroutines and the wg counter losing track. Here are 2 graphs.

[image]

[image]

The histogram is the graph of the panic logs and the other graph is the CPU usage in my service. As you can see, they look very similar.

Unfortunately, the only place where we see the error is production, so we will not be able to test your PR there. But once you merge it, we can give it a try. Right now, I am trying to move the service over to using views. Thanks for suggesting that.

frairon commented 11 months ago

Hmm, I don't think the CPU is killing goroutines. The high CPU usage is probably caused by the iteration itself, because right now there is no rate control when iterating the state. So I am not sure what the actual cause is. Anyway, the fix is merged and released in v1.1.10. Maybe you can check it out and see if it solves your issue.

Thanks!

frairon commented 11 months ago

@akshatraika-moment does the new version fix your issue?

akshatraika-moment commented 2 months ago

Yes, it did. Thanks!