wazzamatazz closed this issue 10 months ago.
Can you create a standalone repro so @TedHartMS can look into it?
I've faced the same issue: some entries appear to be duplicated when iterating over them. This is the code that fails:
public IEnumerable<KeyValuePair<TKey, TValue>> EnumerateKeyValues(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();
    if (_disposed)
    {
        throw new ObjectDisposedException(nameof(MixedStorageKVStore<TKey, TValue>));
    }

    // Session
    var session = GetPooledSession();
    try
    {
        foreach (var kvp in GetKeyValues(session, cancellationToken))
        {
            yield return kvp;
        }
    }
    finally
    {
        _sessionPool.Enqueue(session);
    }
}

private IEnumerable<KeyValuePair<TKey, TValue>> GetKeyValues(ClientSession<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, Empty, SpanByteFunctions<Empty>> session, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();
    if (_disposed)
    {
        throw new ObjectDisposedException(nameof(MixedStorageKVStore<TKey, TValue>));
    }

    using var iterator = session.Iterate();
    while (iterator.GetNext(out _))
    {
        cancellationToken.ThrowIfCancellationRequested();
        var kvp = TryReadKeyValueInternal(iterator, out var succeeded);
        if (succeeded)
        {
            yield return kvp;
        }
    }
}

private unsafe KeyValuePair<TKey, TValue> TryReadKeyValueInternal(IFasterScanIterator<SpanByte, SpanByte> iterator, out bool succeeded)
{
    try
    {
        ref var keySpanByte = ref iterator.GetKey();
        var keySpan = keySpanByte.AsReadOnlySpan();
        TKey key;
        fixed (byte* pointer = &keySpan.GetPinnableReference())
        {
            using var memoryManager = new UnmanagedMemoryManager<byte>(pointer, keySpan.Length);
            key = MessagePackSerializer.Deserialize<TKey>(memoryManager.Memory, _messagePackSerializerOptions);
        }

        ref var valueSpanByte = ref iterator.GetValue();
        var valueSpan = valueSpanByte.AsReadOnlySpan();
        TValue value;
        fixed (byte* pointer = &valueSpan.GetPinnableReference())
        {
            using var memoryManager = new UnmanagedMemoryManager<byte>(pointer, valueSpan.Length);
            value = MessagePackSerializer.Deserialize<TValue>(memoryManager.Memory, _messagePackSerializerOptions);
        }

        succeeded = true;
        return new KeyValuePair<TKey, TValue>(key, value);
    }
    catch (Exception)
    {
        // This is done because there is a bug in the iteration of the log, some kvp get iterated several times and their records are invalid
        // see: https://github.com/microsoft/FASTER/issues/885
        succeeded = false;
        return default;
    }
}
This seems to happen especially after having made a full snapshot + recovery.
> Can you create a standalone repro so @TedHartMS can look into it?
I'll see if I can put something together that reproduces it. I've only seen this behaviour in an internal .NET Framework-based project that I can't share publicly.
> This seems to happen especially after having made a full snapshot + recovery.
@badrishc This ties in with what I'm seeing as well.
I've been able to replicate this fairly consistently using the same FASTER-based key/value store implementation that I use in the internal project I can't share.
The spurious keys are saved as part of a full checkpoint, but they aren't visible when iterating over keys until I restart the store and recover the checkpoint. I can replicate the behaviour in both .NET 8.0 and .NET Framework apps, so the issue doesn't appear to be framework-specific.
Example project: FASTER-GitHub-885.zip
The source code for the key/value store wrapper I'm using is available here.
Instructions:
warn: DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore[101]
Error while recovering the FASTER log from the latest checkpoint. This message can be ignored the first time the FASTER store is used as there will be no checkpoint available to recover from.
FASTER.core.FasterException: Unable to find valid HybridLog token
at FASTER.core.FasterKV`2.FindRecoveryInfo(Int64 requestedVersion, HybridLogCheckpointInfo& recoveredHlcInfo, IndexCheckpointInfo& recoveredICInfo)
at DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore..ctor(FasterKeyValueStoreOptions options, ILogger`1 logger)
info: Microsoft.Hosting.Lifetime[0]
Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
Hosting environment: Production
info: Microsoft.Hosting.Lifetime[0]
Content root path: C:\Repos\Other\FASTER-GitHub-885\FASTER-GitHub-885\bin\Debug\net48
info: Microsoft.Hosting.Lifetime[0]
Application is shutting down...
info: FASTER_GitHub_885.KVStoreRunner[0]
Key: "FASTER:GitHub:885:TestKey"
info: Microsoft.Hosting.Lifetime[0]
Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
Hosting environment: Production
info: Microsoft.Hosting.Lifetime[0]
Content root path: C:\Repos\Other\FASTER-GitHub-885\FASTER-GitHub-885\bin\Debug\net48
info: Microsoft.Hosting.Lifetime[0]
Application is shutting down...
info: FASTER_GitHub_885.KVStoreRunner[0]
Key: "FASTER:GitHub:885:TestKey"
info: FASTER_GitHub_885.KVStoreRunner[0]
Key: "FASTER:G\uFFFD\u0002\u0000\u0000\u0001\u0000\u0000\u0000\uFFFD\u0002\u0000\u0000^\u0000\u0001\u0000\u0004"
My suspicion is that the wrapper is using a key or value SpanByte without ref somewhere. SpanByte is a special heap struct that must never be used without ref, because otherwise only the first 8 or 16 bytes (the predefined part of the struct) get passed on.
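To illustrate the by-value problem described above, here is a minimal, self-contained sketch that does not use FASTER. InlineRecordHeader and InlineRecordDemo are hypothetical types, and the 12-byte layout (a 4-byte length followed by 8 bytes that overlap the start of an inline payload) is an assumption loosely modeled on the serialized form of SpanByte. Reading through a ref reaches the payload stored after the header, whereas a by-value copy carries only the struct's own 12 bytes.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text;

// Hypothetical stand-in for a serialized SpanByte (assumed layout): a 4-byte length
// followed by 8 bytes that, in serialized form, are the first 8 bytes of the payload.
[StructLayout(LayoutKind.Sequential, Pack = 4)]
public struct InlineRecordHeader
{
    public int Length;
    public long InlinePayloadStart;
}

public static class InlineRecordDemo
{
    // Serializes a key as [int32 length][payload bytes] in one contiguous buffer.
    public static byte[] Serialize(string key)
    {
        byte[] payload = Encoding.UTF8.GetBytes(key);
        byte[] buffer = new byte[sizeof(int) + payload.Length];
        BitConverter.TryWriteBytes(buffer.AsSpan(0, sizeof(int)), payload.Length);
        payload.CopyTo(buffer, sizeof(int));
        return buffer;
    }

    // Reads through a ref into the buffer, so the payload after the length is reachable.
    public static string ReadByRef(byte[] buffer)
    {
        ref InlineRecordHeader header = ref MemoryMarshal.AsRef<InlineRecordHeader>(buffer.AsSpan());
        ref byte payloadStart = ref Unsafe.Add(ref Unsafe.As<InlineRecordHeader, byte>(ref header), sizeof(int));
        return Encoding.UTF8.GetString(MemoryMarshal.CreateReadOnlySpan(ref payloadStart, header.Length));
    }

    // Reads from a by-value copy, which only holds the struct's own 12 bytes,
    // so at most 8 payload bytes survive the copy.
    public static string ReadFromCopy(byte[] buffer)
    {
        InlineRecordHeader copy = MemoryMarshal.AsRef<InlineRecordHeader>(buffer.AsSpan());
        ReadOnlySpan<byte> copyBytes = MemoryMarshal.AsBytes(MemoryMarshal.CreateReadOnlySpan(ref copy, 1));
        int available = Math.Min(copy.Length, copyBytes.Length - sizeof(int));
        return Encoding.UTF8.GetString(copyBytes.Slice(sizeof(int), available));
    }
}
```

With the key from the repro, ReadByRef returns the full "FASTER:GitHub:885:TestKey", while ReadFromCopy returns only "FASTER:G". That 8-character prefix is consistent with (though not proof of) the truncated keys reported in this thread; the fix that eventually worked for the wrapper was pinning, which is a separate mechanism.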
@badrishc Thanks for the reply - I'll take a look at the wrapper. Is it actually possible to use a ref struct without the ref? I would have expected the compiler to complain.
EDIT: When it comes to writing the values, I'm just using SpanByte.FromFixedSpan to create SpanByte instances from the raw key and value bytes before performing the upsert. Should I be doing something else instead?
EDIT 2: When I'm performing reads or iterating over records, I'm calling AsReadOnlySpan().ToArray() on the SpanByte instances to convert them into byte arrays. Is there an alternative approach that I should be considering?
@badrishc Thanks for the suggestion - I think you were correct about the wrapper. If I write the key and value bytes to a Memory<byte> and then pin the memory before creating the key and value SpanByte instances, I no longer see the spurious entries.
@stealthin FYI this is what I had to do in my wrapper when writing values in case it helps you:
private async ValueTask WriteCoreAsync(byte[] key, byte[] value) {
    ThrowIfDisposed();
    ThrowIfReadOnly();

    var session = GetPooledSession();
    // Rent enough space for both the key and the value; the parameterless
    // Rent() only guarantees a pool-defined default buffer size.
    using (var memoryOwner = MemoryPool<byte>.Shared.Rent(key.Length + value.Length)) {
        if (memoryOwner == null) {
            throw new InvalidOperationException(Resources.Error_UnableToRentSharedMemory);
        }

        var memory = memoryOwner.Memory;

        try {
            // Lay the buffer out as [key bytes][value bytes].
            key.AsSpan().CopyTo(memory.Span);
            value.AsSpan().CopyTo(memory.Span.Slice(key.Length));

            // Keep the buffer pinned while FASTER reads through the SpanByte pointers.
            using (memory.Pin()) {
                var keySpanByte = SpanByte.FromFixedSpan(memory.Slice(0, key.Length).Span);
                var valueSpanByte = SpanByte.FromFixedSpan(memory.Slice(key.Length, value.Length).Span);
                var result = await session.UpsertAsync(ref keySpanByte, ref valueSpanByte).ConfigureAwait(false);
                while (result.Status.IsPending) {
                    result = await result.CompleteAsync().ConfigureAwait(false);
                }
            }

            // Mark the cache as dirty.
            Interlocked.Exchange(ref _fullCheckpointIsRequired, 1);
        }
        finally {
            ReturnPooledSession(session);
        }
    }
}
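The buffer layout used by WriteCoreAsync (key bytes immediately followed by value bytes in one rented block) can be factored into a small helper. This is a hypothetical sketch, not part of the wrapper; the name KeyValueBuffer is made up. It requests key.Length + value.Length from the pool, since MemoryPool<byte>.Shared.Rent() without an argument only guarantees the pool's default size, and it slices to exact lengths because the rented block may be larger than requested.

```csharp
using System;
using System.Buffers;

public static class KeyValueBuffer
{
    // Hypothetical helper: rents one buffer laid out as [key bytes][value bytes]
    // and returns slices over the two regions. The caller must dispose the owner.
    public static IMemoryOwner<byte> Rent(byte[] key, byte[] value, out Memory<byte> keyMemory, out Memory<byte> valueMemory)
    {
        // Ask for the combined length; the pool may hand back a larger block.
        IMemoryOwner<byte> owner = MemoryPool<byte>.Shared.Rent(key.Length + value.Length);
        Memory<byte> memory = owner.Memory;

        key.AsSpan().CopyTo(memory.Span);
        value.AsSpan().CopyTo(memory.Span.Slice(key.Length));

        // Slice to the exact lengths so that extra pooled bytes are never exposed.
        keyMemory = memory.Slice(0, key.Length);
        valueMemory = memory.Slice(key.Length, value.Length);
        return owner;
    }
}
```

The rented memory would still need to be pinned (for example with Memory<byte>.Pin()) before SpanByte.FromFixedSpan is called on the slices.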
A simpler way to code this, which avoids the memory copy, is:
fixed (byte* keyPtr = key) {
    fixed (byte* valuePtr = value) {
        var keySB = SpanByte.FromPointer(keyPtr, key.Length);
        var valueSB = SpanByte.FromPointer(valuePtr, value.Length);
        // Perform the upsert here, while both arrays are still pinned.
    }
}
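I see you are using the async version, so fixed may not work (a fixed block cannot contain an await). You can instead use:
GCHandle h = GCHandle.Alloc(key, GCHandleType.Pinned);
try {
    IntPtr keyPtr = h.AddrOfPinnedObject();
    ...
} finally {
    h.Free(); // always unpin, even if the upsert throws
}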
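Applied to both the key and the value, the GCHandle approach can be wrapped so that the arrays stay pinned across an await and are always unpinned afterwards. This is a sketch: PinnedBuffers and WithPinnedAsync are hypothetical names, and the operation delegate stands in for building SpanByte instances from the pointers (e.g. with SpanByte.FromPointer inside an unsafe helper) and performing the upsert.

```csharp
using System;
using System.Runtime.InteropServices;
using System.Threading.Tasks;

public static class PinnedBuffers
{
    // Hypothetical helper: pins key and value for the whole asynchronous operation.
    // Unlike a fixed block, a GCHandle is an ordinary struct, so it can live across an await.
    public static async Task<T> WithPinnedAsync<T>(
        byte[] key,
        byte[] value,
        Func<IntPtr, int, IntPtr, int, Task<T>> operation)
    {
        GCHandle keyHandle = GCHandle.Alloc(key, GCHandleType.Pinned);
        try
        {
            GCHandle valueHandle = GCHandle.Alloc(value, GCHandleType.Pinned);
            try
            {
                // Both addresses stay valid until the handles are freed below.
                return await operation(
                    keyHandle.AddrOfPinnedObject(), key.Length,
                    valueHandle.AddrOfPinnedObject(), value.Length).ConfigureAwait(false);
            }
            finally
            {
                valueHandle.Free();
            }
        }
        finally
        {
            keyHandle.Free();
        }
    }
}
```

In a purely synchronous method, fixed is generally the cheaper choice; the handle-based version is only needed because the upsert is awaited.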
Thanks @badrishc - I initially tried to use fixed but ran into problems because I was using async. I'll try out your alternative approach!
I have a key/value store using FasterKV<SpanByte, SpanByte>. I've observed that, when upserting existing entries, I'm sometimes left with additional, unexpected records that are visible when I perform key iteration but always have their Found flag set to false if I try to get or delete the key, meaning that I can't remove them.

What I have observed about these records is:

- IScanIteratorFunctions<SpanByte, SpanByte>.SingleReader is always called for the "bad" entries, whereas ConcurrentReader is always called for the "good" entries.
- The RecordInfo.Dirty flag for the record is always true for a bad record and false for a good record.
- The keys of the bad records contain NUL characters (e.g. if a good key is RULEREPOSITORY:RULES, a mangled key might be RULEREPO\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000).

Any suggestions about what is going on here and how I can remove these rogue entries? Thanks!