StackExchange / StackExchange.Redis

General purpose redis client
https://stackexchange.github.io/StackExchange.Redis/
Other
5.89k stars 1.51k forks source link

When xreadgroup block reads data after a redistimeout, the first message cannot be consumed #2475

Open punisher1 opened 1 year ago

punisher1 commented 1 year ago
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class Program
{

    static void Main()
    {

        var config = ConfigurationOptions.Parse("127.0.0.1:6379");
        config.CommandMap = CommandMap.Create(new HashSet<string> { "SUBSCRIBE" }, false);
        var redisRead = ConnectionMultiplexer.Connect(config);
        var redisReadDb = redisRead.GetDatabase(1);

        try
        {
            redisReadDb.StreamCreateConsumerGroup("test", "default", StreamPosition.NewMessages, true);
        }
        catch { }

        var redisWrite = ConnectionMultiplexer.Connect(config);
        var redisWriteDb = redisWrite.GetDatabase(1);

        var writeTask = Task.Run(async () =>
        {
            while (true)
            {
                var id = await redisWriteDb.StreamAddAsync("test", "time", DateTime.Now.ToString());
                var id2 = await redisWriteDb.StreamAddAsync("test", "time", DateTime.Now.ToString());

                Console.WriteLine($"{DateTime.Now:O} write message:{id},{id2}");
                Thread.Sleep(10000);
            }
        });

        var readTask = Task.Run(async () =>
        {
            var arguments = new List<object>
                {
                    "GROUP",
                    "default",
                    $"dotnet",
                    "COUNT",
                    10,
                    "BLOCK",
                    "0",
                    "STREAMS",
                    "test",
                    ">"
                };

            while (true)
            {
                try
                {
                    Console.WriteLine($"{DateTime.Now:O} redis xreadgroup block read ...");

                    var result = await redisReadDb.ExecuteAsync("xreadgroup", arguments).ConfigureAwait(false);
                    if (result.IsNull)
                    {
                        Console.WriteLine($"{DateTime.Now:O} redis xreadgroup isnull");
                    }
                    else
                    {
                        foreach (RedisResult[] subresults in (RedisResult[])result)
                        {
                            var streamName = (RedisValue)subresults[0];

                            foreach (RedisResult[] messages in (RedisResult[])subresults[1])
                            {
                                var id = (RedisValue)messages[0];
                                Console.WriteLine($"{DateTime.Now:O} read message:{id}");
                                await redisReadDb.StreamAcknowledgeAsync("test", "default", id);
                            }
                        }
                    }
                }
                catch (RedisTimeoutException)
                {
                    Console.WriteLine($"{DateTime.Now:O} redis timeout ...");
                }
            }
        });

        Console.ReadLine();
    }
}

2023-05-31T14:25:27.4733370+08:00 redis xreadgroup block read ... 2023-05-31T14:25:27.4976884+08:00 read message:1685514327495-0 2023-05-31T14:25:27.4994336+08:00 write message:1685514327495-0,1685514327498-0 2023-05-31T14:25:27.5082905+08:00 redis xreadgroup block read ... 2023-05-31T14:25:27.5101550+08:00 read message:1685514327498-0 2023-05-31T14:25:27.5107652+08:00 redis xreadgroup block read ... 2023-05-31T14:25:33.4848775+08:00 redis timeout ... 2023-05-31T14:25:33.4851831+08:00 redis xreadgroup block read ... 2023-05-31T14:25:37.5174767+08:00 write message:1685514337514-0,1685514337515-0 2023-05-31T14:25:37.5175269+08:00 read message:1685514337515-0 2023-05-31T14:25:37.5189520+08:00 redis xreadgroup block read ... 2023-05-31T14:25:43.4835083+08:00 redis timeout ... 2023-05-31T14:25:43.4837146+08:00 redis xreadgroup block read ... 2023-05-31T14:25:47.5262835+08:00 write message:1685514347525-0,1685514347526-0 2023-05-31T14:25:47.5262841+08:00 read message:1685514347526-0 2023-05-31T14:25:47.5276668+08:00 redis xreadgroup block read ... 2023-05-31T14:25:53.4743876+08:00 redis timeout ... 2023-05-31T14:25:53.4745574+08:00 redis xreadgroup block read ... 2023-05-31T14:25:57.5292349+08:00 write message:1685514357528-0,1685514357529-0 2023-05-31T14:25:57.5292381+08:00 read message:1685514357529-0 2023-05-31T14:25:57.5314306+08:00 redis xreadgroup block read ...

haoma2514 commented 10 months ago

This may be because the "xreadgroup" command has been executed on the server before receiving the "timeout" result? It should never time out when reading is blocked.

haoma2514 commented 10 months ago

Maybe you can refer to it https://github.com/StackExchange/StackExchange.Redis/issues/2135