datastax / go-cassandra-native-protocol

Cassandra Native Protocol bindings for the Go language
Apache License 2.0

Deadlock when opening and closing connections concurrently #38

Open joao-r-reis opened 3 years ago

joao-r-reis commented 3 years ago

I ran into two different issues: one is a deadlock and the other is leaked goroutines. I'm not sure whether these goroutines will eventually be cleaned up, but I did see them in the debugger.

Deadlock

While trying to reproduce #36 with an automated test, I ran into a deadlock caused by a blocking channel write here, which prevents the goroutine from releasing the connectionsLock.

In the debugger I see many goroutines blocked while trying to acquire that lock, which they can never do because the first goroutine is stuck on that channel write.

I believe this happens because the channel is in fact nil due to this call to close(), and writing to a nil channel blocks forever.

Leaked goroutines

I also see several background goroutines blocked here and here.

This happens because neither channel is buffered, so a write can only proceed once a matching read happens; when an error occurs, the for loop returns before reading from both channels, leaving the writers blocked forever.


Here is the test that I used to reproduce these issues:

package client_test

import (
    "context"
    "math/rand"
    "sync"
    "testing"
    "time"

    "github.com/datastax/go-cassandra-native-protocol/client"
    "github.com/datastax/go-cassandra-native-protocol/primitive"
    "github.com/stretchr/testify/require"
)

func TestDeadlock(t *testing.T) {
    server := client.NewCqlServer(
        "127.0.0.1:9043",
        &client.AuthCredentials{
            Username: "cassandra",
            Password: "cassandra",
        },
    )
    defer server.Close()

    clt := client.NewCqlClient(
        "127.0.0.1:9043",
        &client.AuthCredentials{
            Username: "cassandra",
            Password: "cassandra",
        },
    )

    numberOfGoroutines := 10
    errChan := make(chan error, numberOfGoroutines)
    wg := &sync.WaitGroup{}
    defer wg.Wait()
    ctx, cancelFn := context.WithTimeout(context.Background(), 5 * time.Second)
    defer cancelFn()

    err := server.Start(ctx)
    require.Nil(t, err)

    for i := 0; i < numberOfGoroutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for ctx.Err() == nil {
                newCtx, newCancelFn := context.WithTimeout(ctx, time.Duration(rand.Intn(50)) * time.Millisecond)
                clientConn, serverConn, err := server.BindAndInit(
                    clt, newCtx, primitive.ProtocolVersion4, client.ManagedStreamId)
                if err == nil {
                    err = clientConn.InitiateHandshake(primitive.ProtocolVersion4, client.ManagedStreamId)
                }
                newCancelFn()
                if clientConn != nil {
                    clientConn.Close()
                }
                if serverConn != nil {
                    serverConn.Close()
                }
                if err != nil && newCtx.Err() == nil {
                    errChan <- err
                    return
                }
            }
        }()
    }

    select {
    case err = <-errChan:
        t.Errorf("%v", err)
    case <-ctx.Done():
    }
}
adutra commented 3 years ago
  1. For the deadlock, I think we simply need to test h.isClosed() after acquiring h.connectionsLock in onConnectionAccepted.
  2. Leaked goroutines: the code used to test whether an error occurred is indeed clumsy. I suggest we replace it with https://pkg.go.dev/golang.org/x/sync/errgroup. We already used it in another project and it works great, even though it does not have a stable version yet.
joao-r-reis commented 3 years ago

For 1, won't that introduce a race condition?

adutra commented 3 years ago

I think we are fine, because the channel is closed while holding the connectionsLock, see here.