redis / go-redis

Redis Go client
https://redis.uptrace.dev
BSD 2-Clause "Simplified" License
20.17k stars 2.37k forks source link

XINFO GROUPS can't report that Lag is invalid #3185

Open sancar opened 2 weeks ago

sancar commented 2 weeks ago

XINFO GROUPS returns XInfoGroup.Lag field as 0 when redis reports nil for the Lag. The redis nil error seems to be ignored on purpose here. https://github.com/redis/go-redis/blob/master/command.go#L2151

This makes it impossible to tell whether the lag is actually zero or whether it could not be computed.

Expected Behavior

I would expect to see a value that is distinguishable from 0 when the lag is invalid.

Current Behavior

It returns 0 when redis reports nil for the Lag.

Possible Solution

We can return -1 instead of 0 when redis reports nil for the Lag.

Steps to Reproduce

Here is a description of one case in which Redis reports nil for the lag:

One or more entries between the group's last-delivered-id and the stream's last-generated-id were deleted

https://redis.io/docs/latest/commands/xinfo-groups/

A code example to reproduce:

        r, err := redis.ParseURL("redis://:@127.0.0.1:6379")
    panicOnError(err)
    client := redis.NewClient(r)

    ctx := context.Background()
    // Flush the database first so the stream and group created below are fresh.
    client.FlushDB(ctx)

    // Add three entries with explicit IDs 0-1, 0-2 and 0-3 to stream "s".
    client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-1", Values: []string{"foo", "1"}})
    client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-2", Values: []string{"foo", "2"}})
    client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-3", Values: []string{"foo", "3"}})

    // Create group "g" from ID 0 and let consumer "c" read a single entry,
    // which leaves the group's last-delivered-id at 0-1 (see the output below).
    err = client.XGroupCreate(ctx, "s", "g", "0").Err()
    panicOnError(err)
    err = client.XReadGroup(ctx, &redis.XReadGroupArgs{Group: "g", Consumer: "c", Streams: []string{"s", ">"}, Count: 1, Block: -1, NoAck: false}).Err()
    panicOnError(err)

    // Delete an entry that lies between the group's last-delivered-id (0-1) and
    // the stream's last-generated-id (0-3); this is the documented case in which
    // Redis reports nil for the lag.
    client.XDel(ctx, "s", "0-2")

    // Print every group as parsed by go-redis; the Lag field is the value under test.
    result, err := client.XInfoGroups(context.Background(), "s").Result()
    panicOnError(err)
    for _, group := range result {
        fmt.Printf("%+v\n", group)
    }

The output it prints: {Name:g Consumers:1 Pending:1 LastDeliveredID:0-1 EntriesRead:1 Lag:0}

What I expected to see: {Name:g Consumers:1 Pending:1 LastDeliveredID:0-1 EntriesRead:1 Lag:-1}

Possible Implementation

The code here can be changed to

group.Lag, err = rd.ReadInt()

// lag: the number of entries in the stream that are still waiting to be delivered
// to the group's consumers, or a NULL(Nil) when that number can't be determined.
if err != nil {
    if err != Nil {
        // Anything other than a nil reply is a genuine read failure.
        return err
    }
    // Redis replied with nil: report -1 so callers can tell an unknown lag
    // apart from a lag that really is zero.
    group.Lag = -1
}
sancar commented 2 weeks ago

If anyone needs a workaround, I have implemented the following function to use until this issue is fixed:

// CustomXInfoGroup runs XINFO GROUPS for stream through client.Do and decodes the
// reply by hand, so that a nil lag reported by Redis (lag could not be computed)
// is surfaced as Lag == -1.
// go-redis itself reports 0 in that case, which cannot be told apart from a real zero lag.
// This workaround can be dropped once https://github.com/redis/go-redis/issues/3185 is fixed.
//
// It returns one redis.XInfoGroup per group in the reply, or a nil slice and an error
// when the command fails or the reply does not have the expected shape.
func CustomXInfoGroup(ctx context.Context, client *redis.Client, stream string) ([]redis.XInfoGroup, error) {
    reply, err := client.Do(ctx, "XINFO", "GROUPS", stream).Result()
    if err != nil {
        return nil, err
    }

    rawGroups, ok := reply.([]any)
    if !ok {
        return nil, errors.New("unexpected type from XINFO GROUPS")
    }

    out := make([]redis.XInfoGroup, len(rawGroups))
    for idx := range rawGroups {
        dst := &out[idx]

        switch fields := rawGroups[idx].(type) {
        case []any:
            // Flat slice of alternating keys and values; walk it pair by pair.
            // A trailing element without a partner is ignored.
            for k := 0; k+1 < len(fields); k += 2 {
                name, isString := fields[k].(string)
                if !isString {
                    return nil, errors.New("redis: unexpected type from XINFO GROUPS for GROUP key field")
                }
                if err := fillInfoGroup(dst, name, fields[k+1]); err != nil {
                    return nil, err
                }
            }
        case map[any]any:
            // Map-shaped reply: every entry is already a key/value pair.
            for name, val := range fields {
                if err := fillInfoGroup(dst, name, val); err != nil {
                    return nil, err
                }
            }
        default:
            return nil, errors.New("unexpected type from XINFO GROUPS for GROUP")
        }
    }

    return out, nil
}

// fillInfoGroup stores a single key/value pair from an XINFO GROUPS reply in the
// matching field of infoGroup.
//
// A nil value for "lag" is recorded as -1. An error is returned when the value
// does not have the type expected for the key, or when the key is not one of the
// fields handled here.
func fillInfoGroup(infoGroup *redis.XInfoGroup, key any, value any) error {
    var ok bool

    switch key {
    case "name":
        if infoGroup.Name, ok = value.(string); !ok {
            return errors.New("redis: unexpected type from XINFO GROUPS for GROUP Name field")
        }
    case "consumers":
        if infoGroup.Consumers, ok = value.(int64); !ok {
            return errors.New("redis: unexpected type from XINFO GROUPS for GROUP Consumers field")
        }
    case "pending":
        if infoGroup.Pending, ok = value.(int64); !ok {
            return errors.New("redis: unexpected type from XINFO GROUPS for GROUP Pending field")
        }
    case "last-delivered-id":
        if infoGroup.LastDeliveredID, ok = value.(string); !ok {
            return errors.New("redis: unexpected type from XINFO GROUPS for GROUP LastDeliveredID field")
        }
    case "entries-read":
        if infoGroup.EntriesRead, ok = value.(int64); !ok {
            return errors.New("redis: unexpected type from XINFO GROUPS for GROUP EntriesRead field")
        }
    case "lag":
        // lag is the number of entries still waiting to be delivered to the group's
        // consumers; Redis sends NULL (nil) when that number can't be determined,
        // which is mapped to -1 here.
        if value == nil {
            infoGroup.Lag = -1
            return nil
        }
        if infoGroup.Lag, ok = value.(int64); !ok {
            return errors.New("redis: unexpected type from XINFO GROUPS for GROUP Lag field")
        }
    default:
        return fmt.Errorf("redis: unexpected key %q in XINFO GROUPS reply", key)
    }

    return nil
}