Closed: iPazooki closed this issue 1 year ago.

Tagging subscribers to this area: @eerhardt, @maryamariyan, @michaelgsharp. See info in area-owners.md if you want to be subscribed.

Author: iPazooki
Assignees: -
Labels: `area-Extensions-Caching`, `untriaged`
Milestone: -
If the creation method completes synchronously, it will only run once. The following code will only print "First call":
var memoryCache = new ServiceCollection()
    .AddMemoryCache()
    .BuildServiceProvider()
    .GetRequiredService<IMemoryCache>();

memoryCache.GetOrCreateAsync("Key", _ =>
{
    Console.WriteLine("First call");
    return Task.FromResult("");
});

memoryCache.GetOrCreateAsync("Key", _ =>
{
    Console.WriteLine("Second call");
    return Task.FromResult("");
});
However, another creation method will run if the first run doesn't complete:
var memoryCache = new ServiceCollection()
    .AddMemoryCache()
    .BuildServiceProvider()
    .GetRequiredService<IMemoryCache>();

var task1 = memoryCache.GetOrCreateAsync("Key", async _ =>
{
    Console.WriteLine("First call");
    await Task.Delay(1000);
    return "";
});

var task2 = memoryCache.GetOrCreateAsync("Key", async _ =>
{
    Console.WriteLine("Second call");
    await Task.Delay(1000);
    return "";
});

Task.WaitAll(task1, task2);
This will print both "First call" and "Second call".

The current memory cache behavior is close to LazyThreadSafetyMode.PublicationOnly. You may want the equivalent of LazyThreadSafetyMode.ExecutionAndPublication.
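An ExecutionAndPublication-style behaviour can be approximated by serialising the factory behind a lock and re-checking the cache once the lock is held. Below is a minimal sketch, assuming the Microsoft.Extensions.Caching.Memory package; the `GetOrCreateExclusiveAsync` name and the single global `SemaphoreSlim` are illustrative only (a per-key lock, as in the code later in this thread, scales better):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Memory;

public static class ExclusiveCacheExtensions
{
    // One process-wide gate for simplicity; all keys contend on it.
    private static readonly SemaphoreSlim Gate = new SemaphoreSlim(1, 1);

    // Hypothetical helper: only one caller at a time may run the factory,
    // and a waiting caller re-checks the cache after acquiring the lock.
    public static async Task<T> GetOrCreateExclusiveAsync<T>(
        this IMemoryCache cache, object key, Func<ICacheEntry, Task<T>> factory)
    {
        if (cache.TryGetValue(key, out T value))
        {
            return value;
        }

        await Gate.WaitAsync().ConfigureAwait(false);
        try
        {
            // Double-check: another caller may have populated the entry
            // while we were waiting for the gate.
            if (cache.TryGetValue(key, out value))
            {
                return value;
            }

            ICacheEntry entry = cache.CreateEntry(key);
            value = await factory(entry).ConfigureAwait(false);
            entry.Value = value;

            // Dispose commits the entry; deliberately not in a using block,
            // so nothing is cached if the factory throws.
            entry.Dispose();
            return value;
        }
        finally
        {
            Gate.Release();
        }
    }
}
```

With this sketch, two concurrent callers for the same key run the factory only once; the second caller blocks on the gate and then hits the cache.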
This is how I've solved the problem in the past, although this was written a few years ago and I'm not sure if anything in the framework has changed since that would make any of this code redundant. It uses the following imports and depends on the Nito.AsyncEx library:
using Microsoft.Extensions.Caching.Memory;
using Nito.AsyncEx;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Essentially it's an asynchronous version of the double-checked locking pattern, but with two layers of it. The first layer (GetOrCreateKeyedLock) uses a shared AsyncLock to control access to locks that are specific to each key; the second layer (GetOrCreateSafe) uses that per-key AsyncLock to control access to the generator function. The two layers reduce contention when many threads are trying to access either the same resource or different resources: the vast majority of the work only occurs when the cache does not already hold the item you're looking for.

There's also an extra method (GetOrCreateSafeBatch) which is useful when you need multiple items but want to reduce the number of calls to whatever service generates them. This helps when you are populating the cache from a remote service or database and it's advantageous to batch those requests.
public static class IMemoryCacheExtensions
{
    private static readonly TimeSpan DefaultMaximumKeyedLockDuration = TimeSpan.FromMinutes(15);
    private static readonly TimeSpan DefaultBatchWait = TimeSpan.FromSeconds(1);
    private static readonly AsyncLock KeyedAsyncLock = new AsyncLock();
    private static readonly MemoryCacheEntryOptions KeyedAsyncLockCacheEntryOptions = new MemoryCacheEntryOptions
    {
        SlidingExpiration = TimeSpan.FromDays(1)
    };

    public static async Task<TItem> GetOrCreateSafe<TItem>(this IMemoryCache memoryCache, string key, Func<ICacheEntry, Task<TItem>> factory, TimeSpan? maximumKeyedLockDuration = null, CancellationToken cancellationToken = default)
    {
        maximumKeyedLockDuration = maximumKeyedLockDuration ?? DefaultMaximumKeyedLockDuration;

        if (memoryCache.TryGetValue(key, out TItem item))
        {
            return item;
        }

        using (CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
        {
            cancellationTokenSource.CancelAfter(maximumKeyedLockDuration.Value);

            AsyncLock asyncLock = await memoryCache
                .GetOrCreateKeyedLock(key, cancellationTokenSource.Token)
                .ConfigureAwait(false);

            using (
                await asyncLock
                    .LockAsync(cancellationTokenSource.Token)
                    .ConfigureAwait(false)
            )
            {
                if (memoryCache.TryGetValue(key, out item))
                {
                    return item;
                }

                ICacheEntry cacheEntry = memoryCache.CreateEntry(key);
                item = await factory(cacheEntry).ConfigureAwait(false);
                cacheEntry.SetValue(item);

                // Cannot use a using statement, as we don't want the cache entry to be added if the factory throws an exception
                cacheEntry.Dispose();

                return item;
            }
        }
    }
    public static async Task<TItem[]> GetOrCreateSafeBatch<TItem>(this IMemoryCache memoryCache, string[] keys, Func<string[], Task<TItem[]>> factory, Func<string, string> keyTransformer = null, Action<ICacheEntry> configureCacheEntry = null, TimeSpan? batchWait = null, int maximumNumberOfBatches = 3, CancellationToken cancellationToken = default)
    {
        batchWait = batchWait ?? DefaultBatchWait;

        ConcurrentDictionary<string, TaskCompletionSource<TItem>> pendingKeys = new ConcurrentDictionary<string, TaskCompletionSource<TItem>>();

        Dictionary<string, Task<TItem>> tasks = keys
            .Distinct()
            .ToDictionary(
                k => k,
                k => memoryCache.GetOrCreateSafe(
                    keyTransformer != null ? keyTransformer(k) : k,
                    e =>
                    {
                        configureCacheEntry?.Invoke(e);
                        TaskCompletionSource<TItem> completionSource = pendingKeys.GetOrAdd(k, new TaskCompletionSource<TItem>());
                        return completionSource.Task;
                    },
                    cancellationToken: cancellationToken
                )
            );

        Func<Task> callFactoryForPendingKeys = new Func<Task>(async () =>
        {
            if (pendingKeys.Any())
            {
                string[] originalKeys = pendingKeys.Keys.ToArray();
                TItem[] results = await factory(originalKeys);

                foreach (KeyValuePair<string, TItem> kvp in originalKeys.Zip(results, (key, result) => new KeyValuePair<string, TItem>(key, result)))
                {
                    if (pendingKeys.TryRemove(kvp.Key, out TaskCompletionSource<TItem> completionSource))
                    {
                        completionSource.TrySetResult(kvp.Value);
                    }
                }
            }
        });

        int numberOfBatches = 0;

        while (!tasks.All(t => t.Value.IsCompleted || pendingKeys.ContainsKey(t.Key)))
        {
            Task timeoutTask = Task.Delay(batchWait.Value);
            Task completedTask = await Task.WhenAny(tasks.Values.Concat(new[] { timeoutTask }));

            if (completedTask == timeoutTask)
            {
                // Possible deadlock, attempt to process the items we have
                if (!pendingKeys.Any())
                {
                    if (numberOfBatches > maximumNumberOfBatches)
                    {
                        throw new InvalidOperationException("Deadlock occurred during GetOrCreateSafeBatch call.");
                    }
                    continue;
                }

                await callFactoryForPendingKeys();
                numberOfBatches++;
            }
        }

        await callFactoryForPendingKeys();

        if (keys.Any(k => !tasks[k].IsCompleted))
        {
            throw new InvalidOperationException("GetOrCreateSafeBatch factory did not return the correct number of results.");
        }

        return await Task.WhenAll(keys.Select(k => tasks[k]));
    }
    public static async Task<AsyncLock> GetOrCreateKeyedLock(this IMemoryCache memoryCache, string key, CancellationToken cancellationToken = default)
    {
        key = key + "¦lock";

        if (memoryCache.TryGetValue(key, out AsyncLock asyncLock))
        {
            return asyncLock;
        }

        using (
            await KeyedAsyncLock
                .LockAsync(cancellationToken)
                .ConfigureAwait(false)
        )
        {
            if (memoryCache.TryGetValue(key, out asyncLock))
            {
                return asyncLock;
            }

            asyncLock = new AsyncLock();
            memoryCache.Set(key, asyncLock, KeyedAsyncLockCacheEntryOptions);

            return asyncLock;
        }
    }
}
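With these extensions in place (plus the Nito.AsyncEx dependency), the racy example from the top of the thread can be rewritten against GetOrCreateSafe; concurrent callers for the same key should then execute the factory only once:

```csharp
// Assumes the IMemoryCacheExtensions class above is in scope,
// along with Microsoft.Extensions.DependencyInjection and
// Microsoft.Extensions.Caching.Memory.
var memoryCache = new ServiceCollection()
    .AddMemoryCache()
    .BuildServiceProvider()
    .GetRequiredService<IMemoryCache>();

int factoryCalls = 0;

var task1 = memoryCache.GetOrCreateSafe("Key", async _ =>
{
    Interlocked.Increment(ref factoryCalls);
    await Task.Delay(1000);
    return "value";
});

var task2 = memoryCache.GetOrCreateSafe("Key", async _ =>
{
    Interlocked.Increment(ref factoryCalls);
    await Task.Delay(1000);
    return "value";
});

Task.WaitAll(task1, task2);

// The second caller waits on the keyed lock, then finds the entry in the cache.
Console.WriteLine(factoryCalls); // prints: 1
```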
It seems there is a bug in the GetOrCreateAsync implementation.
I very much doubt that we had such a bug, but even if we did, we certainly don't have it anymore, as GetOrCreateAsync uses the described TryGetValue to see whether the given key is present in the cache. It calls the async factory only when it's not.

If my understanding is wrong, please correct me.
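Roughly speaking, that check-then-create flow looks like the following. This is a simplified illustration, not the actual framework source, and the `GetOrCreateAsyncSketch` name is invented for the example:

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Memory;

public static class SketchExtensions
{
    // Simplified illustration of GetOrCreateAsync's check-then-create flow.
    public static async Task<TItem> GetOrCreateAsyncSketch<TItem>(
        this IMemoryCache cache, object key, Func<ICacheEntry, Task<TItem>> factory)
    {
        if (!cache.TryGetValue(key, out object result))
        {
            // The entry is committed to the cache when it is disposed.
            using ICacheEntry entry = cache.CreateEntry(key);
            result = await factory(entry);
            entry.Value = result;
        }
        return (TItem)result;
    }
}
```

Note that the TryGetValue check and the factory call are not performed under a lock, so two concurrent callers can both miss and both run the factory; that matches the PublicationOnly-like behaviour described earlier in the thread.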
Description

When you use GetOrCreateAsync to get/create an object, it always calls the inner method, and it seems Get always returns null, as nothing will be saved in the cache. This code will always call _repository.Get():

But it works fine if I use it like this:

It seems there is a bug in the GetOrCreateAsync implementation.

Configuration

Version of .NET: .NET Core 3.1