redis / go-redis

Redis Go client
https://redis.uptrace.dev
BSD 2-Clause "Simplified" License

XReadGroup sometimes returns an i/o timeout error when no message is read #2963

Open danielnelson opened 3 months ago

danielnelson commented 3 months ago

When using Redis streams with multiple consumers, consumers that don't read a message get an i/o timeout error. This seems to occur when another consumer did read a message, but traffic was low enough that this consumer reached its BLOCK timeout. If no consumer reads a message at all, every consumer gets a redis.Nil error instead.

Expected Behavior

When no item is read from the stream the consumer should always get a redis.Nil error.

Current Behavior

In the script below I receive i/o timeout errors:

read tcp 127.0.0.1:47842->127.0.0.1:6379: i/o timeout 40.01311409s

In addition to the error type not being redis.Nil, the command takes about 10s longer to complete (roughly 40s instead of the 30s BLOCK value). I also suspect that the connection is not reused afterwards.

Possible Solution

Steps to Reproduce

In this script I have 5 stream consumers, each doing a 30s blocking read in a loop. One goroutine repeatedly adds a single item after a random delay of 0-59s:

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os"
    "strings"
    "sync"
    "time"

    "github.com/redis/go-redis/v9"
)

func main() {
    rdb := redis.NewClient(&redis.Options{})

    ctx := context.Background()
    stream := "timeout-stream"
    group := "timeout-group"

    _, err := rdb.XGroupCreateMkStream(ctx, stream, group, "0").Result()
    if err != nil && !strings.HasPrefix(err.Error(), "BUSYGROUP") {
        fmt.Println(err)
        os.Exit(1)
    }

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            v := rand.Intn(60)
            time.Sleep(time.Duration(v) * time.Second)

            fmt.Println("XADD")
            // Use a goroutine-local err so the outer variable is not shared.
            _, err := rdb.XAdd(ctx, &redis.XAddArgs{
                Stream: stream,
                ID:     "*",
                Values: []string{"mykey", "myvalue"},
            }).Result()

            if err != nil {
                fmt.Println(err)
                os.Exit(1)
            }
        }
    }()

    for i := 0; i < 5; i++ {
        wg.Add(1)
        i := i // per-iteration copy for the closure; needed before Go 1.22
        go func() {
            defer wg.Done()

            consumer := fmt.Sprintf("consumer-%d", i)
            for {
                start := time.Now()
                item, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
                    Group:    group,
                    Consumer: consumer,
                    Streams:  []string{stream, ">"},
                    Count:    1,
                    Block:    30 * time.Second,
                }).Result()
                if err != nil {
                    fmt.Printf("%s %s\n", err, time.Since(start))
                    continue
                }

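                // Acknowledge the message and report any failure.
                if err := rdb.XAck(ctx, stream, group, item[0].Messages[0].ID).Err(); err != nil {
                    fmt.Println(err)
                    continue
                }
                fmt.Println("XACK")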
            }
        }()
    }

    wg.Wait()
}

Example Output:

XADD
XACK
read tcp 127.0.0.1:46528->127.0.0.1:6379: i/o timeout 40.018844088s
read tcp 127.0.0.1:46540->127.0.0.1:6379: i/o timeout 40.019074019s
read tcp 127.0.0.1:46530->127.0.0.1:6379: i/o timeout 40.019142248s
read tcp 127.0.0.1:46536->127.0.0.1:6379: i/o timeout 40.01921703s
redis: nil 30.095551148s
redis: nil 30.005739055s
redis: nil 30.005510956s
redis: nil 30.005496188s
redis: nil 30.005690323s
XADD
XACK
XADD
XACK
XADD
XACK
XADD
XACK
XADD
XACK
read tcp 127.0.0.1:46516->127.0.0.1:6379: i/o timeout 40.001276509s
read tcp 127.0.0.1:54476->127.0.0.1:6379: i/o timeout 40.001512971s
XADD
XACK
read tcp 127.0.0.1:35944->127.0.0.1:6379: i/o timeout 40.014963904s
XADD
XACK
XADD
XACK
XADD
XACK
read tcp 127.0.0.1:54470->127.0.0.1:6379: i/o timeout 40.001857285s
XADD
XACK
XADD
XACK
read tcp 127.0.0.1:58234->127.0.0.1:6379: i/o timeout 40.008846243s
read tcp 127.0.0.1:45494->127.0.0.1:6379: i/o timeout 40.009956562s
read tcp 127.0.0.1:49356->127.0.0.1:6379: i/o timeout 40.001642322s
redis: nil 30.06908248s
redis: nil 32.071284132s

Context (Environment)

Detailed Description

Possible Implementation