redis / rueidis

A fast Golang Redis client that supports Client Side Caching, Auto Pipelining, Generics OM, RedisJSON, RedisBloom, RediSearch, etc.
Apache License 2.0
2.47k stars 158 forks source link

Getting EOFs while running the watch command #675

Open yash-nisar opened 6 days ago

yash-nisar commented 6 days ago

Hey @rueian, we're using Azure Redis Cache that supports clustering (Azure Redis Premium) with rueidis. We noticed that we are seeing intermittent EOFs while running this piece of code:

// Obtain a dedicated connection from the client; the returned cancel function
// is deferred so it runs when the enclosing function returns.
dedicatedConn, cancel := c.client.Dedicate()
defer cancel()
// Send WATCH for key on the dedicated connection.
watchResult := dedicatedConn.Do(ctx, dedicatedConn.B().Watch().Key(key).Build())
if watchResult.Error() != nil {
  // The WATCH failure is logged and then returned to the caller unchanged.
  c.logger.Error(watchResult.Error(), "error in watch command")
  return watchResult.Error()
}

This is the first part of the watch-multi-exec operation.

We initialise our client with the code below, so we're mostly relying on the defaults:

// createRedisClient constructs a rueidis client for addr. The only options it
// sets are the initial address, the TLS configuration, and the credentials
// callback returned by createAuthCredentialsFn; all other ClientOption fields
// keep their zero values. If construction fails, the error is logged and
// returned together with a nil client.
func createRedisClient(ctx context.Context, addr, clientID string, tlsConfig *tls.Config, logger logging.Logger, token *azcore.AccessToken, mu *sync.Mutex) (rueidis.Client, error) {
    opts := rueidis.ClientOption{
        InitAddress:       []string{addr},
        TLSConfig:         tlsConfig,
        AuthCredentialsFn: createAuthCredentialsFn(ctx, clientID, logger, token, mu),
    }

    redisClient, newErr := rueidis.NewClient(opts)
    if newErr != nil {
        logger.Error(newErr, "Failed to create Redis client")
        return nil, newErr
    }

    return redisClient, nil
}

We're trying to understand why we're seeing EOFs and would like to mitigate them. Here's my understanding:

I'm happy to provide more details if required.

rueian commented 6 days ago

Hi @yash-nisar,

It seems like a bug, but I need a way to reproduce it first. It would be helpful if you could provide your setup, including the Azure Redis configuration, and a small, simple, but complete program that demonstrates the issue.

rueian commented 6 days ago

I just tried the following program and it worked fine on my side:

package main

import (
    "context"
    "crypto/tls"
    "fmt"
    "github.com/redis/rueidis"
)

// main connects to the Redis endpoint over TLS and then loops forever. Each
// iteration runs WATCH, GET and a MULTI/SET/EXEC batch for one key on a
// dedicated connection and prints "ok". Any command error panics, except a
// GET error for which rueidis.IsRedisNil reports true.
func main() {
    opts := rueidis.ClientOption{
        InitAddress: []string{"**********.redis.cache.windows.net:6380"},
        // InsecureSkipVerify turns off server certificate verification.
        TLSConfig: &tls.Config{InsecureSkipVerify: true},
        Password:  "****************",
    }
    client, err := rueidis.NewClient(opts)
    if err != nil {
        panic(err)
    }

    // runOnce performs a single WATCH / GET / MULTI-SET-EXEC round for key on
    // its own dedicated connection.
    runOnce := func(key string) {
        conn, release := client.Dedicate()
        defer release()

        ctx := context.Background()

        watchErr := conn.Do(ctx, conn.B().Watch().Key(key).Build()).Error()
        if watchErr != nil {
            panic(watchErr)
        }

        getErr := conn.Do(ctx, conn.B().Get().Key(key).Build()).Error()
        if getErr != nil && !rueidis.IsRedisNil(getErr) {
            panic(getErr)
        }

        replies := conn.DoMulti(
            ctx,
            conn.B().Multi().Build(),
            conn.B().Set().Key(key).Value(key).Build(),
            conn.B().Exec().Build(),
        )
        for i := range replies {
            if execErr := replies[i].Error(); execErr != nil {
                panic(execErr)
            }
        }
    }

    for {
        runOnce("******")
        fmt.Println("ok")
    }
}

Did EOF happen occasionally or constantly?

rueian commented 4 days ago

Hi @yash-nisar,

Do you have more information about your setup?

skkgh commented 4 days ago

Hi @rueian, Appreciate your time.

Here is the more detailed logic of our setup (still a trimmed version). Your sample program above doesn't cover the idle-connection case, right? It's a back-to-back for loop. I'm just guessing if it's a case of idle connection timeout: we may be missing a "PING" command to Redis, which results in EOF. Our errors are intermittent, not consistent.

Let me know if you need more details from us.

// RedisClient pairs a rueidis client with a logger.
type RedisClient struct {
    // client is the rueidis client that commands are issued through.
    client rueidis.Client
    // logger receives the messages emitted by RedisClient methods.
    logger logging.Logger
}

// Set writes val under key using a WATCH / MULTI / EXEC sequence issued on a
// dedicated connection.
//
// val is first serialized with options.SerializeValue. When
// options.ShouldUpdate is non-nil, the current value is read with GET and
// handed to it together with val; a false result skips the write and returns
// nil. When options.Expiry is greater than zero, Ex is called on the SET
// builder with that expiry.
//
// It returns the serialization error, the WATCH error, or the ShouldUpdate
// error. Handling of the MULTI/EXEC results is elided from this excerpt.
func (c *RedisClient) Set(ctx context.Context, key string, val any, options RedisSetOptions) error {
    valueToSet, err := options.SerializeValue(val)
    if err != nil {
        return err
    }

    // All following commands go through one dedicated connection; cancel is
    // deferred so it runs when Set returns.
    dedicatedConn, cancel := c.client.Dedicate()
    defer cancel()
    watchResult := dedicatedConn.Do(ctx, dedicatedConn.B().Watch().Key(key).Build())
    if watchResult.Error() != nil {
        // This is where EOF errors are happening intermittently
        c.logger.Error(watchResult.Error(), "error in watch command")
        return watchResult.Error()
    }
    // Registered after the deferred cancel, so (defers run last-in, first-out)
    // this UNWATCH is sent before cancel is called. It runs on every return
    // path below, and a failure is only logged.
    defer func() {
        unwatchResult := dedicatedConn.Do(ctx, dedicatedConn.B().Unwatch().Build())
        if unwatchResult.Error() != nil {
            c.logger.Error(unwatchResult.Error(), "error in unwatch command")
        }
    }()
    if options.ShouldUpdate != nil {
        // The GET result is passed on without checking result.Error() here;
        // presumably ShouldUpdate inspects it — verify against its implementations.
        result := dedicatedConn.Do(ctx, dedicatedConn.B().Get().Key(key).Build())
        shouldUpdate, err := options.ShouldUpdate(result, val)
        if err != nil {
            return err
        }
        if !shouldUpdate {
            c.logger.Info("skipping cache update", "CacheKey", key)
            return nil
        }
    }

    cmd := dedicatedConn.B().Set().Key(key).Value(valueToSet)
    if options.Expiry > 0 {
        // NOTE(review): the value returned by Ex is discarded, so the expiry
        // only reaches cmd.Build() if Ex mutates state shared with cmd —
        // confirm against the rueidis command builder.
        cmd.Ex(options.Expiry)
    }

    // MULTI, the SET built above, and EXEC are sent together in one DoMulti call.
    results := dedicatedConn.DoMulti(ctx,
        dedicatedConn.B().Multi().Build(),
        cmd.Build(),
        dedicatedConn.B().Exec().Build())

    // ... code that handles errors/results

    return nil
}

// createRedisClient constructs a rueidis client for addr. The only options it
// sets are the initial address, the TLS configuration, and the credentials
// callback returned by createAuthCredentialsFn; all other ClientOption fields
// keep their zero values. If construction fails, the error is logged and
// returned together with a nil client.
func createRedisClient(ctx context.Context, addr, clientID string, tlsConfig *tls.Config, logger logging.Logger, token *azcore.AccessToken, mu *sync.Mutex) (rueidis.Client, error) {
    opts := rueidis.ClientOption{
        InitAddress:       []string{addr},
        TLSConfig:         tlsConfig,
        AuthCredentialsFn: createAuthCredentialsFn(ctx, clientID, logger, token, mu),
    }

    redisClient, newErr := rueidis.NewClient(opts)
    if newErr != nil {
        logger.Error(newErr, "Failed to create Redis client")
        return nil, newErr
    }

    return redisClient, nil
}

// NewRedisClient creates the rueidis client for redisHost:redisPort, starts
// the token refresh loop in a background goroutine, and returns the client
// wrapped in a RedisClient together with logger. If the client cannot be
// created, the error is logged and returned with a nil *RedisClient.
//
// NOTE(review): redisHost, redisPort, clientID, tlsConfig and errChan are not
// declared in this excerpt, and enableKeyspaceEventListener is not used by the
// code shown here.
func NewRedisClient(ctx context.Context, logger logging.Logger, enableKeyspaceEventListener bool) (*RedisClient, error) {
    // token and mu are passed by pointer to both the client construction and
    // the refresh loop below.
    var (
        token azcore.AccessToken
        mu    sync.Mutex
    )

    // Create Redis client
    client, err := createRedisClient(ctx, redisHost+":"+redisPort, clientID, tlsConfig, logger, &token, &mu)
    if err != nil {
        logger.Error(err, "Failed to create redis client")
        return nil, err
    }

    // Start token refresh loop
    go startTokenRefreshLoop(ctx, client, clientID, &token, &mu, logger, errChan)

    return &RedisClient{client: client, logger: logger}, nil
}
rueian commented 4 days ago

Hi @yash-nisar,

> I'm just guessing if it's a case of idle connection timeout

To verify it, could you add AlwaysPipelining: true to your rueidis.ClientOption? We start background goroutines to ping the server periodically when pipelining. If AlwaysPipelining eliminates the EOF, then your server indeed drops idle connections.

yash-nisar commented 19 hours ago

Hey @rueian, we can confirm that Azure Redis does have a 10 min timeout for idle connections.

Source: https://learn.microsoft.com/en-us/azure/azure-cache-for-redis/cache-best-practices-connection#idle-timeout

We will test out AlwaysPipelining: true and let you know how it goes. We will also add retries to make our system more resilient.

yash-nisar commented 2 hours ago

Hey @rueian, can you help clarify a couple of things?

> We start background goroutines to ping the server periodically when pipelining

Does this only happen when AlwaysPipelining: true is set? I'm asking because we ran a sample program both with and without AlwaysPipelining: true and saw the client making regular pings to Azure Redis in both cases. Is that expected? Here's the code that we ran:

package main

import (
    "context"
    "crypto/tls"
    "fmt"
    "github.com/redis/rueidis"
    "time"
)

// main runs one WATCH / GET / MULTI-EXEC round through do, sleeps for eleven
// minutes, and then runs the same round again, printing progress along the
// way. A failure to create the client panics; the client is closed when main
// returns.
func main() {
    opts := rueidis.ClientOption{
        InitAddress: []string{"xxxx.redis.cache.windows.net:6380"},
        // InsecureSkipVerify turns off server certificate verification.
        TLSConfig:        &tls.Config{InsecureSkipVerify: true},
        Password:         "xxxx",
        AlwaysPipelining: true, // This was removed for testing
    }
    client, err := rueidis.NewClient(opts)
    if err != nil {
        panic(err)
    }
    defer client.Close()

    const (
        testKey    = "test-key"
        idlePeriod = 11 * time.Minute
    )

    // First operation to establish connection
    do(client, testKey)
    fmt.Println("Initial operation completed")

    // Simulate idle period for 11 minutes
    fmt.Println("Starting 11 minute idle period...")
    time.Sleep(idlePeriod)

    // Try operation after idle period
    fmt.Println("Idle period complete, attempting operation...")
    do(client, testKey)
    fmt.Println("Operation after idle period succeeded!")
}

// do runs WATCH, GET and a MULTI/SET/EXEC batch for key on a dedicated
// connection obtained from client. It panics with a descriptive message on the
// first command error, except a GET error for which rueidis.IsRedisNil reports
// true. The SET stores key as its own value.
func do(client rueidis.Client, key string) {
    ctx := context.Background()

    conn, release := client.Dedicate()
    defer release()

    watchCmd := conn.B().Watch().Key(key).Build()
    if watchErr := conn.Do(ctx, watchCmd).Error(); watchErr != nil {
        panic(fmt.Sprintf("Watch failed: %v", watchErr))
    }

    getCmd := conn.B().Get().Key(key).Build()
    getErr := conn.Do(ctx, getCmd).Error()
    if getErr != nil && !rueidis.IsRedisNil(getErr) {
        panic(fmt.Sprintf("Get failed: %v", getErr))
    }

    // The three commands are built in send order and submitted in one DoMulti call.
    multiCmd := conn.B().Multi().Build()
    setCmd := conn.B().Set().Key(key).Value(key).Build()
    execCmd := conn.B().Exec().Build()

    replies := conn.DoMulti(ctx, multiCmd, setCmd, execCmd)
    for i := range replies {
        if replyErr := replies[i].Error(); replyErr != nil {
            panic(fmt.Sprintf("Multi operation failed: %v", replyErr))
        }
    }
}
rueian commented 1 hour ago

Hi @yash-nisar,

Oh, yes, it is actually expected. Sorry that I got it wrong previously. We actually start background goroutines to ping the server periodically, no matter if the connection is in pipelining mode or not.

rueian commented 34 minutes ago

So things get mysterious now. I think we have three things to do:

  1. we could try to disable the background goroutines first and see if the error is the same as EOF when Azure drops the connection.
  2. AlwaysPipelining: true could still be useful in this scenario. When a connection is in the pipelining mode, it constantly reads messages from the underlying net.Conn. In other words, it can notice EOF in advance. We could modify our connection pool to check if the connection is broken before giving it to the user.
  3. I wonder if the EOF could be related to TLS. Probably just try plain TCP to see if there will still be EOF?