gomodule / redigo

Go client for Redis
Apache License 2.0
9.74k stars 1.25k forks source link

Redis streams #375

Open samwhitecoull opened 5 years ago

samwhitecoull commented 5 years ago

I am using Redigo in a project that relies on reading messages from streams. I know this is a newer feature of Redis, but is this something that Redigo plans to support natively in the future? I have cobbled together a stream reader by traversing through the nested interfaces returned from the Do() call and using type assertion (based on trial and improvement and knowledge of whats being sent) to pull useful results out. However this is extremely brittle and only works because I can guarantee whats being appended to the streams. If a code update is not required it would be great to see some documentation showing an idiomatic way of reading streams.

smartwalle commented 5 years ago
r, err := redis.Values(conn.Do("XREAD", "STREAMS", "stream1", "stream2", "0-0", "0-0"))

for kIndex :=0; kIndex < len(r); kIndex++ {
    var keyInfo = r[kIndex].([]interface{})

    var key = string(keyInfo[0].([]byte))
    var idList = keyInfo[1].([]interface{})

    for idIndex :=0; idIndex <len(idList); idIndex++ {
        var idInfo = idList[idIndex].([]interface{})

        var id = string(idInfo[0].([]byte))

        var fieldList = idInfo[1].([]interface{})
        var field = string(fieldList[0].([]byte))
        var value = string(fieldList[1].([]byte))

        fmt.Println(key, id, field, value)
    }
}
samisagit commented 5 years ago

apologies for the radio silence, life got busy! I'm happy to tackle this feature if the main man wants it implemented as part of the API - @garyburd, otherwise I'll close this and carry on using the wrapper I mentioned.

nskforward commented 4 years ago

I should have an ability to deserialize XREAD response to a struct{} Currently I don't know how I can do it

Explosivv commented 3 years ago

any one merge this? this is helpful :)

stevenh commented 3 years ago

This is a issue not a PR, are your referring to #557 ?

ostcar commented 1 year ago

I hope a solution for this problem will be found soon.

But the solution has to consider, that a stream entry is not a map. It can contain the same field multiple times. For example:

127.0.0.1:6379> xadd test * field value1 field value2
"1667730787485-0"
127.0.0.1:6379> xread streams test 0
1) 1) "test"
   2) 1) 1) "1667730787485-0"
         2) 1) "field"
            2) "value1"
            3) "field"
            4) "value2"

The PR #557 parses the fields in a map. So it would return

map[string]string{"field":"value"}

I think it would be nice, if the the user would be able to parse the fields as he wants.

For example:

func parseStream(reply any, f func(k, v []byte)) (string, error) {
    valueList, err := redis.Values(reply, nil)
    if err != nil {
        return "", err
    }

    var lastID string
    for i, value := range valueList {
        idFields, ok := value.([]any)
        if !ok || len(idFields) != 2 {
            return "", fmt.Errorf("invalid stream value %d, got %v", i, value)
        }

        id, err := redis.String(idFields[0], nil)
        if err != nil {
            return "", fmt.Errorf("parsing id from entry %d: %w", i, err)
        }

        lastID = id

        fieldList, ok := idFields[1].([]any)
        if !ok || len(fieldList)%2 != 0 {
            return "", fmt.Errorf("invalid field list value %d, got %v", i, idFields[i])
        }

        for fi := 0; fi < len(fieldList); fi += 2 {
            key, ok := toByte(fieldList[fi])
            if !ok {
                return "", fmt.Errorf("field %d in entry %d is not a bulk string value, got %T", fi, i, fieldList[fi])
            }

            value, ok := toByte(fieldList[fi+1])
            if !ok {
                return "", fmt.Errorf("value %d in entry %d is not a bulk string value, got %T", fi+1, i, fieldList[fi])
            }

            f(key, value)
        }
    }
    return lastID, nil
}

// user function1 that builds a map
data := make(map[string]string)
lastID, err :=parseStream(reply, func(k,v []byte) {data[string(k)]=string(v)})

// user function2 that only wants the values
var data []string
lastID, err :=parseStream(reply, func(_,v []byte) {data=append(data,string(v)})