Issue tracker is used for reporting bugs and discussing new features. Please use
stackoverflow for supporting issues.
Expected Behavior
Sharded pubsub subscriber should get messages from all channels that it's subscribed to.
Current Behavior
Sharded pubsub subscriber only gets messages from the shard that the first subscribed channel belongs to.
Possible Solution
Have sharded pubsub subscriber listen to all shards to which its subscribed channels belong.
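To illustrate why the channels end up on different shards, here is a minimal sketch of the Redis Cluster key-to-slot mapping (CRC16/XMODEM modulo 16384, with hash tag handling) applied to channel names. The helper names `crc16`, `hashSlot`, and `groupBySlot` are hypothetical and are not go-redis API. Mapping a slot to the node that owns it still requires the cluster's slot table, which this sketch does not query.

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 computes CRC16/XMODEM (polynomial 0x1021, initial value 0),
// the checksum Redis Cluster uses for key and shard-channel hashing.
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = crc<<1 ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashSlot returns the cluster hash slot (0..16383) for a key or channel.
// If the name contains a non-empty {hash tag}, only the tag is hashed.
func hashSlot(key string) int {
	if start := strings.IndexByte(key, '{'); start >= 0 {
		if end := strings.IndexByte(key[start+1:], '}'); end > 0 {
			key = key[start+1 : start+1+end]
		}
	}
	return int(crc16([]byte(key)) % 16384)
}

// groupBySlot buckets channel names by hash slot (hypothetical helper).
// A subscriber would need a connection to the owner of every slot listed.
func groupBySlot(channels []string) map[int][]string {
	groups := make(map[int][]string)
	for _, ch := range channels {
		slot := hashSlot(ch)
		groups[slot] = append(groups[slot], ch)
	}
	return groups
}

func main() {
	var channels []string
	for i := 1; i <= 9; i++ {
		channels = append(channels, fmt.Sprintf("ch%d", i))
	}
	for slot, chs := range groupBySlot(channels) {
		fmt.Printf("slot %d: %v\n", slot, chs)
	}
}
```

Channels that share a hash tag, such as `{room}ch1` and `{room}ch2`, hash to the same slot, so a single connection can serve all of them.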
Steps to Reproduce
Start a Redis server with cluster mode enabled.
Run the Go script given below.
Observe that the sharded subscriber only receives messages from a subset of the channels it has subscribed to.
Context (Environment)
This happens across different environments as long as a Redis cluster is used.
Detailed Description
Code to reproduce this bug:
package main

import (
	"context"
	"flag"
	"fmt"
	"os"

	"github.com/google/logger"
	"github.com/redis/go-redis/v9"
)

const logPath = "./ssub.log"

var verbose = flag.Bool("verbose", true, "print info level logs to stdout")

func main() {
	lf, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0660)
	if err != nil {
		logger.Fatalf("Failed to open log file: %v", err)
	}
	defer lf.Close()
	defer logger.Init("SsubLogger", *verbose, true, lf).Close()
	logger.Info("")

	// Create a new RedisCluster client
	rdb := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{
			"localhost:6379",
		},
	})

	// Ping the Redis Cluster to ensure connectivity
	if err := rdb.Ping(context.Background()).Err(); err != nil {
		logger.Infof("Could not connect to Redis Cluster: %v\n", err)
	}
	logger.Info("RedisCluster client is ready")

	pubsub := rdb.SSubscribe(context.Background())
	defer pubsub.Close()

	// Ssubscribe to ch 1 - 9
	for i := 1; i <= 9; i++ {
		ch := fmt.Sprintf("ch%d", i)
		pubsub.SSubscribe(context.Background(), ch)
		logger.Info("Subscribed to ", ch)
		rdb.SPublish(context.Background(), ch, "Hello")
	}

	chss, _ := rdb.PubSubShardChannels(context.Background(), "ch*").Result()
	logger.Info("Currently subscribed channels: ", chss)

	for {
		// logger.Info("Waiting for sharded Redis pub-sub messages")
		// ReceiveMessage will crash when SSubscribe is used.
		// msg, err := pubsub.ReceiveMessage(context.Background())
		// if err != nil {
		// 	logger.Info(err)
		// }
		msg := <-pubsub.Channel()
		logger.Infof("Received %s %s", msg.Channel, msg.Payload)
	}
}
Output:
INFO : 2024/09/25 15:35:14.774884 ssub.go:24:
INFO : 2024/09/25 15:35:14.781693 ssub.go:37: RedisCluster client is ready
INFO : 2024/09/25 15:35:14.782690 ssub.go:47: Subscribed to ch1
INFO : 2024/09/25 15:35:14.783934 ssub.go:47: Subscribed to ch2
INFO : 2024/09/25 15:35:14.784396 ssub.go:47: Subscribed to ch3
INFO : 2024/09/25 15:35:14.784838 ssub.go:47: Subscribed to ch4
INFO : 2024/09/25 15:35:14.785478 ssub.go:47: Subscribed to ch5
INFO : 2024/09/25 15:35:14.785714 ssub.go:47: Subscribed to ch6
INFO : 2024/09/25 15:35:14.785889 ssub.go:47: Subscribed to ch7
INFO : 2024/09/25 15:35:14.786079 ssub.go:47: Subscribed to ch8
INFO : 2024/09/25 15:35:14.786246 ssub.go:47: Subscribed to ch9
INFO : 2024/09/25 15:35:14.786489 ssub.go:53: Currently subscribed channels: []
INFO : 2024/09/25 15:35:14.786744 ssub.go:63: Received ch1 Hello
INFO : 2024/09/25 15:35:14.786808 ssub.go:63: Received ch2 Hello
INFO : 2024/09/25 15:35:14.887868 ssub.go:63: Received ch5 Hello
INFO : 2024/09/25 15:35:14.888170 ssub.go:63: Received ch6 Hello
INFO : 2024/09/25 15:35:14.989075 ssub.go:63: Received ch9 Hello
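Until the client listens on every shard, one possible workaround is to open a separate sharded subscription per channel (or per slot) and merge the resulting message streams. The sketch below shows only the merge step, as a generic fan-in over Go channels. `fanIn` is a hypothetical helper, not part of go-redis, and the string channels in `main` stand in for real subscriptions.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn forwards every value from the input channels to one output channel.
// The output is closed once all inputs have been closed and drained.
func fanIn[T any](inputs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	// Stand-ins for two per-channel subscriptions on different shards.
	a := make(chan string, 1)
	b := make(chan string, 1)
	a <- "ch1 Hello"
	b <- "ch3 Hello"
	close(a)
	close(b)

	// Arrival order across inputs is not deterministic.
	for msg := range fanIn[string](a, b) {
		fmt.Println("Received", msg)
	}
}
```

With go-redis, each input would be the `Channel()` of its own `rdb.SSubscribe(ctx, ch)` subscription. This assumes each such subscription connects to the node that owns its channel, which matches the behavior reported above, and it costs one connection per subscription.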