StackExchange / StackExchange.Redis

General purpose redis client
https://stackexchange.github.io/StackExchange.Redis/

Load test with subscribe/unsubscribe/publish #2720

Open szaszoliver opened 6 months ago

szaszoliver commented 6 months ago

I have the following problem. While implementing a "lock" function, I use subscribe/unsubscribe/publish so that I do not have to poll the state of the lock continuously. I register a handler with SubscribeAsync multiple times (4 times), so that each waiter gets an event when the lock is unlocked. Every task that is trying to get the lock in parallel should receive this event because of its registration. Using redis MONITOR, I can see that the SUBSCRIBE command is received. But when the publish happens, usually only 2 handlers get the event instead of all 4 (sometimes all 4 get it, sometimes 3).

I also tried issuing SubscribeAsync twice (one call directly after the other). Interestingly, in this case I got 6 events: 2 handlers were called twice and 2 were called once. To me this means that all 8 subscribe calls were made, because it was not a whole pair (double call) that dropped out, only part of it.

I assume that StackExchange.Redis collects all my subscribe handlers, and that when an event arrives from redis, redis is not responsible for calling each handler; redis only calls the StackExchange.Redis handler, which then calls all of my handlers. Is there a way to check whether my handlers have been registered? And if they have not, why? How can I stabilize it?

I am using Windows 11 with Visual Studio 2022, .NET 8, and a local Windows redis server, with SE.Redis v2.7.33.41805. I welcome any suggestions.

mgravell commented 6 months ago

I'm a little unclear on the exact scenario here - the wording is a little hard to follow precisely, and the details matter. However:

I assume that StackExchange.Redis collects all my subscribe handlers, and that when an event arrives from redis, redis is not responsible for calling each handler; redis only calls the StackExchange.Redis handler, which then calls all of my handlers.

At the redis level, a channel is either subscribed or not - there isn't a concept of multiple subscription to the same channel, so yes: SE.Redis implements this locally using delegate combination operations. As such, individual subscription operations don't necessarily involve the server.
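To illustrate what "delegate combination" means in plain .NET, here is a minimal sketch that does not touch redis at all. The class name `HandlerCombinationDemo` and the channel/message strings are made up for this example, and it only shows how `+=` and `-=` behave on delegates; it is not a description of SE.Redis internals.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical demo type; shows plain .NET delegate combination, not SE.Redis code.
public static class HandlerCombinationDemo
{
    // Runs a small scenario and returns the handler names in the order they fired.
    public static List<string> Run()
    {
        var calls = new List<string>();
        Action<string, string> first = (channel, message) => calls.Add("first");
        Action<string, string> second = (channel, message) => calls.Add("second");

        Action<string, string>? combined = null;
        combined += first;   // one local registration
        combined += second;  // a different handler for the same channel
        combined += first;   // the same delegate instance registered again

        // One incoming message invokes every entry in the invocation list.
        combined?.Invoke("demo-channel", "unlock");

        // Removing a delegate drops only its LAST occurrence in the list.
        combined -= first;
        combined?.Invoke("demo-channel", "unlock");

        return calls;
    }
}
```

Registering the same delegate instance twice means it fires twice per message, and a single removal takes away only one of those two registrations.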

Likewise, "lock" is typically used to avoid conflicting operations from competing connections, but subscriptions are connection-specific, so don't really need synchronization. If the synchronization is to deal with timing around publish... well, maybe, but that sounds like it would add disproportionate amounts of overhead to individual publish.

Maybe we should step back a level here. Rather than discussing what you're currently doing, perhaps you could clarify what you're trying to achieve in terms of logic/outcomes, so we can advise on appropriate solutions.

szaszoliver commented 6 months ago

Maybe we should step back a level here. Rather than discussing what you're currently doing, perhaps you could clarify what you're trying to achieve in terms of logic/outcomes, so we can advise on appropriate solutions.

I have been programming for a long time, but I am new to C#. I wanted to write a library for managing tasks running on multiple threads or computers. I wanted to solve this by implementing a "redislock" class that is given a key name. The class should have (besides other functions) a lockAsync and an unlockAsync function. Calling lockAsync should wait until no other task is running with the same key name, then "reserve" the key name and return control to the statement following lockAsync. (Of course there needs to be a timeout and other stuff, but for simplicity let's skip that for now.)

lockAsync first checks whether the redis key exists; if it does not, it reserves the key and returns control. Otherwise it waits until the key is freed (by unlock, or because the key expires). Of course I could continuously check whether the key still exists, but that loads the system, so I wanted an event-based mechanism. The first problem is that redis generates no event when the key is deleted or its TTL runs out. (I think this will be added as experimental in some newer version.) So I publish at unlock, and at lock, if the key already exists, I start a timer set to the TTL and "simulate" the unlock when it fires. When the event arrives (and I get the lock, rather than someone else), I want to unsubscribe from the event (as we are done).
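To make the intended semantics concrete, here is a minimal in-memory sketch of "reserve the key if it is free or expired, and release it only by its owner". It is a model for reasoning only, not redis code: `InMemoryLockTable`, the `owner` token and the explicit `now` parameter are all assumptions introduced for this example. The point it shows is that the check and the reservation happen as one atomic step.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a key with a TTL; NOT a redis client.
public sealed class InMemoryLockTable
{
    private readonly object _gate = new();
    private readonly Dictionary<string, (string Owner, DateTime ExpiresAt)> _locks = new();

    // Check-and-reserve in one atomic step: succeeds only if the key is free or expired.
    public bool TryAcquire(string key, string owner, TimeSpan ttl, DateTime now)
    {
        lock (_gate)
        {
            if (_locks.TryGetValue(key, out var held) && held.ExpiresAt > now)
            {
                return false; // still held by someone and not yet expired
            }
            _locks[key] = (owner, now + ttl);
            return true;
        }
    }

    // Only the current, unexpired owner may release the key.
    public bool Release(string key, string owner, DateTime now)
    {
        lock (_gate)
        {
            if (_locks.TryGetValue(key, out var held) && held.Owner == owner && held.ExpiresAt > now)
            {
                _locks.Remove(key);
                return true;
            }
            return false;
        }
    }
}
```

Passing `now` in explicitly keeps the sketch deterministic; a real implementation would rely on the server's own expiry instead.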

In single tests it worked well. But when I started a load test, starting 5 processes at the same time that all require the same lock and each try to hold it for 1/2 sec, in most cases I did not get all the events, only a few of them. (Sometimes the test passed and all events were received.) I checked redis MONITOR, and the subscribes were registered there (in this case all 4; the first lock succeeded immediately, so no event was needed), but of the 4 handlers registered by the lock calls, usually only 2 get the event instead of 4.

I made a "workaround" by issuing subscribe (and unsubscribe) twice everywhere (the same command two times, with no other commands in between). In this case my tests pass, but I can see that instead of 8 (2*4) event captures, I usually get only 6 in the first round, which means 2 subscribes were not registered properly. My problem is that I still do not know what causes this.

I hope this clears things up a little.

For testing I use the following code snippet:

public async Task LoadTestSame()
{
    const int taskCount = 5;
    int Counter = 0;
    Logger_manager.DebugLogger.Header("Load test with same lock", $"We are using lock {taskCount} times, for about 0.1 second");
    _ = RedisLockFactory.GetRedisLock(RedisKeys.UnitTesting.TestLock1); //Prevent delay at lazy load later, for better time measuring
    var tasks = new List<Task>();
    for (int i = 0; i < taskCount; i++)
    {
        var localI = i;
        var task = Task.Run(async () =>
        {
            Logger_manager.DebugLogger.WriteToDebug($"Init redis #{localI}...");
            var lock1 = RedisLockFactory.GetRedisLock(RedisKeys.UnitTesting.TestLock1);
            Logger_manager.DebugLogger.WriteToDebug($"Locking #{localI}...");
            await lock1.Lock(default, TimeSpan.FromSeconds(20));
            Logger_manager.DebugLogger.WriteToDebug($"Locked #{localI}...");
            await Task.Delay(TimeSpan.FromSeconds(0.1));
            Interlocked.Increment(ref Counter);
            Logger_manager.DebugLogger.WriteToDebug($"Unlocking #{localI}...");
            await lock1.Unlock();
            Logger_manager.DebugLogger.WriteToDebug($"Unlocked #{localI}...");
        });
        tasks.Add(task);
    }
    var StopWatch = Stopwatch.StartNew();
    var timeoutTask = Task.Delay(TimeSpan.FromSeconds(taskCount * 2));
    var completedTask = await Task.WhenAny(Task.WhenAll(tasks), timeoutTask);
    StopWatch.Stop();
    Logger_manager.DebugLogger.WriteToDebug($"Evaluating running... [{StopWatch.ElapsedMilliseconds}]");
    Assert.AreNotEqual(timeoutTask, completedTask, "Task did not finish in time...");
    Assert.AreEqual(taskCount, Counter);
    Logger_manager.DebugLogger.WriteToDebug($"Let check if runtime is a little bit larger then {taskCount * 100} [ms]...");
    Assert.IsTrue(StopWatch.ElapsedMilliseconds > 100 * taskCount, $"Elapsed time {StopWatch.ElapsedMilliseconds}"); //All task minimum run for 100 ms
    Assert.IsTrue(StopWatch.ElapsedMilliseconds < 300 * taskCount, $"Elapsed time {StopWatch.ElapsedMilliseconds}"); //Due to context switching and other stuff, it can be a little bit larger time (assumed 300 ms/task)
}

For the lock function I use the following code:

public async Task<bool> Lock(TimeSpan? tryToLockTimeout = default, TimeSpan? lockTimeout = default)
{
    return await Task.Run(async () =>
    {
        if (await LockNoWait(lockTimeout))
        {
            return true;
        }
        else
        { //***ToDo: detect dual lock
            var testId = Guid.NewGuid().ToString().Substring(1, 5);
            SimulateUnlockEventOnTTL();
            TaskCompletionSource<bool> taskCompletionSource = new();
            Action<RedisChannel, RedisValue>? messageHandler = null;
            messageHandler = async (p1, p2) =>
            {
                Console.WriteLine("^^^^called-lock:" + testId + ":" + _keyName + ":" + p1 + ":" + p2);
                if (await LockNoWait(lockTimeout))
                {
                    Console.WriteLine("^^^^request cancel-lock1:" + testId + ":" + _keyName);
                    cancellationTokenSourceForTTLCheck?.Cancel();
                    Console.WriteLine("^^^^unsubscribe-lock1:" + testId + ":" + _keyName);
                    await _subscriber.UnsubscribeAsync(new RedisChannel($"{RedisKeys.RedisManagement.unlockSubscribe_Key}{_keyName}", PatternMode.Literal), messageHandler);
                    await _subscriber.UnsubscribeAsync(new RedisChannel($"{RedisKeys.RedisManagement.unlockSubscribe_Key}{_keyName}", PatternMode.Literal), messageHandler); //todo remove double
                    taskCompletionSource.TrySetResult(true);
                }
            };
            Console.WriteLine("^^^^subscribe-lock:" + testId + ":" + _keyName);
            try
            {
                Console.WriteLine("^^^^subscribe-state:" + testId + ":" + _subscriber.IsConnected() + ":" + _subscriber.IsConnected(new RedisChannel($"{RedisKeys.RedisManagement.unlockSubscribe_Key}{_keyName}", PatternMode.Literal)));
                await _subscriber.SubscribeAsync(new RedisChannel($"{RedisKeys.RedisManagement.unlockSubscribe_Key}{_keyName}", PatternMode.Literal), messageHandler);
                await _subscriber.SubscribeAsync(new RedisChannel($"{RedisKeys.RedisManagement.unlockSubscribe_Key}{_keyName}", PatternMode.Literal), messageHandler); //todo remove double insert
            }
            catch (Exception ex)
            {
                Console.WriteLine("^^^^error-subscribe:" + ex.Message);
            }

            Console.WriteLine("^^^^publish-unlock ***Just for testing, how many module gets the event");
            await _subscriber.PublishAsync(new RedisChannel($"{RedisKeys.RedisManagement.unlockSubscribe_Key}{_keyName}", PatternMode.Literal), "test");

            Task timeoutTask = Task.Delay(tryToLockTimeout ?? TimeSpan.FromHours(1));
            await Task.WhenAny(taskCompletionSource.Task, timeoutTask);
            if (taskCompletionSource.Task.IsCompleted)
            {
                return await taskCompletionSource.Task;
            }
            else
            {
                Console.WriteLine("^^^^request cancel-lock2:" + _keyName);
                cancellationTokenSourceForTTLCheck?.Cancel();
                Console.WriteLine("^^^^unsubscribe-lock2:" + _keyName);
                await _subscriber.UnsubscribeAsync(new RedisChannel($"{RedisKeys.RedisManagement.unlockSubscribe_Key}{_keyName}", PatternMode.Literal), messageHandler);
                await _subscriber.UnsubscribeAsync(new RedisChannel($"{RedisKeys.RedisManagement.unlockSubscribe_Key}{_keyName}", PatternMode.Literal), messageHandler); //todo remove double
                return false;
            }
        }
    });
}
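The wait-with-timeout step at the end of Lock can be looked at in isolation. The following is a minimal sketch, assuming .NET 6 or later (for `Task.WaitAsync`); the names `SignalWait`, `CreateSignal` and `WaitForSignalAsync` are hypothetical helpers for this example and are not part of SE.Redis. It creates the TaskCompletionSource with `RunContinuationsAsynchronously`, so that completing it from inside a message callback does not run the waiter's continuation inline on the callback's thread.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

// Hypothetical helper type for this example; not an SE.Redis API.
public static class SignalWait
{
    // Creates a signal whose continuations are scheduled rather than run inline
    // on whichever thread calls TrySetResult.
    public static TaskCompletionSource<bool> CreateSignal() =>
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    // Returns the signal's result if it completes in time, or false on timeout.
    public static async Task<bool> WaitForSignalAsync(Task<bool> signal, TimeSpan timeout)
    {
        try
        {
            return await signal.WaitAsync(timeout);
        }
        catch (TimeoutException)
        {
            return false; // the signal did not arrive within the timeout
        }
    }
}
```

A caller would create the signal before subscribing, call `TrySetResult(true)` from the handler once the lock is taken, and await `WaitForSignalAsync` in place of the `Task.WhenAny` / `IsCompleted` pair. Separately, note that an async lambda assigned to an `Action<...>` compiles to an async void method, so exceptions thrown inside it are not observed by the code that invokes the handler.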

In the result I get:

00:00:02.1521232 : Init redis #3...
00:00:02.1520454 : Init redis #1...
00:00:02.1521549 : Init redis #4...
00:00:02.1527659 : Locking #3...
00:00:02.1528018 : Locking #1...
00:00:02.1521483 : Init redis #2...
00:00:02.1528683 : Locking #4...
00:00:02.1520657 : Init redis #0...
00:00:02.1529428 : Locking #2...
00:00:02.1529908 : Locking #0...
00:00:02.1649843 : Locked #2...
^^^^subscribe-lock:29c49:unitTest.redis.lock1
^^^^subscribe-lock:95ebd:unitTest.redis.lock1
^^^^subscribe-lock:d12fd:unitTest.redis.lock1
^^^^subscribe-lock:b3f67:unitTest.redis.lock1
^^^^simulate new cancel token:::
^^^^simulate new cancel token:::
^^^^simulate new cancel token:::
^^^^simulate new cancel token:::
^^^^subscribe-state:29c49:True:True
^^^^subscribe-state:d12fd:True:True
^^^^subscribe-state:b3f67:True:True
^^^^subscribe-state:95ebd:True:True
^^^^publish-unlock ***todo del
^^^^publish-unlock ***todo del
^^^^publish-unlock ***todo del
^^^^publish-unlock ***todo del
^^^^called-lock:29c49:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:29c49:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:29c49:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:29c49:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:95ebd:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:95ebd:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:95ebd:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:d12fd:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:d12fd:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:b3f67:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:95ebd:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:29c49:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:d12fd:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:95ebd:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:b3f67:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:b3f67:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:d12fd:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:29c49:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:b3f67:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
^^^^called-lock:95ebd:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:test
00:00:02.9722354 : Unlocking #2...
^^^^publish-unlock
^^^^called-lock:29c49:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:unlock
00:00:02.9826731 : Unlocked #2...
^^^^called-lock:95ebd:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:unlock
^^^^called-lock:d12fd:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:unlock
^^^^called-lock:b3f67:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:unlock
^^^^called-lock:29c49:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:unlock
^^^^called-lock:95ebd:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:unlock
^^^^request cancel-lock1:95ebd:unitTest.redis.lock1
^^^^unsubscribe-lock1:95ebd:unitTest.redis.lock1
00:00:02.9834080 : Locked #0...
^^^^catch cancel
00:00:03.0918084 : Unlocking #0...
^^^^publish-unlock
00:00:03.0948962 : Unlocked #0...
^^^^called-lock:29c49:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:unlock
^^^^called-lock:d12fd:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:unlock
^^^^called-lock:b3f67:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:unlock
^^^^called-lock:29c49:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:unlock
^^^^request cancel-lock1:29c49:unitTest.redis.lock1
^^^^unsubscribe-lock1:29c49:unitTest.redis.lock1
00:00:03.0966705 : Locked #4...
^^^^catch cancel
00:00:03.1989549 : Unlocking #4...
^^^^publish-unlock
00:00:03.2011520 : Unlocked #4...
^^^^called-lock:d12fd:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:unlock
^^^^called-lock:b3f67:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:unlock
^^^^request cancel-lock1:b3f67:unitTest.redis.lock1
^^^^unsubscribe-lock1:b3f67:unitTest.redis.lock1
00:00:03.2018921 : Locked #1...
^^^^catch cancel
00:00:03.3070929 : Unlocking #1...
^^^^publish-unlock
00:00:03.3090483 : Unlocked #1...
^^^^called-lock:d12fd:unitTest.redis.lock1:__keyspace@0__:unitTest.redis.lock1:unlock
^^^^request cancel-lock1:d12fd:unitTest.redis.lock1
^^^^unsubscribe-lock1:d12fd:unitTest.redis.lock1
^^^^catch cancel
00:00:03.3117276 : Locked #3...
00:00:03.4146723 : Unlocking #3...
^^^^publish-unlock
00:00:03.4157271 : Unlocked #3...
00:00:03.4163557 : Evaluating running... [1264]
00:00:03.4176167 : Let check if runtime is a little bit larger then 500 [ms]...

It is visible that the handlers with the IDs b3f67 and d12fd are called only once with each unlock message, while the others are called twice. (I registered each handler twice as the workaround.)