ydb-platform / ydb-go-sdk

Pure Go native and database/sql driver for YDB
https://ydb.tech
Apache License 2.0
135 stars 71 forks source link

bug: goroutine leak after turning on a session pool #1455

Open pelageech opened 5 days ago

pelageech commented 5 days ago

Bug Report

YDB GO SDK version: 3.79.2

Environment Ubuntu 20.04.6 LTS, amd64

Current behavior: When it was updated a version of ydb-go-sdk from 3.77.0 (pool was enabled via env) to 3.79.2, we noticed that goroutines started leaking.

goroutine 78032 [select, 2287 minutes]:
bb.yandex-team.ru/cloud/cloud-go/vendor/google.golang.org/grpc.newClientStreamWithParams.func4()
        bb.yandex-team.ru/cloud/cloud-go/vendor/google.golang.org/grpc/stream.go:392 +0x8c
created by bb.yandex-team.ru/cloud/cloud-go/vendor/google.golang.org/grpc.newClientStreamWithParams in goroutine 78011
        bb.yandex-team.ru/cloud/cloud-go/vendor/google.golang.org/grpc/stream.go:391 +0xe08

I checked that there weren't any code changes in my repository. I suppose the problem is in SDK. Ofc, can be wrong.

Expected behavior: No goroutine leak.

Steps to reproduce: Run the code on 3.77.0 and 3.79.2. Compare a number of goroutines in the end.

package main

import (
    "context"
    "errors"
    "fmt"
    "io"
    "log"
    "math/rand/v2"
    "runtime"
    "sync"
    "time"

    "github.com/google/uuid"
    "github.com/ydb-platform/ydb-go-sdk/v3"
    "github.com/ydb-platform/ydb-go-sdk/v3/query"
    "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
)

const (
    tableSize = 10000
    queries   = 1000
    chSize    = 50
)

func main() {
    go func() {
        for {
            time.Sleep(time.Second / 2)
            fmt.Println(runtime.NumGoroutine())
        }
    }()
    ctx := context.Background()

    err := initDB()
    if err != nil {
        log.Fatal("open", err)
    }
    defer d.Close(ctx)

    err = initBigTable(tableSize)
    if err != nil {
        log.Fatal("init big table", err)
    }

    wg := sync.WaitGroup{}
    wg.Add(queries)
    ch := make(chan struct{}, chSize)
    for i := 0; i < queries; i++ {
        ch <- struct{}{}
        go func() {
            defer func() { <-ch }()
            defer wg.Done()
            if err := q2(); err != nil {
                //log.Println("q2", err)
            }
        }()
    }
    wg.Wait()
    time.Sleep(10 * time.Second)
}

var d *ydb.Driver

func initBigTable(N int) error {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    err := d.Query().Exec(ctx, `DROP TABLE IF EXISTS test`)
    if err != nil {
        return err
    }

    err = d.Query().Exec(ctx, `CREATE TABLE test (
    id Utf8,
    value Uint64,
    PRIMARY KEY(id)
    )
    `)

    if err != nil {
        return fmt.Errorf("create table test: %w", err)
    }

    var vals []types.Value
    for i := 0; i < N; i++ {
        vals = append(vals, types.StructValue(
            types.StructFieldValue("id", types.UTF8Value(uuid.NewString())),
            types.StructFieldValue("value", types.Uint64Value(rand.Uint64())),
        ))
    }
    err = d.Query().Do(context.Background(), func(ctx context.Context, s query.Session) error {
        err = s.Exec(ctx, `
            PRAGMA AnsiInForEmptyOrNullableItemsCollections;
            DECLARE $vals AS List<Struct<
                id: Utf8,
                value: Uint64
            >>;

            INSERT INTO test SELECT id, value FROM AS_TABLE($vals);`,
            query.WithParameters(
                ydb.ParamsBuilder().
                    Param("$vals").BeginList().AddItems(vals...).EndList().Build(),
            ),
        )
        if err != nil {
            return fmt.Errorf("init table: %w", err)
        }

        return nil
    })

    return err
}

func initDB() error {
    ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
    defer cancel()
    var err error
    d, err = ydb.Open(ctx,
        "grpc://localhost:2136/local",
    )
    return err
}

func q2() error {
    ctx := context.Background()

    return d.Query().DoTx(ctx, func(ctx context.Context, s query.TxActor) error {
        var (
            id string
            v  uint64
        )

        res, err := s.Query(ctx, `SELECT id, value FROM test`)
        if err != nil {
            return err
        }

        for set, err := res.NextResultSet(ctx); !errors.Is(err, io.EOF); set, err = res.NextResultSet(ctx) {
            if err != nil {
                return err
            }

            for row, err := set.NextRow(ctx); !errors.Is(err, io.EOF); row, err = set.NextRow(ctx) {
                if err != nil {
                    return err
                }

                err := row.Scan(&id, &v)
                if err != nil {
                    return err
                }
            }
        }
        return res.Close(ctx)
    }, query.WithTxSettings(query.TxSettings(query.WithSerializableReadWrite())))
}
pelageech commented 5 days ago

I think it connected with https://github.com/ydb-platform/ydb-go-sdk/issues/1456